1pub(crate) mod close_downgraded_region;
16pub(crate) mod downgrade_leader_region;
17pub(crate) mod flush_leader_region;
18pub(crate) mod manager;
19pub(crate) mod migration_abort;
20pub(crate) mod migration_end;
21pub(crate) mod migration_start;
22pub(crate) mod open_candidate_region;
23#[cfg(test)]
24pub mod test_util;
25pub(crate) mod update_metadata;
26pub(crate) mod upgrade_candidate_region;
27
28use std::any::Any;
29use std::fmt::{Debug, Display};
30use std::time::Duration;
31
32use common_error::ext::BoxedError;
33use common_meta::cache_invalidator::CacheInvalidatorRef;
34use common_meta::ddl::RegionFailureDetectorControllerRef;
35use common_meta::instruction::CacheIdent;
36use common_meta::key::datanode_table::{DatanodeTableKey, DatanodeTableValue};
37use common_meta::key::table_info::TableInfoValue;
38use common_meta::key::table_route::TableRouteValue;
39use common_meta::key::{DeserializedValueWithBytes, TableMetadataManagerRef};
40use common_meta::kv_backend::ResettableKvBackendRef;
41use common_meta::lock_key::{CatalogLock, RegionLock, SchemaLock};
42use common_meta::peer::Peer;
43use common_meta::region_keeper::{MemoryRegionKeeperRef, OperatingRegionGuard};
44use common_procedure::error::{
45 Error as ProcedureError, FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu,
46};
47use common_procedure::{Context as ProcedureContext, LockKey, Procedure, Status, StringKey};
48use common_telemetry::{error, info};
49use manager::RegionMigrationProcedureGuard;
50pub use manager::{
51 RegionMigrationManagerRef, RegionMigrationProcedureTask, RegionMigrationProcedureTracker,
52};
53use serde::{Deserialize, Serialize};
54use snafu::{OptionExt, ResultExt};
55use store_api::storage::RegionId;
56use tokio::time::Instant;
57
58use self::migration_start::RegionMigrationStart;
59use crate::error::{self, Result};
60use crate::metrics::{
61 METRIC_META_REGION_MIGRATION_ERROR, METRIC_META_REGION_MIGRATION_EXECUTE,
62 METRIC_META_REGION_MIGRATION_STAGE_ELAPSED,
63};
64use crate::service::mailbox::MailboxRef;
65
/// The default total timeout for a region migration procedure task (2 minutes).
pub const DEFAULT_REGION_MIGRATION_TIMEOUT: Duration = Duration::from_secs(120);
68
/// The persistent data of the region migration procedure.
///
/// It is serialized together with the procedure state (see
/// `RegionMigrationProcedure::dump`) and restored on recovery, so any change to
/// this struct must stay backward compatible with previously persisted JSON.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct PersistentContext {
    /// The catalog of the table the migrating region belongs to.
    catalog: String,
    /// The schema of the table the migrating region belongs to.
    schema: String,
    /// The peer the region is migrated away from.
    from_peer: Peer,
    /// The peer the region is migrated to (the candidate region's datanode).
    to_peer: Peer,
    /// The id of the migrating region.
    region_id: RegionId,
    /// The per-operation time budget of the migration.
    /// Falls back to `default_timeout` when absent, so payloads persisted
    /// before this field existed still deserialize (see the
    /// `test_backward_compatibility` test).
    #[serde(with = "humantime_serde", default = "default_timeout")]
    timeout: Duration,
}
90
/// The fallback value for [`PersistentContext::timeout`], used when
/// deserializing procedure data persisted before the `timeout` field existed.
fn default_timeout() -> Duration {
    Duration::new(10, 0)
}
94
95impl PersistentContext {
96 pub fn lock_key(&self) -> Vec<StringKey> {
97 let region_id = self.region_id;
98 let lock_key = vec![
99 CatalogLock::Read(&self.catalog).into(),
100 SchemaLock::read(&self.catalog, &self.schema).into(),
101 RegionLock::Write(region_id).into(),
102 ];
103
104 lock_key
105 }
106}
107
/// Elapsed-time accounting for the stages of a region migration.
///
/// The stage timings are reported to
/// `METRIC_META_REGION_MIGRATION_STAGE_ELAPSED` when the value is dropped.
#[derive(Debug, Clone, Default)]
pub struct Metrics {
    /// Time spent on individual operations, counted against the procedure's
    /// `timeout` budget (see `Context::next_operation_timeout`).
    operations_elapsed: Duration,
    /// Elapsed time of flushing the leader region.
    flush_leader_region_elapsed: Duration,
    /// Elapsed time of downgrading the leader region.
    downgrade_leader_region_elapsed: Duration,
    /// Elapsed time of opening the candidate region.
    open_candidate_region_elapsed: Duration,
    /// Elapsed time of upgrading the candidate region.
    upgrade_candidate_region_elapsed: Duration,
}
122
123impl Display for Metrics {
124 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
125 let total = self.flush_leader_region_elapsed
126 + self.downgrade_leader_region_elapsed
127 + self.open_candidate_region_elapsed
128 + self.upgrade_candidate_region_elapsed;
129 write!(
130 f,
131 "total: {:?}, flush_leader_region_elapsed: {:?}, downgrade_leader_region_elapsed: {:?}, open_candidate_region_elapsed: {:?}, upgrade_candidate_region_elapsed: {:?}",
132 total,
133 self.flush_leader_region_elapsed,
134 self.downgrade_leader_region_elapsed,
135 self.open_candidate_region_elapsed,
136 self.upgrade_candidate_region_elapsed
137 )
138 }
139}
140
impl Metrics {
    /// Accumulates `elapsed` into the operations total that is charged against
    /// the procedure's `timeout` budget.
    pub fn update_operations_elapsed(&mut self, elapsed: Duration) {
        self.operations_elapsed += elapsed;
    }

    /// Accumulates `elapsed` into the flush-leader-region stage timing.
    pub fn update_flush_leader_region_elapsed(&mut self, elapsed: Duration) {
        self.flush_leader_region_elapsed += elapsed;
    }

    /// Accumulates `elapsed` into the downgrade-leader-region stage timing.
    pub fn update_downgrade_leader_region_elapsed(&mut self, elapsed: Duration) {
        self.downgrade_leader_region_elapsed += elapsed;
    }

    /// Accumulates `elapsed` into the open-candidate-region stage timing.
    pub fn update_open_candidate_region_elapsed(&mut self, elapsed: Duration) {
        self.open_candidate_region_elapsed += elapsed;
    }

    /// Accumulates `elapsed` into the upgrade-candidate-region stage timing.
    pub fn update_upgrade_candidate_region_elapsed(&mut self, elapsed: Duration) {
        self.upgrade_candidate_region_elapsed += elapsed;
    }
}
167
168impl Drop for Metrics {
169 fn drop(&mut self) {
170 let total = self.flush_leader_region_elapsed
171 + self.downgrade_leader_region_elapsed
172 + self.open_candidate_region_elapsed
173 + self.upgrade_candidate_region_elapsed;
174 METRIC_META_REGION_MIGRATION_STAGE_ELAPSED
175 .with_label_values(&["total"])
176 .observe(total.as_secs_f64());
177
178 if !self.flush_leader_region_elapsed.is_zero() {
179 METRIC_META_REGION_MIGRATION_STAGE_ELAPSED
180 .with_label_values(&["flush_leader_region"])
181 .observe(self.flush_leader_region_elapsed.as_secs_f64());
182 }
183
184 if !self.downgrade_leader_region_elapsed.is_zero() {
185 METRIC_META_REGION_MIGRATION_STAGE_ELAPSED
186 .with_label_values(&["downgrade_leader_region"])
187 .observe(self.downgrade_leader_region_elapsed.as_secs_f64());
188 }
189
190 if !self.open_candidate_region_elapsed.is_zero() {
191 METRIC_META_REGION_MIGRATION_STAGE_ELAPSED
192 .with_label_values(&["open_candidate_region"])
193 .observe(self.open_candidate_region_elapsed.as_secs_f64());
194 }
195
196 if !self.upgrade_candidate_region_elapsed.is_zero() {
197 METRIC_META_REGION_MIGRATION_STAGE_ELAPSED
198 .with_label_values(&["upgrade_candidate_region"])
199 .observe(self.upgrade_candidate_region_elapsed.as_secs_f64());
200 }
201 }
202}
203
/// The volatile (non-persisted) data of the region migration procedure.
///
/// Rebuilt from `Default` whenever the procedure is created or recovered; none
/// of these fields survive a metasrv restart.
#[derive(Debug, Clone, Default)]
pub struct VolatileContext {
    /// Guard that keeps the candidate region registered as "opening" in the
    /// `MemoryRegionKeeper`; dropped (released) when the procedure fails.
    opening_region_guard: Option<OperatingRegionGuard>,
    /// Cached table route; cleared via `Context::remove_table_route_value`.
    table_route: Option<DeserializedValueWithBytes<TableRouteValue>>,
    /// Cached `DatanodeTableValue` of the `from_peer`.
    from_peer_datanode_table: Option<DatanodeTableValue>,
    /// Cached table info.
    table_info: Option<DeserializedValueWithBytes<TableInfoValue>>,
    /// Deadline after which the old leader region's lease is considered expired.
    leader_region_lease_deadline: Option<Instant>,
    /// The last entry id reported by the (downgraded) leader region.
    leader_region_last_entry_id: Option<u64>,
    /// The last metadata entry id reported by the (downgraded) leader region.
    leader_region_metadata_last_entry_id: Option<u64>,
    /// Elapsed-time accounting; flushed to metrics on drop.
    metrics: Metrics,
}
236
237impl VolatileContext {
238 pub fn set_leader_region_lease_deadline(&mut self, lease_timeout: Duration) {
240 if self.leader_region_lease_deadline.is_none() {
241 self.leader_region_lease_deadline = Some(Instant::now() + lease_timeout);
242 }
243 }
244
245 pub fn reset_leader_region_lease_deadline(&mut self) {
247 self.leader_region_lease_deadline = None;
248 }
249
250 pub fn set_last_entry_id(&mut self, last_entry_id: u64) {
252 self.leader_region_last_entry_id = Some(last_entry_id)
253 }
254
255 pub fn set_metadata_last_entry_id(&mut self, last_entry_id: u64) {
257 self.leader_region_metadata_last_entry_id = Some(last_entry_id);
258 }
259}
260
/// Factory of [`Context`] for the region migration procedure.
pub trait ContextFactory {
    /// Consumes the factory and builds a [`Context`] around `persistent_ctx`.
    fn new_context(self, persistent_ctx: PersistentContext) -> Context;
}
265
/// The default implementation of [`ContextFactory`], bundling the shared
/// metasrv services that every migration [`Context`] needs.
#[derive(Clone)]
pub struct DefaultContextFactory {
    /// The volatile state handed to the built [`Context`].
    volatile_ctx: VolatileContext,
    /// The in-memory (resettable) KV backend.
    in_memory_key: ResettableKvBackendRef,
    /// Manager for table metadata (routes, info, datanode tables).
    table_metadata_manager: TableMetadataManagerRef,
    /// Keeper tracking regions that are being opened.
    opening_region_keeper: MemoryRegionKeeperRef,
    /// Controller for (de)registering region failure detectors.
    region_failure_detector_controller: RegionFailureDetectorControllerRef,
    /// Mailbox for messaging datanodes/frontends.
    mailbox: MailboxRef,
    /// The address of this metasrv.
    server_addr: String,
    /// Broadcaster for cache invalidation.
    cache_invalidator: CacheInvalidatorRef,
}
278
impl DefaultContextFactory {
    /// Returns a [`DefaultContextFactory`] whose volatile context starts empty.
    pub fn new(
        in_memory_key: ResettableKvBackendRef,
        table_metadata_manager: TableMetadataManagerRef,
        opening_region_keeper: MemoryRegionKeeperRef,
        region_failure_detector_controller: RegionFailureDetectorControllerRef,
        mailbox: MailboxRef,
        server_addr: String,
        cache_invalidator: CacheInvalidatorRef,
    ) -> Self {
        Self {
            volatile_ctx: VolatileContext::default(),
            in_memory_key,
            table_metadata_manager,
            opening_region_keeper,
            region_failure_detector_controller,
            mailbox,
            server_addr,
            cache_invalidator,
        }
    }
}
302
303impl ContextFactory for DefaultContextFactory {
304 fn new_context(self, persistent_ctx: PersistentContext) -> Context {
305 Context {
306 persistent_ctx,
307 volatile_ctx: self.volatile_ctx,
308 in_memory: self.in_memory_key,
309 table_metadata_manager: self.table_metadata_manager,
310 opening_region_keeper: self.opening_region_keeper,
311 region_failure_detector_controller: self.region_failure_detector_controller,
312 mailbox: self.mailbox,
313 server_addr: self.server_addr,
314 cache_invalidator: self.cache_invalidator,
315 }
316 }
317}
318
/// The execution context shared by all [`State`]s of a region migration.
pub struct Context {
    /// The persisted part of the procedure data.
    persistent_ctx: PersistentContext,
    /// The volatile (non-persisted) part of the procedure data.
    volatile_ctx: VolatileContext,
    /// The in-memory (resettable) KV backend.
    in_memory: ResettableKvBackendRef,
    /// Manager for table metadata (routes, info, datanode tables).
    table_metadata_manager: TableMetadataManagerRef,
    /// Keeper tracking regions that are being opened.
    opening_region_keeper: MemoryRegionKeeperRef,
    /// Controller for (de)registering region failure detectors.
    region_failure_detector_controller: RegionFailureDetectorControllerRef,
    /// Mailbox for messaging datanodes/frontends.
    mailbox: MailboxRef,
    /// The address of this metasrv.
    server_addr: String,
    /// Broadcaster for cache invalidation.
    cache_invalidator: CacheInvalidatorRef,
}
331
impl Context {
    /// Returns the remaining time budget for the next operation: the configured
    /// `timeout` minus the operation time already spent, or `None` once the
    /// budget is exhausted.
    pub fn next_operation_timeout(&self) -> Option<Duration> {
        self.persistent_ctx
            .timeout
            .checked_sub(self.volatile_ctx.metrics.operations_elapsed)
    }

    /// Charges the time elapsed since `instant` against the operation budget.
    pub fn update_operations_elapsed(&mut self, instant: Instant) {
        self.volatile_ctx
            .metrics
            .update_operations_elapsed(instant.elapsed());
    }

    /// Adds the time elapsed since `instant` to the flush-leader-region stage.
    pub fn update_flush_leader_region_elapsed(&mut self, instant: Instant) {
        self.volatile_ctx
            .metrics
            .update_flush_leader_region_elapsed(instant.elapsed());
    }

    /// Adds the time elapsed since `instant` to the downgrade-leader-region stage.
    pub fn update_downgrade_leader_region_elapsed(&mut self, instant: Instant) {
        self.volatile_ctx
            .metrics
            .update_downgrade_leader_region_elapsed(instant.elapsed());
    }

    /// Adds the time elapsed since `instant` to the open-candidate-region stage.
    pub fn update_open_candidate_region_elapsed(&mut self, instant: Instant) {
        self.volatile_ctx
            .metrics
            .update_open_candidate_region_elapsed(instant.elapsed());
    }

    /// Adds the time elapsed since `instant` to the upgrade-candidate-region stage.
    pub fn update_upgrade_candidate_region_elapsed(&mut self, instant: Instant) {
        self.volatile_ctx
            .metrics
            .update_upgrade_candidate_region_elapsed(instant.elapsed());
    }

    /// Returns the address of this metasrv.
    pub fn server_addr(&self) -> &str {
        &self.server_addr
    }

    /// Returns the table route of the migrating region's table, fetching it
    /// from storage and caching it in the volatile context on first use.
    ///
    /// Storage failures are wrapped as retry-later errors; a missing route
    /// yields `TableRouteNotFound`.
    pub async fn get_table_route_value(
        &mut self,
    ) -> Result<&DeserializedValueWithBytes<TableRouteValue>> {
        let table_route_value = &mut self.volatile_ctx.table_route;

        if table_route_value.is_none() {
            let table_id = self.persistent_ctx.region_id.table_id();
            let table_route = self
                .table_metadata_manager
                .table_route_manager()
                .table_route_storage()
                .get_with_raw_bytes(table_id)
                .await
                .context(error::TableMetadataManagerSnafu)
                .map_err(BoxedError::new)
                .with_context(|_| error::RetryLaterWithSourceSnafu {
                    reason: format!("Failed to get TableRoute: {table_id}"),
                })?
                .context(error::TableRouteNotFoundSnafu { table_id })?;

            *table_route_value = Some(table_route);
        }

        // Just populated above if it was `None`.
        Ok(table_route_value.as_ref().unwrap())
    }

    /// Registers the failure detector of the migrating region on `from_peer`.
    pub async fn register_failure_detectors(&self) {
        let datanode_id = self.persistent_ctx.from_peer.id;
        let region_id = self.persistent_ctx.region_id;

        self.region_failure_detector_controller
            .register_failure_detectors(vec![(datanode_id, region_id)])
            .await;
    }

    /// Deregisters the failure detector of the migrating region on `from_peer`.
    pub async fn deregister_failure_detectors(&self) {
        let datanode_id = self.persistent_ctx.from_peer.id;
        let region_id = self.persistent_ctx.region_id;

        self.region_failure_detector_controller
            .deregister_failure_detectors(vec![(datanode_id, region_id)])
            .await;
    }

    /// Deregisters the failure detector of the candidate region on `to_peer`.
    pub async fn deregister_failure_detectors_for_candidate_region(&self) {
        let to_peer_id = self.persistent_ctx.to_peer.id;
        let region_id = self.persistent_ctx.region_id;

        self.region_failure_detector_controller
            .deregister_failure_detectors(vec![(to_peer_id, region_id)])
            .await;
    }

    /// Drops the cached table route, returning `true` if one was cached.
    pub fn remove_table_route_value(&mut self) -> bool {
        let value = self.volatile_ctx.table_route.take();
        value.is_some()
    }

    /// Returns the table info of the migrating region's table, fetching it
    /// from storage and caching it in the volatile context on first use.
    ///
    /// Storage failures are wrapped as retry-later errors; a missing value
    /// yields `TableInfoNotFound`.
    pub async fn get_table_info_value(
        &mut self,
    ) -> Result<&DeserializedValueWithBytes<TableInfoValue>> {
        let table_info_value = &mut self.volatile_ctx.table_info;

        if table_info_value.is_none() {
            let table_id = self.persistent_ctx.region_id.table_id();
            let table_info = self
                .table_metadata_manager
                .table_info_manager()
                .get(table_id)
                .await
                .context(error::TableMetadataManagerSnafu)
                .map_err(BoxedError::new)
                .with_context(|_| error::RetryLaterWithSourceSnafu {
                    reason: format!("Failed to get TableInfo: {table_id}"),
                })?
                .context(error::TableInfoNotFoundSnafu { table_id })?;

            *table_info_value = Some(table_info);
        }

        // Just populated above if it was `None`.
        Ok(table_info_value.as_ref().unwrap())
    }

    /// Returns the `DatanodeTableValue` of the `from_peer`, fetching it from
    /// storage and caching it in the volatile context on first use.
    ///
    /// Storage failures are wrapped as retry-later errors; a missing value
    /// yields `DatanodeTableNotFound`.
    pub async fn get_from_peer_datanode_table_value(&mut self) -> Result<&DatanodeTableValue> {
        let datanode_value = &mut self.volatile_ctx.from_peer_datanode_table;

        if datanode_value.is_none() {
            let table_id = self.persistent_ctx.region_id.table_id();
            let datanode_id = self.persistent_ctx.from_peer.id;

            let datanode_table = self
                .table_metadata_manager
                .datanode_table_manager()
                .get(&DatanodeTableKey {
                    datanode_id,
                    table_id,
                })
                .await
                .context(error::TableMetadataManagerSnafu)
                .map_err(BoxedError::new)
                .with_context(|_| error::RetryLaterWithSourceSnafu {
                    reason: format!("Failed to get DatanodeTable: ({datanode_id},{table_id})"),
                })?
                .context(error::DatanodeTableNotFoundSnafu {
                    table_id,
                    datanode_id,
                })?;

            *datanode_value = Some(datanode_table);
        }

        // Just populated above if it was `None`.
        Ok(datanode_value.as_ref().unwrap())
    }

    /// Returns the id of the migrating region.
    pub fn region_id(&self) -> RegionId {
        self.persistent_ctx.region_id
    }

    /// Broadcasts an invalidation of the table's cached metadata.
    ///
    /// Best-effort: the broadcast result is ignored, so this succeeds even if
    /// no subscriber receives the message (see
    /// `test_broadcast_invalidate_table_cache`).
    pub async fn invalidate_table_cache(&self) -> Result<()> {
        let table_id = self.region_id().table_id();
        let ctx = common_meta::cache_invalidator::Context::default();
        let _ = self
            .cache_invalidator
            .invalidate(&ctx, &[CacheIdent::TableId(table_id)])
            .await;
        Ok(())
    }
}
539
/// A single step of the region migration state machine.
///
/// Implementations are serialized with the procedure (tagged by
/// `region_migration_state`), so a restarted metasrv can resume from the last
/// persisted step.
#[async_trait::async_trait]
#[typetag::serde(tag = "region_migration_state")]
pub(crate) trait State: Sync + Send + Debug {
    /// Returns the short name of the state: the unqualified type name, used as
    /// a metric label in `RegionMigrationProcedure::execute`.
    fn name(&self) -> &'static str {
        let type_name = std::any::type_name::<Self>();
        // Strip the module path, e.g. "a::b::RegionMigrationStart" -> "RegionMigrationStart".
        type_name.split("::").last().unwrap_or(type_name)
    }

    /// Performs this step and yields the next [`State`] together with the
    /// procedure [`Status`].
    async fn next(
        &mut self,
        ctx: &mut Context,
        procedure_ctx: &ProcedureContext,
    ) -> Result<(Box<dyn State>, Status)>;

    /// Returns this state as [`Any`], allowing callers to downcast to the
    /// concrete state type.
    fn as_any(&self) -> &dyn Any;
}
559
/// The owned form of the persisted procedure data, used when restoring a
/// procedure from JSON (see [`RegionMigrationProcedure::from_json`]).
#[derive(Debug, Serialize, Deserialize)]
pub struct RegionMigrationDataOwned {
    /// The persistent part of the procedure data.
    persistent_ctx: PersistentContext,
    /// The last persisted [`State`] of the migration.
    state: Box<dyn State>,
}
566
/// The borrowed form of the persisted procedure data, used when dumping a
/// procedure to JSON without cloning (see [`RegionMigrationProcedure::dump`]).
#[derive(Debug, Serialize)]
pub struct RegionMigrationData<'a> {
    /// The persistent part of the procedure data.
    persistent_ctx: &'a PersistentContext,
    /// The current [`State`] of the migration.
    state: &'a dyn State,
}
573
/// The procedure that migrates a region's leadership between datanodes.
pub(crate) struct RegionMigrationProcedure {
    /// The current [`State`] of the migration state machine.
    state: Box<dyn State>,
    /// The execution context shared by all states.
    context: Context,
    /// Keeps the migration task registered in the tracker for the lifetime of
    /// this procedure; `None` when the task is tracked elsewhere.
    _guard: Option<RegionMigrationProcedureGuard>,
}
579
impl RegionMigrationProcedure {
    const TYPE_NAME: &'static str = "metasrv-procedure::RegionMigration";

    /// Creates a procedure beginning at the [`RegionMigrationStart`] state.
    pub fn new(
        persistent_context: PersistentContext,
        context_factory: impl ContextFactory,
        guard: Option<RegionMigrationProcedureGuard>,
    ) -> Self {
        let state = Box::new(RegionMigrationStart {});
        Self::new_inner(state, persistent_context, context_factory, guard)
    }

    /// Creates a procedure beginning at an arbitrary `state` (used by tests
    /// and by [`Self::new`]).
    fn new_inner(
        state: Box<dyn State>,
        persistent_context: PersistentContext,
        context_factory: impl ContextFactory,
        guard: Option<RegionMigrationProcedureGuard>,
    ) -> Self {
        Self {
            state,
            context: context_factory.new_context(persistent_context),
            _guard: guard,
        }
    }

    /// Restores a procedure from its persisted JSON representation and
    /// re-registers the running task in the `tracker`.
    fn from_json(
        json: &str,
        context_factory: impl ContextFactory,
        tracker: RegionMigrationProcedureTracker,
    ) -> ProcedureResult<Self> {
        let RegionMigrationDataOwned {
            persistent_ctx,
            state,
        } = serde_json::from_str(json).context(FromJsonSnafu)?;

        // NOTE(review): `insert_running_procedure` may return `None`; the
        // procedure then runs without holding a tracker guard — confirm this
        // is only the already-tracked case.
        let guard = tracker.insert_running_procedure(&RegionMigrationProcedureTask {
            region_id: persistent_ctx.region_id,
            from_peer: persistent_ctx.from_peer.clone(),
            to_peer: persistent_ctx.to_peer.clone(),
            timeout: persistent_ctx.timeout,
        });
        let context = context_factory.new_context(persistent_ctx);

        Ok(Self {
            state,
            context,
            _guard: guard,
        })
    }

    /// Rolls back the side effects of a failed migration: if the table route
    /// still marks the region leader as downgrading, that status is cleared,
    /// and the region's failure detectors are re-registered on `from_peer`.
    async fn rollback_inner(&mut self) -> Result<()> {
        let _timer = METRIC_META_REGION_MIGRATION_EXECUTE
            .with_label_values(&["rollback"])
            .start_timer();

        let table_id = self.context.region_id().table_id();
        let region_id = self.context.region_id();
        // Drop the cached route so the freshest value is fetched below.
        self.context.remove_table_route_value();
        let table_metadata_manager = self.context.table_metadata_manager.clone();
        let table_route = self.context.get_table_route_value().await?;

        // NOTE(review): `unwrap` assumes a physical table route — confirm a
        // migration can never be issued for a logical table.
        let downgraded = table_route
            .region_routes()
            .unwrap()
            .iter()
            .filter(|route| route.region.id == region_id)
            .any(|route| route.is_leader_downgrading());

        if downgraded {
            info!("Rollbacking downgraded region leader table route, region: {region_id}");
            table_metadata_manager
                .update_leader_region_status(table_id, table_route, |route| {
                    if route.region.id == region_id {
                        // Clear the downgrading status of the migrating region.
                        Some(None)
                    } else {
                        // Leave every other region untouched.
                        None
                    }
                })
                .await
                .context(error::TableMetadataManagerSnafu)
                .map_err(BoxedError::new)
                .with_context(|_| error::RetryLaterWithSourceSnafu {
                    reason: format!("Failed to update the table route during the rollback downgraded leader region: {region_id}"),
                })?;
        }

        self.context.register_failure_detectors().await;

        Ok(())
    }
}
672
#[async_trait::async_trait]
impl Procedure for RegionMigrationProcedure {
    fn type_name(&self) -> &str {
        Self::TYPE_NAME
    }

    /// Rolls back a failed migration, surfacing failures as external
    /// procedure errors.
    async fn rollback(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<()> {
        self.rollback_inner()
            .await
            .map_err(ProcedureError::external)
    }

    fn rollback_supported(&self) -> bool {
        true
    }

    /// Drives the state machine one step.
    ///
    /// Retryable errors are converted into `retry_later`; non-retryable errors
    /// release the opening-region guard, deregister the candidate region's
    /// failure detector, and abort the procedure with an external error.
    async fn execute(&mut self, ctx: &ProcedureContext) -> ProcedureResult<Status> {
        let state = &mut self.state;

        let name = state.name();
        let _timer = METRIC_META_REGION_MIGRATION_EXECUTE
            .with_label_values(&[name])
            .start_timer();
        match state.next(&mut self.context, ctx).await {
            Ok((next, status)) => {
                *state = next;
                Ok(status)
            }
            Err(e) => {
                if e.is_retryable() {
                    METRIC_META_REGION_MIGRATION_ERROR
                        .with_label_values(&[name, "retryable"])
                        .inc();
                    Err(ProcedureError::retry_later(e))
                } else {
                    // Release the "opening" registration of the candidate region.
                    self.context.volatile_ctx.opening_region_guard.take();
                    self.context
                        .deregister_failure_detectors_for_candidate_region()
                        .await;
                    error!(
                        e;
                        "Region migration procedure failed, region_id: {}, from_peer: {}, to_peer: {}, {}",
                        self.context.region_id(),
                        self.context.persistent_ctx.from_peer,
                        self.context.persistent_ctx.to_peer,
                        self.context.volatile_ctx.metrics,
                    );
                    METRIC_META_REGION_MIGRATION_ERROR
                        .with_label_values(&[name, "external"])
                        .inc();
                    Err(ProcedureError::external(e))
                }
            }
        }
    }

    /// Serializes the persistent context and current state to JSON.
    fn dump(&self) -> ProcedureResult<String> {
        let data = RegionMigrationData {
            state: self.state.as_ref(),
            persistent_ctx: &self.context.persistent_ctx,
        };
        serde_json::to_string(&data).context(ToJsonSnafu)
    }

    fn lock_key(&self) -> LockKey {
        LockKey::new(self.context.persistent_ctx.lock_key())
    }
}
742
743#[cfg(test)]
744mod tests {
745 use std::assert_matches::assert_matches;
746 use std::sync::Arc;
747
748 use common_meta::distributed_time_constants::REGION_LEASE_SECS;
749 use common_meta::instruction::Instruction;
750 use common_meta::key::test_utils::new_test_table_info;
751 use common_meta::rpc::router::{Region, RegionRoute};
752
753 use super::update_metadata::UpdateMetadata;
754 use super::*;
755 use crate::handler::HeartbeatMailbox;
756 use crate::procedure::region_migration::open_candidate_region::OpenCandidateRegion;
757 use crate::procedure::region_migration::test_util::*;
758 use crate::procedure::test_util::{
759 new_downgrade_region_reply, new_flush_region_reply, new_open_region_reply,
760 new_upgrade_region_reply,
761 };
762 use crate::service::mailbox::Channel;
763
    /// Builds the default test context: migrating region 1 of table 1024 from
    /// peer 1 to peer 2.
    fn new_persistent_context() -> PersistentContext {
        test_util::new_persistent_context(1, 2, RegionId::new(1024, 1))
    }
767
    /// The procedure's lock key must contain every key produced by
    /// `PersistentContext::lock_key`.
    #[test]
    fn test_lock_key() {
        let persistent_context = new_persistent_context();
        let expected_keys = persistent_context.lock_key();

        let env = TestingEnv::new();
        let context = env.context_factory();

        let procedure = RegionMigrationProcedure::new(persistent_context, context, None);

        let key = procedure.lock_key();
        let keys = key.keys_to_lock().cloned().collect::<Vec<_>>();

        for key in expected_keys {
            assert!(keys.contains(&key));
        }
    }
785
    /// Pins the exact JSON layout of the dumped procedure data; changing it
    /// would break recovery of already-persisted procedures.
    #[test]
    fn test_data_serialization() {
        let persistent_context = new_persistent_context();

        let env = TestingEnv::new();
        let context = env.context_factory();

        let procedure = RegionMigrationProcedure::new(persistent_context, context, None);

        let serialized = procedure.dump().unwrap();
        let expected = r#"{"persistent_ctx":{"catalog":"greptime","schema":"public","from_peer":{"id":1,"addr":""},"to_peer":{"id":2,"addr":""},"region_id":4398046511105,"timeout":"10s"},"state":{"region_migration_state":"RegionMigrationStart"}}"#;
        assert_eq!(expected, serialized);
    }
799
    /// A payload persisted before the `timeout` field existed must still
    /// deserialize, falling back to `default_timeout`.
    #[test]
    fn test_backward_compatibility() {
        let persistent_ctx = test_util::new_persistent_context(1, 2, RegionId::new(1024, 1));
        // The `timeout` field is deliberately absent from this payload.
        let serialized = r#"{"catalog":"greptime","schema":"public","from_peer":{"id":1,"addr":""},"to_peer":{"id":2,"addr":""},"region_id":4398046511105}"#;
        let deserialized: PersistentContext = serde_json::from_str(serialized).unwrap();

        assert_eq!(persistent_ctx, deserialized);
    }
809
    /// A no-op [`State`] that immediately reports completion.
    #[derive(Debug, Serialize, Deserialize, Default)]
    pub struct MockState;
812
    // Always transitions to a fresh `MockState` and reports `done`.
    #[async_trait::async_trait]
    #[typetag::serde]
    impl State for MockState {
        async fn next(
            &mut self,
            _ctx: &mut Context,
            _procedure_ctx: &ProcedureContext,
        ) -> Result<(Box<dyn State>, Status)> {
            Ok((Box::new(MockState), Status::done()))
        }

        fn as_any(&self) -> &dyn Any {
            self
        }
    }
828
    /// A procedure dumped mid-flight and restored via `from_json` must be
    /// registered in the tracker and continue executing to completion.
    #[tokio::test]
    async fn test_execution_after_deserialized() {
        let env = TestingEnv::new();

        fn new_mock_procedure(env: &TestingEnv) -> RegionMigrationProcedure {
            let persistent_context = new_persistent_context();
            let context_factory = env.context_factory();
            let state = Box::<MockState>::default();
            RegionMigrationProcedure::new_inner(state, persistent_context, context_factory, None)
        }

        let ctx = TestingEnv::procedure_context();
        let mut procedure = new_mock_procedure(&env);
        let mut status = None;
        // `MockState` finishes on every step; three iterations exercise idempotency.
        for _ in 0..3 {
            status = Some(procedure.execute(&ctx).await.unwrap());
        }
        assert!(status.unwrap().is_done());

        let ctx = TestingEnv::procedure_context();
        let mut procedure = new_mock_procedure(&env);

        // Execute once, then round-trip through the persisted representation.
        status = Some(procedure.execute(&ctx).await.unwrap());

        let serialized = procedure.dump().unwrap();

        let context_factory = env.context_factory();
        let tracker = env.tracker();
        let mut procedure =
            RegionMigrationProcedure::from_json(&serialized, context_factory, tracker.clone())
                .unwrap();
        assert!(tracker.contains(procedure.context.persistent_ctx.region_id));

        for _ in 1..3 {
            status = Some(procedure.execute(&ctx).await.unwrap());
        }
        assert!(status.unwrap().is_done());
    }
867
    /// `invalidate_table_cache` is best-effort (succeeds with no receiver) and,
    /// once a receiver is registered, broadcasts an `InvalidateCaches`
    /// instruction for the table.
    #[tokio::test]
    async fn test_broadcast_invalidate_table_cache() {
        let mut env = TestingEnv::new();
        let persistent_context = test_util::new_persistent_context(1, 2, RegionId::new(1024, 1));
        let ctx = env.context_factory().new_context(persistent_context);
        let mailbox_ctx = env.mailbox_context();

        // No receiver registered yet: the broadcast result is ignored.
        ctx.invalidate_table_cache().await.unwrap();

        let (tx, mut rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Frontend(1), tx)
            .await;

        ctx.invalidate_table_cache().await.unwrap();

        let resp = rx.recv().await.unwrap().unwrap();
        let msg = resp.mailbox_message.unwrap();

        let instruction = HeartbeatMailbox::json_instruction(&msg).unwrap();
        assert_eq!(
            instruction,
            Instruction::InvalidateCaches(vec![CacheIdent::TableId(1024)])
        );
    }
895
    /// The happy-path step sequence of a full migration: downgrade metadata,
    /// downgrade the leader, upgrade the candidate, upgrade metadata, close the
    /// downgraded region, then end — twice, to check the end state is idempotent.
    fn procedure_flow_steps(from_peer_id: u64, to_peer_id: u64) -> Vec<Step> {
        vec![
            Step::next(
                "Should be the update metadata for downgrading",
                None,
                Assertion::simple(assert_update_metadata_downgrade, assert_need_persist),
            ),
            Step::next(
                "Should be the downgrade leader region",
                None,
                Assertion::simple(assert_downgrade_leader_region, assert_no_persist),
            ),
            Step::next(
                "Should be the upgrade candidate region",
                Some(mock_datanode_reply(
                    from_peer_id,
                    Arc::new(|id| Ok(new_downgrade_region_reply(id, None, true, None))),
                )),
                Assertion::simple(assert_upgrade_candidate_region, assert_no_persist),
            ),
            Step::next(
                "Should be the update metadata for upgrading",
                Some(mock_datanode_reply(
                    to_peer_id,
                    Arc::new(|id| Ok(new_upgrade_region_reply(id, true, true, None))),
                )),
                Assertion::simple(assert_update_metadata_upgrade, assert_no_persist),
            ),
            Step::next(
                "Should be the close downgraded region",
                None,
                Assertion::simple(assert_close_downgraded_region, assert_no_persist),
            ),
            Step::next(
                "Should be the region migration end",
                None,
                Assertion::simple(assert_region_migration_end, assert_done),
            ),
            Step::next(
                "Should be the region migration end again",
                None,
                Assertion::simple(assert_region_migration_end, assert_done),
            ),
        ]
    }
948
    /// Runs the happy-path migration flow end to end and verifies the resulting
    /// table metadata.
    #[tokio::test]
    async fn test_procedure_flow() {
        common_telemetry::init_default_ut_logging();

        let persistent_context = test_util::new_persistent_context(1, 2, RegionId::new(1024, 1));
        let state = Box::new(RegionMigrationStart);

        let from_peer_id = persistent_context.from_peer.id;
        let to_peer_id = persistent_context.to_peer.id;
        let from_peer = persistent_context.from_peer.clone();
        let to_peer = persistent_context.to_peer.clone();
        let region_id = persistent_context.region_id;
        let table_info = new_test_table_info(1024, vec![1]).into();
        let region_routes = vec![RegionRoute {
            region: Region::new_test(region_id),
            leader_peer: Some(from_peer),
            follower_peers: vec![to_peer],
            ..Default::default()
        }];

        let suite = ProcedureMigrationTestSuite::new(persistent_context, state);
        suite.init_table_metadata(table_info, region_routes).await;

        let steps = procedure_flow_steps(from_peer_id, to_peer_id);
        let timer = Instant::now();

        let runner = ProcedureMigrationSuiteRunner::new(suite)
            .steps(steps)
            .run_once()
            .await;

        // The whole flow must finish well within the region lease, otherwise
        // the migration could race with lease expiry.
        assert!(timer.elapsed().as_secs() < REGION_LEASE_SECS / 2);

        runner.suite.verify_table_metadata().await;
    }
987
    /// Replays the flow from the last persisted state (twice) to verify the
    /// steps after `UpdateMetadata::Downgrade` are idempotent.
    #[tokio::test]
    async fn test_procedure_flow_idempotent() {
        common_telemetry::init_default_ut_logging();

        let persistent_context = test_util::new_persistent_context(1, 2, RegionId::new(1024, 1));
        let state = Box::new(RegionMigrationStart);

        let from_peer_id = persistent_context.from_peer.id;
        let to_peer_id = persistent_context.to_peer.id;
        let from_peer = persistent_context.from_peer.clone();
        let to_peer = persistent_context.to_peer.clone();
        let region_id = persistent_context.region_id;
        let table_info = new_test_table_info(1024, vec![1]).into();
        let region_routes = vec![RegionRoute {
            region: Region::new_test(region_id),
            leader_peer: Some(from_peer),
            follower_peers: vec![to_peer],
            ..Default::default()
        }];

        let suite = ProcedureMigrationTestSuite::new(persistent_context, state);
        suite.init_table_metadata(table_info, region_routes).await;

        let steps = procedure_flow_steps(from_peer_id, to_peer_id);
        let setup_to_latest_persisted_state = Step::setup(
            "Sets state to UpdateMetadata::Downgrade",
            merge_before_test_fn(vec![
                setup_state(Arc::new(|| Box::new(UpdateMetadata::Downgrade))),
                Arc::new(reset_volatile_ctx),
            ]),
        );

        // Full flow, then two replays from the last persisted state.
        let steps = [
            steps.clone(),
            vec![setup_to_latest_persisted_state.clone()],
            steps.clone()[1..].to_vec(),
            vec![setup_to_latest_persisted_state],
            steps.clone()[1..].to_vec(),
        ]
        .concat();
        let timer = Instant::now();

        let runner = ProcedureMigrationSuiteRunner::new(suite)
            .steps(steps.clone())
            .run_once()
            .await;

        // The whole flow must finish well within the region lease.
        assert!(timer.elapsed().as_secs() < REGION_LEASE_SECS / 2);

        runner.suite.verify_table_metadata().await;
    }
1042
1043 #[tokio::test]
1044 async fn test_procedure_flow_open_candidate_region_retryable_error() {
1045 common_telemetry::init_default_ut_logging();
1046
1047 let persistent_context = test_util::new_persistent_context(1, 2, RegionId::new(1024, 1));
1048 let state = Box::new(RegionMigrationStart);
1049
1050 let to_peer_id = persistent_context.to_peer.id;
1052 let from_peer = persistent_context.from_peer.clone();
1053 let region_id = persistent_context.region_id;
1054 let table_info = new_test_table_info(1024, vec![1]).into();
1055 let region_routes = vec![RegionRoute {
1056 region: Region::new_test(region_id),
1057 leader_peer: Some(from_peer),
1058 follower_peers: vec![],
1059 ..Default::default()
1060 }];
1061
1062 let suite = ProcedureMigrationTestSuite::new(persistent_context, state);
1063 suite.init_table_metadata(table_info, region_routes).await;
1064
1065 let steps = vec![
1066 Step::next(
1068 "Should be the open candidate region",
1069 None,
1070 Assertion::simple(assert_open_candidate_region, assert_need_persist),
1071 ),
1072 Step::next(
1074 "Should be throwing a non-retry error",
1075 Some(mock_datanode_reply(
1076 to_peer_id,
1077 Arc::new(|id| error::MailboxTimeoutSnafu { id }.fail()),
1078 )),
1079 Assertion::error(|error| assert!(error.is_retryable())),
1080 ),
1081 Step::next(
1083 "Should be throwing a non-retry error again",
1084 Some(mock_datanode_reply(
1085 to_peer_id,
1086 Arc::new(|id| error::MailboxTimeoutSnafu { id }.fail()),
1087 )),
1088 Assertion::error(|error| assert!(error.is_retryable())),
1089 ),
1090 ];
1091
1092 let setup_to_latest_persisted_state = Step::setup(
1093 "Sets state to UpdateMetadata::Downgrade",
1094 merge_before_test_fn(vec![
1095 setup_state(Arc::new(|| Box::new(OpenCandidateRegion))),
1096 Arc::new(reset_volatile_ctx),
1097 ]),
1098 );
1099
1100 let steps = [
1101 steps.clone(),
1102 vec![setup_to_latest_persisted_state.clone()],
1104 steps.clone()[1..].to_vec(),
1105 vec![setup_to_latest_persisted_state],
1106 steps.clone()[1..].to_vec(),
1107 ]
1108 .concat();
1109
1110 let runner = ProcedureMigrationSuiteRunner::new(suite)
1112 .steps(steps.clone())
1113 .run_once()
1114 .await;
1115
1116 let table_routes_version = runner
1117 .env()
1118 .table_metadata_manager()
1119 .table_route_manager()
1120 .table_route_storage()
1121 .get(region_id.table_id())
1122 .await
1123 .unwrap()
1124 .unwrap()
1125 .version();
1126 assert_eq!(table_routes_version.unwrap(), 0);
1128 }
1129
1130 #[tokio::test]
1131 async fn test_procedure_flow_upgrade_candidate_with_retry_and_failed() {
1132 common_telemetry::init_default_ut_logging();
1133
1134 let persistent_context = test_util::new_persistent_context(1, 2, RegionId::new(1024, 1));
1135 let state = Box::new(RegionMigrationStart);
1136
1137 let from_peer_id = persistent_context.from_peer.id;
1139 let to_peer_id = persistent_context.to_peer.id;
1140 let from_peer = persistent_context.from_peer.clone();
1141 let to_peer = persistent_context.to_peer.clone();
1142 let region_id = persistent_context.region_id;
1143 let table_info = new_test_table_info(1024, vec![1]).into();
1144 let region_routes = vec![RegionRoute {
1145 region: Region::new_test(region_id),
1146 leader_peer: Some(from_peer),
1147 follower_peers: vec![to_peer],
1148 ..Default::default()
1149 }];
1150
1151 let suite = ProcedureMigrationTestSuite::new(persistent_context, state);
1152 suite.init_table_metadata(table_info, region_routes).await;
1153
1154 let steps = vec![
1155 Step::next(
1157 "Should be the update metadata for downgrading",
1158 None,
1159 Assertion::simple(assert_update_metadata_downgrade, assert_need_persist),
1160 ),
1161 Step::next(
1163 "Should be the downgrade leader region",
1164 None,
1165 Assertion::simple(assert_downgrade_leader_region, assert_no_persist),
1166 ),
1167 Step::next(
1169 "Should be the upgrade candidate region",
1170 Some(mock_datanode_reply(
1171 from_peer_id,
1172 Arc::new(|id| Ok(new_downgrade_region_reply(id, None, true, None))),
1173 )),
1174 Assertion::simple(assert_upgrade_candidate_region, assert_no_persist),
1175 ),
1176 Step::next(
1178 "Should be the rollback metadata",
1179 Some(mock_datanode_reply(
1180 to_peer_id,
1181 Arc::new(|id| error::MailboxTimeoutSnafu { id }.fail()),
1182 )),
1183 Assertion::simple(assert_update_metadata_rollback, assert_no_persist),
1184 ),
1185 Step::next(
1187 "Should be the region migration abort",
1188 None,
1189 Assertion::simple(assert_region_migration_abort, assert_no_persist),
1190 ),
1191 Step::next(
1193 "Should throw an error",
1194 None,
1195 Assertion::error(|error| {
1196 assert!(!error.is_retryable());
1197 assert_matches!(error, error::Error::MigrationAbort { .. });
1198 }),
1199 ),
1200 ];
1201
1202 let setup_to_latest_persisted_state = Step::setup(
1203 "Sets state to UpdateMetadata::Downgrade",
1204 merge_before_test_fn(vec![
1205 setup_state(Arc::new(|| Box::new(UpdateMetadata::Downgrade))),
1206 Arc::new(reset_volatile_ctx),
1207 ]),
1208 );
1209
1210 let steps = [
1211 steps.clone(),
1212 vec![setup_to_latest_persisted_state.clone()],
1213 steps.clone()[1..].to_vec(),
1214 vec![setup_to_latest_persisted_state],
1215 steps.clone()[1..].to_vec(),
1216 ]
1217 .concat();
1218
1219 ProcedureMigrationSuiteRunner::new(suite)
1221 .steps(steps.clone())
1222 .run_once()
1223 .await;
1224 }
1225
1226 #[tokio::test]
1227 async fn test_procedure_flow_upgrade_candidate_with_retry() {
1228 common_telemetry::init_default_ut_logging();
1229
1230 let persistent_context = test_util::new_persistent_context(1, 2, RegionId::new(1024, 1));
1231 let state = Box::new(RegionMigrationStart);
1232
1233 let to_peer_id = persistent_context.to_peer.id;
1235 let from_peer_id = persistent_context.from_peer.id;
1236 let from_peer = persistent_context.from_peer.clone();
1237 let region_id = persistent_context.region_id;
1238 let table_info = new_test_table_info(1024, vec![1]).into();
1239 let region_routes = vec![RegionRoute {
1240 region: Region::new_test(region_id),
1241 leader_peer: Some(from_peer),
1242 follower_peers: vec![],
1243 ..Default::default()
1244 }];
1245
1246 let suite = ProcedureMigrationTestSuite::new(persistent_context, state);
1247 suite.init_table_metadata(table_info, region_routes).await;
1248
1249 let steps = vec![
1250 Step::next(
1252 "Should be the open candidate region",
1253 None,
1254 Assertion::simple(assert_open_candidate_region, assert_need_persist),
1255 ),
1256 Step::next(
1258 "Should be throwing a retryable error",
1259 Some(mock_datanode_reply(
1260 to_peer_id,
1261 Arc::new(|id| Ok(new_open_region_reply(id, false, None))),
1262 )),
1263 Assertion::error(|error| assert!(error.is_retryable(), "err: {error:?}")),
1264 ),
1265 Step::next(
1267 "Should be the update metadata for downgrading",
1268 Some(mock_datanode_reply(
1269 to_peer_id,
1270 Arc::new(|id| Ok(new_open_region_reply(id, true, None))),
1271 )),
1272 Assertion::simple(assert_flush_leader_region, assert_no_persist),
1273 ),
1274 Step::next(
1276 "Should be the flush leader region",
1277 Some(mock_datanode_reply(
1278 from_peer_id,
1279 Arc::new(|id| Ok(new_flush_region_reply(id, true, None))),
1280 )),
1281 Assertion::simple(assert_update_metadata_downgrade, assert_no_persist),
1282 ),
1283 Step::next(
1285 "Should be the downgrade leader region",
1286 None,
1287 Assertion::simple(assert_downgrade_leader_region, assert_no_persist),
1288 ),
1289 Step::next(
1291 "Should be the upgrade candidate region",
1292 Some(mock_datanode_reply(
1293 from_peer_id,
1294 merge_mailbox_messages(vec![
1295 Arc::new(|id| error::MailboxTimeoutSnafu { id }.fail()),
1296 Arc::new(|id| Ok(new_downgrade_region_reply(id, None, true, None))),
1297 ]),
1298 )),
1299 Assertion::simple(assert_upgrade_candidate_region, assert_no_persist),
1300 ),
1301 Step::next(
1303 "Should be the update metadata for upgrading",
1304 Some(mock_datanode_reply(
1305 to_peer_id,
1306 merge_mailbox_messages(vec![
1307 Arc::new(|id| error::MailboxTimeoutSnafu { id }.fail()),
1308 Arc::new(|id| Ok(new_upgrade_region_reply(id, true, true, None))),
1309 ]),
1310 )),
1311 Assertion::simple(assert_update_metadata_upgrade, assert_no_persist),
1312 ),
1313 Step::next(
1315 "Should be the close downgraded region",
1316 None,
1317 Assertion::simple(assert_close_downgraded_region, assert_no_persist),
1318 ),
1319 Step::next(
1321 "Should be the region migration end",
1322 None,
1323 Assertion::simple(assert_region_migration_end, assert_done),
1324 ),
1325 Step::next(
1327 "Should be the region migration end again",
1328 None,
1329 Assertion::simple(assert_region_migration_end, assert_done),
1330 ),
1331 Step::setup(
1333 "Sets state to RegionMigrationStart",
1334 merge_before_test_fn(vec![
1335 setup_state(Arc::new(|| Box::new(RegionMigrationStart))),
1336 Arc::new(reset_volatile_ctx),
1337 ]),
1338 ),
1339 Step::next(
1343 "Should be the region migration end(has been migrated)",
1344 None,
1345 Assertion::simple(assert_region_migration_end, assert_done),
1346 ),
1347 ];
1348
1349 let steps = [steps.clone()].concat();
1350 let timer = Instant::now();
1351
1352 let runner = ProcedureMigrationSuiteRunner::new(suite)
1354 .steps(steps.clone())
1355 .run_once()
1356 .await;
1357
1358 assert!(timer.elapsed().as_secs() < REGION_LEASE_SECS);
1360 runner.suite.verify_table_metadata().await;
1361 }
1362}