1pub(crate) mod close_downgraded_region;
16pub(crate) mod downgrade_leader_region;
17pub(crate) mod flush_leader_region;
18pub(crate) mod manager;
19pub(crate) mod migration_abort;
20pub(crate) mod migration_end;
21pub(crate) mod migration_start;
22pub(crate) mod open_candidate_region;
23#[cfg(test)]
24pub mod test_util;
25pub(crate) mod update_metadata;
26pub(crate) mod upgrade_candidate_region;
27pub(crate) mod utils;
28
29use std::any::Any;
30use std::collections::{HashMap, HashSet};
31use std::fmt::{Debug, Display};
32use std::sync::Arc;
33use std::time::Duration;
34
35use common_error::ext::BoxedError;
36use common_event_recorder::{Event, Eventable};
37use common_meta::cache_invalidator::CacheInvalidatorRef;
38use common_meta::ddl::RegionFailureDetectorControllerRef;
39use common_meta::instruction::CacheIdent;
40use common_meta::key::datanode_table::{DatanodeTableKey, DatanodeTableValue};
41use common_meta::key::table_route::TableRouteValue;
42use common_meta::key::topic_name::TopicNameKey;
43use common_meta::key::topic_region::{ReplayCheckpoint, TopicRegionKey};
44use common_meta::key::{DeserializedValueWithBytes, TableMetadataManagerRef};
45use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef};
46use common_meta::lock_key::{CatalogLock, RegionLock, SchemaLock, TableLock};
47use common_meta::peer::Peer;
48use common_meta::region_keeper::{MemoryRegionKeeperRef, OperatingRegionGuard};
49use common_procedure::error::{
50 Error as ProcedureError, FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu,
51};
52use common_procedure::{
53 Context as ProcedureContext, LockKey, Procedure, Status, StringKey, UserMetadata,
54};
55use common_telemetry::{debug, error, info};
56use manager::RegionMigrationProcedureGuard;
57pub use manager::{
58 RegionMigrationManagerRef, RegionMigrationProcedureTask, RegionMigrationProcedureTracker,
59 RegionMigrationTriggerReason,
60};
61use serde::{Deserialize, Deserializer, Serialize};
62use snafu::{OptionExt, ResultExt};
63use store_api::storage::{RegionId, TableId};
64use tokio::time::Instant;
65
66use self::migration_start::RegionMigrationStart;
67use crate::error::{self, Result};
68use crate::events::region_migration_event::RegionMigrationEvent;
69use crate::metrics::{
70 METRIC_META_REGION_MIGRATION_ERROR, METRIC_META_REGION_MIGRATION_EXECUTE,
71 METRIC_META_REGION_MIGRATION_STAGE_ELAPSED,
72};
73use crate::service::mailbox::MailboxRef;
74
/// Default timeout of a region migration: 120 seconds.
///
/// NOTE(review): this differs from the 10 second fallback that `default_timeout` applies when a
/// persisted context carries no `timeout` field — confirm the difference is intended.
pub const DEFAULT_REGION_MIGRATION_TIMEOUT: Duration = Duration::from_secs(120);
77
/// Serde helper that accepts either one bare value or a sequence of values.
///
/// The enum is `untagged`, so the variants are tried in declaration order: `Single` first,
/// then `Multiple`.
#[derive(Debug, Deserialize)]
#[serde(untagged)]
enum SingleOrMultiple<T> {
    /// Exactly one value.
    Single(T),
    /// A list of values.
    Multiple(Vec<T>),
}
84
85fn single_or_multiple_from<'de, D, T>(deserializer: D) -> std::result::Result<Vec<T>, D::Error>
86where
87 D: Deserializer<'de>,
88 T: Deserialize<'de>,
89{
90 let helper = SingleOrMultiple::<T>::deserialize(deserializer)?;
91 Ok(match helper {
92 SingleOrMultiple::Single(x) => vec![x],
93 SingleOrMultiple::Multiple(xs) => xs,
94 })
95}
96
/// The serializable part of the region migration context.
///
/// It is written out by `RegionMigrationProcedure::dump` and read back by
/// `RegionMigrationProcedure::from_json`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct PersistentContext {
    /// Legacy single catalog name. Defaults to `None` and is not serialized when `None`.
    #[deprecated(note = "use `catalog_and_schema` instead")]
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub(crate) catalog: Option<String>,
    /// Legacy single schema name. Defaults to `None` and is not serialized when `None`.
    #[deprecated(note = "use `catalog_and_schema` instead")]
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub(crate) schema: Option<String>,
    /// `(catalog, schema)` pairs; each pair contributes a catalog and a schema read lock in
    /// `lock_key`. Defaults to empty and is not serialized when empty.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub(crate) catalog_and_schema: Vec<(String, String)>,
    /// The peer the regions are migrated from.
    pub(crate) from_peer: Peer,
    /// The peer the regions are migrated to.
    pub(crate) to_peer: Peer,
    /// The regions to migrate. The field also deserializes from the alias `region_id`, and from
    /// a single bare value as well as a list.
    #[serde(deserialize_with = "single_or_multiple_from", alias = "region_id")]
    pub(crate) region_ids: Vec<RegionId>,
    /// Overall time budget, (de)serialized through `humantime_serde`; falls back to
    /// `default_timeout()` when the field is absent.
    #[serde(with = "humantime_serde", default = "default_timeout")]
    pub(crate) timeout: Duration,
    /// Why the migration was triggered; falls back to the type's `Default` when absent.
    #[serde(default)]
    pub(crate) trigger_reason: RegionMigrationTriggerReason,
}
129
130impl PersistentContext {
131 pub fn new(
132 catalog_and_schema: Vec<(String, String)>,
133 from_peer: Peer,
134 to_peer: Peer,
135 region_ids: Vec<RegionId>,
136 timeout: Duration,
137 trigger_reason: RegionMigrationTriggerReason,
138 ) -> Self {
139 #[allow(deprecated)]
140 Self {
141 catalog: None,
142 schema: None,
143 catalog_and_schema,
144 from_peer,
145 to_peer,
146 region_ids,
147 timeout,
148 trigger_reason,
149 }
150 }
151}
152
/// Serde fallback for `PersistentContext::timeout` when the serialized form has no `timeout`
/// field: 10 seconds.
fn default_timeout() -> Duration {
    const FALLBACK_TIMEOUT_SECS: u64 = 10;
    Duration::from_secs(FALLBACK_TIMEOUT_SECS)
}
156
157impl PersistentContext {
158 pub fn lock_key(&self) -> Vec<StringKey> {
159 let mut lock_keys =
160 Vec::with_capacity(self.region_ids.len() + 2 + self.catalog_and_schema.len() * 2);
161 #[allow(deprecated)]
162 if let (Some(catalog), Some(schema)) = (&self.catalog, &self.schema) {
163 lock_keys.push(CatalogLock::Read(catalog).into());
164 lock_keys.push(SchemaLock::read(catalog, schema).into());
165 }
166 for (catalog, schema) in self.catalog_and_schema.iter() {
167 lock_keys.push(CatalogLock::Read(catalog).into());
168 lock_keys.push(SchemaLock::read(catalog, schema).into());
169 }
170
171 let mut region_ids = self.region_ids.clone();
173 region_ids.sort_unstable();
174 for region_id in region_ids {
175 lock_keys.push(RegionLock::Write(region_id).into());
176 }
177 lock_keys
178 }
179
180 pub fn region_table_ids(&self) -> Vec<TableId> {
184 self.region_ids
185 .iter()
186 .map(|region_id| region_id.table_id())
187 .collect::<HashSet<_>>()
188 .into_iter()
189 .collect()
190 }
191
192 pub fn table_regions(&self) -> HashMap<TableId, Vec<RegionId>> {
196 let mut table_regions = HashMap::new();
197 for region_id in &self.region_ids {
198 table_regions
199 .entry(region_id.table_id())
200 .or_insert_with(Vec::new)
201 .push(*region_id);
202 }
203 table_regions
204 }
205}
206
207impl Eventable for PersistentContext {
208 fn to_event(&self) -> Option<Box<dyn Event>> {
209 Some(Box::new(RegionMigrationEvent::from_persistent_ctx(self)))
210 }
211}
212
/// Accumulated elapsed times of a region migration.
///
/// The per-stage values are reported to `METRIC_META_REGION_MIGRATION_STAGE_ELAPSED` when the
/// value is dropped.
#[derive(Debug, Clone, Default)]
pub struct Metrics {
    /// Time spent on operations so far; subtracted from the timeout by
    /// `Context::next_operation_timeout`.
    operations_elapsed: Duration,
    /// Accumulated time of the "flush leader region" stage.
    flush_leader_region_elapsed: Duration,
    /// Accumulated time of the "downgrade leader region" stage.
    downgrade_leader_region_elapsed: Duration,
    /// Accumulated time of the "open candidate region" stage.
    open_candidate_region_elapsed: Duration,
    /// Accumulated time of the "upgrade candidate region" stage.
    upgrade_candidate_region_elapsed: Duration,
}
227
228impl Display for Metrics {
229 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
230 let total = self.flush_leader_region_elapsed
231 + self.downgrade_leader_region_elapsed
232 + self.open_candidate_region_elapsed
233 + self.upgrade_candidate_region_elapsed;
234 write!(
235 f,
236 "total: {:?}, flush_leader_region_elapsed: {:?}, downgrade_leader_region_elapsed: {:?}, open_candidate_region_elapsed: {:?}, upgrade_candidate_region_elapsed: {:?}",
237 total,
238 self.flush_leader_region_elapsed,
239 self.downgrade_leader_region_elapsed,
240 self.open_candidate_region_elapsed,
241 self.upgrade_candidate_region_elapsed
242 )
243 }
244}
245
246impl Metrics {
247 pub fn update_operations_elapsed(&mut self, elapsed: Duration) {
249 self.operations_elapsed += elapsed;
250 }
251
252 pub fn update_flush_leader_region_elapsed(&mut self, elapsed: Duration) {
254 self.flush_leader_region_elapsed += elapsed;
255 }
256
257 pub fn update_downgrade_leader_region_elapsed(&mut self, elapsed: Duration) {
259 self.downgrade_leader_region_elapsed += elapsed;
260 }
261
262 pub fn update_open_candidate_region_elapsed(&mut self, elapsed: Duration) {
264 self.open_candidate_region_elapsed += elapsed;
265 }
266
267 pub fn update_upgrade_candidate_region_elapsed(&mut self, elapsed: Duration) {
269 self.upgrade_candidate_region_elapsed += elapsed;
270 }
271}
272
273impl Drop for Metrics {
274 fn drop(&mut self) {
275 let total = self.flush_leader_region_elapsed
276 + self.downgrade_leader_region_elapsed
277 + self.open_candidate_region_elapsed
278 + self.upgrade_candidate_region_elapsed;
279 METRIC_META_REGION_MIGRATION_STAGE_ELAPSED
280 .with_label_values(&["total"])
281 .observe(total.as_secs_f64());
282
283 if !self.flush_leader_region_elapsed.is_zero() {
284 METRIC_META_REGION_MIGRATION_STAGE_ELAPSED
285 .with_label_values(&["flush_leader_region"])
286 .observe(self.flush_leader_region_elapsed.as_secs_f64());
287 }
288
289 if !self.downgrade_leader_region_elapsed.is_zero() {
290 METRIC_META_REGION_MIGRATION_STAGE_ELAPSED
291 .with_label_values(&["downgrade_leader_region"])
292 .observe(self.downgrade_leader_region_elapsed.as_secs_f64());
293 }
294
295 if !self.open_candidate_region_elapsed.is_zero() {
296 METRIC_META_REGION_MIGRATION_STAGE_ELAPSED
297 .with_label_values(&["open_candidate_region"])
298 .observe(self.open_candidate_region_elapsed.as_secs_f64());
299 }
300
301 if !self.upgrade_candidate_region_elapsed.is_zero() {
302 METRIC_META_REGION_MIGRATION_STAGE_ELAPSED
303 .with_label_values(&["upgrade_candidate_region"])
304 .observe(self.upgrade_candidate_region_elapsed.as_secs_f64());
305 }
306 }
307}
308
/// The in-memory part of the region migration context.
///
/// It derives neither `Serialize` nor `Deserialize`; a procedure restored by
/// `RegionMigrationProcedure::from_json` gets whatever the context factory supplies.
#[derive(Debug, Clone, Default)]
pub struct VolatileContext {
    /// Guards of the regions being opened; cleared when `execute` hits a non-retryable error.
    opening_region_guards: Vec<OperatingRegionGuard>,
    /// Deadline of the leader region lease; `None` until set.
    leader_region_lease_deadline: Option<Instant>,
    /// Cache filled on first use by `Context::get_from_peer_datanode_table_values`.
    from_peer_datanode_table_values: Option<HashMap<TableId, DatanodeTableValue>>,
    /// Last entry id recorded per leader region.
    leader_region_last_entry_ids: HashMap<RegionId, u64>,
    /// Last metadata entry id recorded per leader region.
    leader_region_metadata_last_entry_ids: HashMap<RegionId, u64>,
    /// Elapsed-time metrics of the migration.
    metrics: Metrics,
}
334
335impl VolatileContext {
336 pub fn set_leader_region_lease_deadline(&mut self, lease_timeout: Duration) {
338 if self.leader_region_lease_deadline.is_none() {
339 self.leader_region_lease_deadline = Some(Instant::now() + lease_timeout);
340 }
341 }
342
343 pub fn reset_leader_region_lease_deadline(&mut self) {
345 self.leader_region_lease_deadline = None;
346 }
347
348 pub fn set_last_entry_id(&mut self, region_id: RegionId, last_entry_id: u64) {
350 self.leader_region_last_entry_ids
351 .insert(region_id, last_entry_id);
352 }
353
354 pub fn set_metadata_last_entry_id(&mut self, region_id: RegionId, last_entry_id: u64) {
356 self.leader_region_metadata_last_entry_ids
357 .insert(region_id, last_entry_id);
358 }
359}
360
/// Factory that turns a [`PersistentContext`] into a full [`Context`].
pub trait ContextFactory {
    /// Consumes the factory and builds a [`Context`] around `persistent_ctx`.
    fn new_context(self, persistent_ctx: PersistentContext) -> Context;
}
365
/// Default [`ContextFactory`] implementation.
///
/// It holds every component a [`Context`] needs apart from the [`PersistentContext`].
#[derive(Clone)]
pub struct DefaultContextFactory {
    /// Volatile context handed over to the created [`Context`].
    volatile_ctx: VolatileContext,
    /// Becomes `Context::in_memory`.
    in_memory_key: ResettableKvBackendRef,
    /// Becomes `Context::table_metadata_manager`.
    table_metadata_manager: TableMetadataManagerRef,
    /// Becomes `Context::opening_region_keeper`.
    opening_region_keeper: MemoryRegionKeeperRef,
    /// Becomes `Context::region_failure_detector_controller`.
    region_failure_detector_controller: RegionFailureDetectorControllerRef,
    /// Becomes `Context::mailbox`.
    mailbox: MailboxRef,
    /// Becomes `Context::server_addr`.
    server_addr: String,
    /// Becomes `Context::cache_invalidator`.
    cache_invalidator: CacheInvalidatorRef,
}
378
379impl DefaultContextFactory {
380 pub fn new(
382 in_memory_key: ResettableKvBackendRef,
383 table_metadata_manager: TableMetadataManagerRef,
384 opening_region_keeper: MemoryRegionKeeperRef,
385 region_failure_detector_controller: RegionFailureDetectorControllerRef,
386 mailbox: MailboxRef,
387 server_addr: String,
388 cache_invalidator: CacheInvalidatorRef,
389 ) -> Self {
390 Self {
391 volatile_ctx: VolatileContext::default(),
392 in_memory_key,
393 table_metadata_manager,
394 opening_region_keeper,
395 region_failure_detector_controller,
396 mailbox,
397 server_addr,
398 cache_invalidator,
399 }
400 }
401}
402
403impl ContextFactory for DefaultContextFactory {
404 fn new_context(self, persistent_ctx: PersistentContext) -> Context {
405 Context {
406 persistent_ctx,
407 volatile_ctx: self.volatile_ctx,
408 in_memory: self.in_memory_key,
409 table_metadata_manager: self.table_metadata_manager,
410 opening_region_keeper: self.opening_region_keeper,
411 region_failure_detector_controller: self.region_failure_detector_controller,
412 mailbox: self.mailbox,
413 server_addr: self.server_addr,
414 cache_invalidator: self.cache_invalidator,
415 }
416 }
417}
418
/// The context of a region migration procedure.
pub struct Context {
    /// Serializable state of the migration.
    persistent_ctx: PersistentContext,
    /// In-memory state of the migration.
    volatile_ctx: VolatileContext,
    /// In-memory key-value backend.
    in_memory: KvBackendRef,
    /// Used to read table routes, datanode tables and topic metadata, and to update the leader
    /// region status during rollback.
    table_metadata_manager: TableMetadataManagerRef,
    /// Keeper of the regions being opened.
    opening_region_keeper: MemoryRegionKeeperRef,
    /// Used to register, reset and deregister region failure detectors.
    region_failure_detector_controller: RegionFailureDetectorControllerRef,
    /// Mailbox service handle.
    mailbox: MailboxRef,
    /// Server address returned by `server_addr()`.
    server_addr: String,
    /// Used by `invalidate_table_cache`.
    cache_invalidator: CacheInvalidatorRef,
}
431
432impl Context {
433 pub fn next_operation_timeout(&self) -> Option<Duration> {
435 self.persistent_ctx
436 .timeout
437 .checked_sub(self.volatile_ctx.metrics.operations_elapsed)
438 }
439
440 pub fn update_operations_elapsed(&mut self, instant: Instant) {
442 self.volatile_ctx
443 .metrics
444 .update_operations_elapsed(instant.elapsed());
445 }
446
447 pub fn update_flush_leader_region_elapsed(&mut self, instant: Instant) {
449 self.volatile_ctx
450 .metrics
451 .update_flush_leader_region_elapsed(instant.elapsed());
452 }
453
454 pub fn update_downgrade_leader_region_elapsed(&mut self, instant: Instant) {
456 self.volatile_ctx
457 .metrics
458 .update_downgrade_leader_region_elapsed(instant.elapsed());
459 }
460
461 pub fn update_open_candidate_region_elapsed(&mut self, instant: Instant) {
463 self.volatile_ctx
464 .metrics
465 .update_open_candidate_region_elapsed(instant.elapsed());
466 }
467
468 pub fn update_upgrade_candidate_region_elapsed(&mut self, instant: Instant) {
470 self.volatile_ctx
471 .metrics
472 .update_upgrade_candidate_region_elapsed(instant.elapsed());
473 }
474
475 pub fn server_addr(&self) -> &str {
477 &self.server_addr
478 }
479
480 pub fn region_table_ids(&self) -> Vec<TableId> {
482 self.persistent_ctx
483 .region_ids
484 .iter()
485 .map(|region_id| region_id.table_id())
486 .collect::<HashSet<_>>()
487 .into_iter()
488 .collect()
489 }
490
491 pub async fn get_table_route_values(
497 &self,
498 ) -> Result<HashMap<TableId, DeserializedValueWithBytes<TableRouteValue>>> {
499 let table_ids = self.persistent_ctx.region_table_ids();
500 let table_routes = self
501 .table_metadata_manager
502 .table_route_manager()
503 .table_route_storage()
504 .batch_get_with_raw_bytes(&table_ids)
505 .await
506 .context(error::TableMetadataManagerSnafu)
507 .map_err(BoxedError::new)
508 .with_context(|_| error::RetryLaterWithSourceSnafu {
509 reason: format!("Failed to get table routes: {table_ids:?}"),
510 })?;
511 let table_routes = table_ids
512 .into_iter()
513 .zip(table_routes)
514 .filter_map(|(table_id, table_route)| {
515 table_route.map(|table_route| (table_id, table_route))
516 })
517 .collect::<HashMap<_, _>>();
518 Ok(table_routes)
519 }
520
521 pub async fn get_table_route_value(
527 &self,
528 table_id: TableId,
529 ) -> Result<DeserializedValueWithBytes<TableRouteValue>> {
530 let table_route_value = self
531 .table_metadata_manager
532 .table_route_manager()
533 .table_route_storage()
534 .get_with_raw_bytes(table_id)
535 .await
536 .context(error::TableMetadataManagerSnafu)
537 .map_err(BoxedError::new)
538 .with_context(|_| error::RetryLaterWithSourceSnafu {
539 reason: format!("Failed to get table routes: {table_id:}"),
540 })?
541 .context(error::TableRouteNotFoundSnafu { table_id })?;
542 Ok(table_route_value)
543 }
544
545 pub async fn get_from_peer_datanode_table_values(
551 &mut self,
552 ) -> Result<&HashMap<TableId, DatanodeTableValue>> {
553 let from_peer_datanode_table_values =
554 &mut self.volatile_ctx.from_peer_datanode_table_values;
555 if from_peer_datanode_table_values.is_none() {
556 let table_ids = self.persistent_ctx.region_table_ids();
557 let datanode_table_keys = table_ids
558 .iter()
559 .map(|table_id| DatanodeTableKey {
560 datanode_id: self.persistent_ctx.from_peer.id,
561 table_id: *table_id,
562 })
563 .collect::<Vec<_>>();
564 let datanode_table_values = self
565 .table_metadata_manager
566 .datanode_table_manager()
567 .batch_get(&datanode_table_keys)
568 .await
569 .context(error::TableMetadataManagerSnafu)
570 .map_err(BoxedError::new)
571 .with_context(|_| error::RetryLaterWithSourceSnafu {
572 reason: format!("Failed to get DatanodeTable: {table_ids:?}"),
573 })?
574 .into_iter()
575 .map(|(k, v)| (k.table_id, v))
576 .collect();
577 *from_peer_datanode_table_values = Some(datanode_table_values);
578 }
579 Ok(from_peer_datanode_table_values.as_ref().unwrap())
580 }
581
582 pub async fn get_from_peer_datanode_table_value(
588 &self,
589 table_id: TableId,
590 ) -> Result<DatanodeTableValue> {
591 let datanode_table_value = self
592 .table_metadata_manager
593 .datanode_table_manager()
594 .get(&DatanodeTableKey {
595 datanode_id: self.persistent_ctx.from_peer.id,
596 table_id,
597 })
598 .await
599 .context(error::TableMetadataManagerSnafu)
600 .map_err(BoxedError::new)
601 .with_context(|_| error::RetryLaterWithSourceSnafu {
602 reason: format!("Failed to get DatanodeTable: {table_id}"),
603 })?
604 .context(error::DatanodeTableNotFoundSnafu {
605 table_id,
606 datanode_id: self.persistent_ctx.from_peer.id,
607 })?;
608 Ok(datanode_table_value)
609 }
610
611 pub async fn register_failure_detectors(&self) {
616 let datanode_id = self.persistent_ctx.from_peer.id;
617 let region_ids = &self.persistent_ctx.region_ids;
618 let detecting_regions = region_ids
619 .iter()
620 .map(|region_id| (datanode_id, *region_id))
621 .collect::<Vec<_>>();
622 self.region_failure_detector_controller
623 .register_failure_detectors(detecting_regions)
624 .await;
625 info!(
626 "Registered failure detectors after migration failures for datanode {}, regions {:?}",
627 datanode_id, region_ids
628 );
629 }
630
631 pub async fn reset_failure_detectors_for_candidate_regions(&self) {
633 let datanode_id = self.persistent_ctx.to_peer.id;
634 let region_ids = &self.persistent_ctx.region_ids;
635 let detecting_regions = region_ids
636 .iter()
637 .map(|region_id| (datanode_id, *region_id))
638 .collect::<Vec<_>>();
639 self.region_failure_detector_controller
640 .reset_failure_detectors(detecting_regions)
641 .await;
642 info!(
643 "Reset failure detectors after migration success for datanode {}, regions {:?}",
644 datanode_id, region_ids
645 );
646 }
647
648 pub async fn deregister_failure_detectors(&self) {
653 let datanode_id = self.persistent_ctx.from_peer.id;
654 let region_ids = &self.persistent_ctx.region_ids;
655 let detecting_regions = region_ids
656 .iter()
657 .map(|region_id| (datanode_id, *region_id))
658 .collect::<Vec<_>>();
659
660 self.region_failure_detector_controller
661 .deregister_failure_detectors(detecting_regions)
662 .await;
663 }
664
665 pub async fn deregister_failure_detectors_for_candidate_regions(&self) {
670 let to_peer_id = self.persistent_ctx.to_peer.id;
671 let region_ids = &self.persistent_ctx.region_ids;
672 let detecting_regions = region_ids
673 .iter()
674 .map(|region_id| (to_peer_id, *region_id))
675 .collect::<Vec<_>>();
676
677 self.region_failure_detector_controller
678 .deregister_failure_detectors(detecting_regions)
679 .await;
680 }
681
682 pub async fn get_replay_checkpoints_with_topic_pruned_entry_ids(
684 &self,
685 region_topics: &[(RegionId, String, bool)],
686 ) -> Result<HashMap<RegionId, ReplayCheckpoint>> {
687 let topic_region_keys = region_topics
688 .iter()
689 .map(|(region_id, topic, _)| TopicRegionKey::new(*region_id, topic))
690 .collect::<Vec<_>>();
691 let topic_region_values = self
692 .table_metadata_manager
693 .topic_region_manager()
694 .batch_get(topic_region_keys)
695 .await
696 .context(error::TableMetadataManagerSnafu)?;
697
698 let topic_name_keys = region_topics
699 .iter()
700 .map(|(_, topic, _)| topic.as_str())
701 .collect::<HashSet<_>>()
702 .into_iter()
703 .map(TopicNameKey::new)
704 .collect::<Vec<_>>();
705 let topic_name_values = self
706 .table_metadata_manager
707 .topic_name_manager()
708 .batch_get(topic_name_keys)
709 .await
710 .context(error::TableMetadataManagerSnafu)?;
711 debug!(
712 "Fetched topic region values: {:?}, topic name values: {:?}",
713 topic_region_values, topic_name_values
714 );
715
716 let replay_checkpoints = region_topics
717 .iter()
718 .filter_map(|(region_id, topic, is_metric_engine)| {
719 let checkpoint = topic_region_values
720 .get(region_id)
721 .and_then(|value| value.checkpoint);
722 let pruned_entry_id = topic_name_values
723 .get(topic)
724 .map(|value| value.pruned_entry_id);
725
726 ReplayCheckpoint::merge_with_topic_pruned_entry_id(
727 checkpoint,
728 pruned_entry_id,
729 *is_metric_engine,
730 )
731 .map(|checkpoint| (*region_id, checkpoint))
732 })
733 .collect::<HashMap<_, _>>();
734
735 Ok(replay_checkpoints)
736 }
737
738 pub async fn invalidate_table_cache(&self) -> Result<()> {
740 let table_ids = self.region_table_ids();
741 let mut cache_idents = Vec::with_capacity(table_ids.len());
742 for table_id in &table_ids {
743 cache_idents.push(CacheIdent::TableId(*table_id));
744 }
745 let ctx = common_meta::cache_invalidator::Context::default();
747 let _ = self.cache_invalidator.invalidate(&ctx, &cache_idents).await;
748 Ok(())
749 }
750
751 pub fn persistent_ctx(&self) -> PersistentContext {
753 self.persistent_ctx.clone()
754 }
755}
756
757#[async_trait::async_trait]
758#[typetag::serde(tag = "region_migration_state")]
759pub(crate) trait State: Sync + Send + Debug {
760 fn name(&self) -> &'static str {
761 let type_name = std::any::type_name::<Self>();
762 type_name.split("::").last().unwrap_or(type_name)
764 }
765
766 async fn next(
768 &mut self,
769 ctx: &mut Context,
770 procedure_ctx: &ProcedureContext,
771 ) -> Result<(Box<dyn State>, Status)>;
772
773 fn as_any(&self) -> &dyn Any;
775}
776
/// Owned form of the persisted procedure data; the target of deserialization in
/// `RegionMigrationProcedure::from_json`.
#[derive(Debug, Serialize, Deserialize)]
pub struct RegionMigrationDataOwned {
    /// The persisted context.
    persistent_ctx: PersistentContext,
    /// The persisted state.
    state: Box<dyn State>,
}
783
/// Borrowed form of the persisted procedure data; serialized by
/// `RegionMigrationProcedure::dump`.
#[derive(Debug, Serialize)]
pub struct RegionMigrationData<'a> {
    /// The context to persist.
    persistent_ctx: &'a PersistentContext,
    /// The state to persist.
    state: &'a dyn State,
}
790
/// The region migration procedure: the current [`State`] plus the [`Context`] it operates on.
pub(crate) struct RegionMigrationProcedure {
    /// Current state; replaced by the next state after each successful `execute`.
    state: Box<dyn State>,
    /// Context shared by all states.
    context: Context,
    /// Guards kept for as long as the procedure value exists; never read.
    _guards: Vec<RegionMigrationProcedureGuard>,
}
796
797impl RegionMigrationProcedure {
    /// Type name reported through `Procedure::type_name`.
    const TYPE_NAME: &'static str = "metasrv-procedure::RegionMigration";
799
800 pub fn new(
801 persistent_context: PersistentContext,
802 context_factory: impl ContextFactory,
803 guards: Vec<RegionMigrationProcedureGuard>,
804 ) -> Self {
805 let state = Box::new(RegionMigrationStart {});
806 Self::new_inner(state, persistent_context, context_factory, guards)
807 }
808
809 fn new_inner(
810 state: Box<dyn State>,
811 persistent_context: PersistentContext,
812 context_factory: impl ContextFactory,
813 guards: Vec<RegionMigrationProcedureGuard>,
814 ) -> Self {
815 Self {
816 state,
817 context: context_factory.new_context(persistent_context),
818 _guards: guards,
819 }
820 }
821
822 fn from_json(
823 json: &str,
824 context_factory: impl ContextFactory,
825 tracker: RegionMigrationProcedureTracker,
826 ) -> ProcedureResult<Self> {
827 let RegionMigrationDataOwned {
828 persistent_ctx,
829 state,
830 } = serde_json::from_str(json).context(FromJsonSnafu)?;
831 let guards = persistent_ctx
832 .region_ids
833 .iter()
834 .flat_map(|region_id| {
835 tracker.insert_running_procedure(&RegionMigrationProcedureTask {
836 region_id: *region_id,
837 from_peer: persistent_ctx.from_peer.clone(),
838 to_peer: persistent_ctx.to_peer.clone(),
839 timeout: persistent_ctx.timeout,
840 trigger_reason: persistent_ctx.trigger_reason,
841 })
842 })
843 .collect::<Vec<_>>();
844
845 let context = context_factory.new_context(persistent_ctx);
846
847 Ok(Self {
848 state,
849 context,
850 _guards: guards,
851 })
852 }
853
854 async fn rollback_inner(&mut self, procedure_ctx: &ProcedureContext) -> Result<()> {
855 let _timer = METRIC_META_REGION_MIGRATION_EXECUTE
856 .with_label_values(&["rollback"])
857 .start_timer();
858 let ctx = &self.context;
859 let table_regions = ctx.persistent_ctx.table_regions();
860 for (table_id, regions) in table_regions {
861 let table_lock = TableLock::Write(table_id).into();
862 let _guard = procedure_ctx.provider.acquire_lock(&table_lock).await;
863 let table_route = ctx.get_table_route_value(table_id).await?;
864 let region_routes = table_route.region_routes().unwrap();
865 let downgraded = region_routes
866 .iter()
867 .filter(|route| regions.contains(&route.region.id))
868 .any(|route| route.is_leader_downgrading());
869 if downgraded {
870 info!(
871 "Rollbacking downgraded region leader table route, table: {table_id}, regions: {regions:?}"
872 );
873 let table_metadata_manager = &ctx.table_metadata_manager;
874 table_metadata_manager
875 .update_leader_region_status(table_id, &table_route, |route| {
876 if regions.contains(&route.region.id) {
877 Some(None)
878 } else {
879 None
880 }
881 })
882 .await
883 .context(error::TableMetadataManagerSnafu)
884 .map_err(BoxedError::new)
885 .with_context(|_| error::RetryLaterWithSourceSnafu {
886 reason: format!("Failed to update the table route during the rollback downgraded leader region: {regions:?}"),
887 })?;
888 }
889 }
890 self.context
891 .deregister_failure_detectors_for_candidate_regions()
892 .await;
893 self.context.register_failure_detectors().await;
894
895 Ok(())
896 }
897}
898
899#[async_trait::async_trait]
900impl Procedure for RegionMigrationProcedure {
901 fn type_name(&self) -> &str {
902 Self::TYPE_NAME
903 }
904
905 async fn rollback(&mut self, ctx: &ProcedureContext) -> ProcedureResult<()> {
906 self.rollback_inner(ctx)
907 .await
908 .map_err(ProcedureError::external)
909 }
910
    /// Rollback is supported by this procedure.
    fn rollback_supported(&self) -> bool {
        true
    }
914
915 #[tracing::instrument(skip_all, fields(
916 state = %self.state.name(),
917 region_count = self.context.persistent_ctx.region_ids.len(),
918 from_peer = self.context.persistent_ctx.from_peer.id,
919 to_peer = self.context.persistent_ctx.to_peer.id,
920 ))]
921 async fn execute(&mut self, ctx: &ProcedureContext) -> ProcedureResult<Status> {
922 let state = &mut self.state;
923
924 let name = state.name();
925 let _timer = METRIC_META_REGION_MIGRATION_EXECUTE
926 .with_label_values(&[name])
927 .start_timer();
928 match state.next(&mut self.context, ctx).await {
929 Ok((next, status)) => {
930 *state = next;
931 Ok(status)
932 }
933 Err(e) => {
934 if e.is_retryable() {
935 METRIC_META_REGION_MIGRATION_ERROR
936 .with_label_values(&[name, "retryable"])
937 .inc();
938 Err(ProcedureError::retry_later(e))
939 } else {
940 self.context.volatile_ctx.opening_region_guards.clear();
942 self.context
943 .deregister_failure_detectors_for_candidate_regions()
944 .await;
945 error!(
946 e;
947 "Region migration procedure failed, regions: {:?}, from_peer: {}, to_peer: {}, {}",
948 self.context.persistent_ctx.region_ids,
949 self.context.persistent_ctx.from_peer,
950 self.context.persistent_ctx.to_peer,
951 self.context.volatile_ctx.metrics,
952 );
953 METRIC_META_REGION_MIGRATION_ERROR
954 .with_label_values(&[name, "external"])
955 .inc();
956 Err(ProcedureError::external(e))
957 }
958 }
959 }
960 }
961
962 fn dump(&self) -> ProcedureResult<String> {
963 let data = RegionMigrationData {
964 state: self.state.as_ref(),
965 persistent_ctx: &self.context.persistent_ctx,
966 };
967 serde_json::to_string(&data).context(ToJsonSnafu)
968 }
969
970 fn lock_key(&self) -> LockKey {
971 LockKey::new(self.context.persistent_ctx.lock_key())
972 }
973
974 fn user_metadata(&self) -> Option<UserMetadata> {
975 Some(UserMetadata::new(Arc::new(self.context.persistent_ctx())))
976 }
977}
978
979#[cfg(test)]
980mod tests {
981 use std::assert_matches;
982 use std::sync::Arc;
983
984 use common_meta::distributed_time_constants::default_distributed_time_constants;
985 use common_meta::instruction::Instruction;
986 use common_meta::key::test_utils::new_test_table_info;
987 use common_meta::rpc::router::{Region, RegionRoute};
988
989 use super::*;
990 use crate::handler::HeartbeatMailbox;
991 use crate::procedure::region_migration::open_candidate_region::OpenCandidateRegion;
992 use crate::procedure::region_migration::test_util::*;
993 use crate::procedure::test_util::{
994 new_downgrade_region_reply, new_flush_region_reply_for_region, new_open_region_reply,
995 new_upgrade_region_reply,
996 };
997 use crate::service::mailbox::Channel;
998
999 fn new_persistent_context() -> PersistentContext {
1000 test_util::new_persistent_context(1, 2, RegionId::new(1024, 1))
1001 }
1002
1003 #[test]
1004 fn test_lock_key() {
1005 let persistent_context = new_persistent_context();
1006 let expected_keys = persistent_context.lock_key();
1007
1008 let env = TestingEnv::new();
1009 let context = env.context_factory();
1010
1011 let procedure = RegionMigrationProcedure::new(persistent_context, context, vec![]);
1012
1013 let key = procedure.lock_key();
1014 let keys = key.keys_to_lock().cloned().collect::<Vec<_>>();
1015
1016 for key in expected_keys {
1017 assert!(keys.contains(&key));
1018 }
1019 }
1020
1021 #[test]
1022 fn test_data_serialization() {
1023 let persistent_context = new_persistent_context();
1024
1025 let env = TestingEnv::new();
1026 let context = env.context_factory();
1027
1028 let procedure = RegionMigrationProcedure::new(persistent_context, context, vec![]);
1029
1030 let serialized = procedure.dump().unwrap();
1031 let expected = r#"{"persistent_ctx":{"catalog_and_schema":[["greptime","public"]],"from_peer":{"id":1,"addr":""},"to_peer":{"id":2,"addr":""},"region_ids":[4398046511105],"timeout":"10s","trigger_reason":"Unknown"},"state":{"region_migration_state":"RegionMigrationStart"}}"#;
1032 assert_eq!(expected, serialized);
1033 }
1034
1035 #[test]
1036 fn test_backward_compatibility() {
1037 let persistent_ctx = PersistentContext {
1038 #[allow(deprecated)]
1039 catalog: Some("greptime".into()),
1040 #[allow(deprecated)]
1041 schema: Some("public".into()),
1042 catalog_and_schema: vec![],
1043 from_peer: Peer::empty(1),
1044 to_peer: Peer::empty(2),
1045 region_ids: vec![RegionId::new(1024, 1)],
1046 timeout: Duration::from_secs(10),
1047 trigger_reason: RegionMigrationTriggerReason::default(),
1048 };
1049 let serialized = r#"{"catalog":"greptime","schema":"public","from_peer":{"id":1,"addr":""},"to_peer":{"id":2,"addr":""},"region_id":4398046511105}"#;
1051 let deserialized: PersistentContext = serde_json::from_str(serialized).unwrap();
1052
1053 assert_eq!(persistent_ctx, deserialized);
1054 }
1055
    /// Test-only state whose `next` always yields another `MockState` with a done status.
    #[derive(Debug, Serialize, Deserialize, Default)]
    pub struct MockState;
1058
1059 #[async_trait::async_trait]
1060 #[typetag::serde]
1061 impl State for MockState {
1062 async fn next(
1063 &mut self,
1064 _ctx: &mut Context,
1065 _procedure_ctx: &ProcedureContext,
1066 ) -> Result<(Box<dyn State>, Status)> {
1067 Ok((Box::new(MockState), Status::done()))
1068 }
1069
1070 fn as_any(&self) -> &dyn Any {
1071 self
1072 }
1073 }
1074
    /// A procedure restored from its dump must behave like the original one: three executions
    /// in a row end in a done status, and so do one execution, a dump/restore round trip and
    /// two further executions.
    #[tokio::test]
    async fn test_execution_after_deserialized() {
        let env = TestingEnv::new();

        // Builds a procedure that starts in `MockState`.
        fn new_mock_procedure(env: &TestingEnv) -> RegionMigrationProcedure {
            let persistent_context = new_persistent_context();
            let context_factory = env.context_factory();
            let state = Box::<MockState>::default();
            RegionMigrationProcedure::new_inner(state, persistent_context, context_factory, vec![])
        }

        // Baseline: run the procedure three times without any serialization in between.
        let ctx = TestingEnv::procedure_context();
        let mut procedure = new_mock_procedure(&env);
        let mut status = None;
        for _ in 0..3 {
            status = Some(procedure.execute(&ctx).await.unwrap());
        }
        assert!(status.unwrap().is_done());

        let ctx = TestingEnv::procedure_context();
        let mut procedure = new_mock_procedure(&env);

        // Execute once, then dump the procedure.
        status = Some(procedure.execute(&ctx).await.unwrap());

        let serialized = procedure.dump().unwrap();

        // Restore it; restoring must insert every region into the tracker.
        let context_factory = env.context_factory();
        let tracker = env.tracker();
        let mut procedure =
            RegionMigrationProcedure::from_json(&serialized, context_factory, tracker.clone())
                .unwrap();
        for region_id in &procedure.context.persistent_ctx.region_ids {
            assert!(tracker.contains(*region_id));
        }

        // The remaining two executions run on the restored procedure.
        for _ in 1..3 {
            status = Some(procedure.execute(&ctx).await.unwrap());
        }
        assert!(status.unwrap().is_done());
    }
1115
    /// `invalidate_table_cache` must return `Ok` both before and after a frontend receiver is
    /// registered, and the registered receiver must get an `InvalidateCaches` instruction for
    /// table 1024.
    #[tokio::test]
    async fn test_broadcast_invalidate_table_cache() {
        let mut env = TestingEnv::new();
        let persistent_context = test_util::new_persistent_context(1, 2, RegionId::new(1024, 1));
        let ctx = env.context_factory().new_context(persistent_context);
        let mailbox_ctx = env.mailbox_context();

        // No receiver has been registered yet; the call must still return `Ok`.
        ctx.invalidate_table_cache().await.unwrap();

        let (tx, mut rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Frontend(1), tx)
            .await;

        // With a frontend receiver registered, the instruction must arrive on `rx`.
        ctx.invalidate_table_cache().await.unwrap();

        let resp = rx.recv().await.unwrap().unwrap();
        let msg = resp.mailbox_message.unwrap();

        let instruction = HeartbeatMailbox::json_instruction(&msg).unwrap();
        assert_eq!(
            instruction,
            Instruction::InvalidateCaches(vec![CacheIdent::TableId(1024)])
        );
    }
1143
1144 fn procedure_flow_steps(from_peer_id: u64, to_peer_id: u64) -> Vec<Step> {
1145 vec![
1146 Step::next(
1148 "Should be the open candidate region",
1149 None,
1150 Assertion::simple(assert_open_candidate_region, assert_need_persist),
1151 ),
1152 Step::next(
1154 "Should be the flush leader region",
1155 Some(mock_datanode_reply(
1156 to_peer_id,
1157 Arc::new(|id| Ok(new_open_region_reply(id, true, None))),
1158 )),
1159 Assertion::simple(assert_flush_leader_region, assert_no_persist),
1160 ),
1161 Step::next(
1163 "Should be the flush leader region",
1164 Some(mock_datanode_reply(
1165 from_peer_id,
1166 Arc::new(move |id| {
1167 Ok(new_flush_region_reply_for_region(
1168 id,
1169 RegionId::new(1024, 1),
1170 true,
1171 None,
1172 ))
1173 }),
1174 )),
1175 Assertion::simple(assert_update_metadata_downgrade, assert_no_persist),
1176 ),
1177 Step::next(
1179 "Should be the downgrade leader region",
1180 None,
1181 Assertion::simple(assert_downgrade_leader_region, assert_no_persist),
1182 ),
1183 Step::next(
1185 "Should be the upgrade candidate region",
1186 Some(mock_datanode_reply(
1187 from_peer_id,
1188 Arc::new(|id| Ok(new_downgrade_region_reply(id, None, true, None))),
1189 )),
1190 Assertion::simple(assert_upgrade_candidate_region, assert_no_persist),
1191 ),
1192 Step::next(
1194 "Should be the update metadata for upgrading",
1195 Some(mock_datanode_reply(
1196 to_peer_id,
1197 Arc::new(|id| Ok(new_upgrade_region_reply(id, true, true, None))),
1198 )),
1199 Assertion::simple(assert_update_metadata_upgrade, assert_no_persist),
1200 ),
1201 Step::next(
1203 "Should be the close downgraded region",
1204 None,
1205 Assertion::simple(assert_close_downgraded_region, assert_no_persist),
1206 ),
1207 Step::next(
1209 "Should be the region migration end",
1210 None,
1211 Assertion::simple(assert_region_migration_end, assert_done),
1212 ),
1213 Step::next(
1215 "Should be the region migration end again",
1216 None,
1217 Assertion::simple(assert_region_migration_end, assert_done),
1218 ),
1219 ]
1220 }
1221
1222 #[tokio::test]
1223 async fn test_procedure_flow() {
1224 common_telemetry::init_default_ut_logging();
1225
1226 let persistent_context = test_util::new_persistent_context(1, 2, RegionId::new(1024, 1));
1227 let state = Box::new(RegionMigrationStart);
1228
1229 let from_peer_id = persistent_context.from_peer.id;
1231 let to_peer_id = persistent_context.to_peer.id;
1232 let from_peer = persistent_context.from_peer.clone();
1233 let to_peer = persistent_context.to_peer.clone();
1234 let region_id = persistent_context.region_ids[0];
1235 let table_info = new_test_table_info(1024);
1236 let region_routes = vec![RegionRoute {
1237 region: Region::new_test(region_id),
1238 leader_peer: Some(from_peer),
1239 follower_peers: vec![to_peer],
1240 ..Default::default()
1241 }];
1242
1243 let suite = ProcedureMigrationTestSuite::new(persistent_context, state);
1244 suite.init_table_metadata(table_info, region_routes).await;
1245
1246 let steps = procedure_flow_steps(from_peer_id, to_peer_id);
1247 let timer = Instant::now();
1248
1249 let runner = ProcedureMigrationSuiteRunner::new(suite)
1251 .steps(steps)
1252 .run_once()
1253 .await;
1254
1255 let region_lease = default_distributed_time_constants().region_lease.as_secs();
1256
1257 assert!(timer.elapsed().as_secs() < region_lease / 2);
1259
1260 runner.suite.verify_table_metadata().await;
1261 }
1262
1263 #[tokio::test]
1264 async fn test_procedure_flow_open_candidate_region_retryable_error() {
1265 common_telemetry::init_default_ut_logging();
1266
1267 let persistent_context = test_util::new_persistent_context(1, 2, RegionId::new(1024, 1));
1268 let state = Box::new(RegionMigrationStart);
1269
1270 let to_peer_id = persistent_context.to_peer.id;
1272 let from_peer = persistent_context.from_peer.clone();
1273 let region_id = persistent_context.region_ids[0];
1274 let table_info = new_test_table_info(1024);
1275 let region_routes = vec![RegionRoute {
1276 region: Region::new_test(region_id),
1277 leader_peer: Some(from_peer),
1278 follower_peers: vec![],
1279 ..Default::default()
1280 }];
1281
1282 let suite = ProcedureMigrationTestSuite::new(persistent_context, state);
1283 suite.init_table_metadata(table_info, region_routes).await;
1284
1285 let steps = vec![
1286 Step::next(
1288 "Should be the open candidate region",
1289 None,
1290 Assertion::simple(assert_open_candidate_region, assert_need_persist),
1291 ),
1292 Step::next(
1294 "Should be throwing a non-retry error",
1295 Some(mock_datanode_reply(
1296 to_peer_id,
1297 Arc::new(|id| error::MailboxTimeoutSnafu { id }.fail()),
1298 )),
1299 Assertion::error(|error| assert!(error.is_retryable())),
1300 ),
1301 Step::next(
1303 "Should be throwing a non-retry error again",
1304 Some(mock_datanode_reply(
1305 to_peer_id,
1306 Arc::new(|id| error::MailboxTimeoutSnafu { id }.fail()),
1307 )),
1308 Assertion::error(|error| assert!(error.is_retryable())),
1309 ),
1310 ];
1311
1312 let setup_to_latest_persisted_state = Step::setup(
1313 "Sets state to UpdateMetadata::Downgrade",
1314 merge_before_test_fn(vec![
1315 setup_state(Arc::new(|| Box::new(OpenCandidateRegion))),
1316 Arc::new(reset_volatile_ctx),
1317 ]),
1318 );
1319
1320 let steps = [
1321 steps.clone(),
1322 vec![setup_to_latest_persisted_state.clone()],
1324 steps.clone()[1..].to_vec(),
1325 vec![setup_to_latest_persisted_state],
1326 steps.clone()[1..].to_vec(),
1327 ]
1328 .concat();
1329
1330 let runner = ProcedureMigrationSuiteRunner::new(suite)
1332 .steps(steps.clone())
1333 .run_once()
1334 .await;
1335
1336 let table_routes_version = runner
1337 .env()
1338 .table_metadata_manager()
1339 .table_route_manager()
1340 .table_route_storage()
1341 .get(region_id.table_id())
1342 .await
1343 .unwrap()
1344 .unwrap()
1345 .version();
1346 assert_eq!(table_routes_version.unwrap(), 0);
1348 }
1349
1350 #[tokio::test]
1351 async fn test_procedure_flow_upgrade_candidate_with_retry_and_failed() {
1352 common_telemetry::init_default_ut_logging();
1353
1354 let persistent_context = test_util::new_persistent_context(1, 2, RegionId::new(1024, 1));
1355 let state = Box::new(RegionMigrationStart);
1356
1357 let from_peer_id = persistent_context.from_peer.id;
1359 let to_peer_id = persistent_context.to_peer.id;
1360 let from_peer = persistent_context.from_peer.clone();
1361 let region_id = persistent_context.region_ids[0];
1362 let table_info = new_test_table_info(1024);
1363 let region_routes = vec![RegionRoute {
1364 region: Region::new_test(region_id),
1365 leader_peer: Some(from_peer),
1366 follower_peers: vec![],
1367 ..Default::default()
1368 }];
1369
1370 let suite = ProcedureMigrationTestSuite::new(persistent_context, state);
1371 suite.init_table_metadata(table_info, region_routes).await;
1372
1373 let steps = vec![
1374 Step::next(
1376 "Should be the open candidate region",
1377 None,
1378 Assertion::simple(assert_open_candidate_region, assert_need_persist),
1379 ),
1380 Step::next(
1382 "Should be the flush leader region",
1383 Some(mock_datanode_reply(
1384 to_peer_id,
1385 Arc::new(|id| Ok(new_open_region_reply(id, true, None))),
1386 )),
1387 Assertion::simple(assert_flush_leader_region, assert_no_persist),
1388 ),
1389 Step::next(
1391 "Should be the flush leader region",
1392 Some(mock_datanode_reply(
1393 from_peer_id,
1394 Arc::new(move |id| {
1395 Ok(new_flush_region_reply_for_region(
1396 id,
1397 RegionId::new(1024, 1),
1398 true,
1399 None,
1400 ))
1401 }),
1402 )),
1403 Assertion::simple(assert_update_metadata_downgrade, assert_no_persist),
1404 ),
1405 Step::next(
1407 "Should be the downgrade leader region",
1408 None,
1409 Assertion::simple(assert_downgrade_leader_region, assert_no_persist),
1410 ),
1411 Step::next(
1413 "Should be the upgrade candidate region",
1414 Some(mock_datanode_reply(
1415 from_peer_id,
1416 Arc::new(|id| Ok(new_downgrade_region_reply(id, None, true, None))),
1417 )),
1418 Assertion::simple(assert_upgrade_candidate_region, assert_no_persist),
1419 ),
1420 Step::next(
1422 "Should be the rollback metadata",
1423 Some(mock_datanode_reply(
1424 to_peer_id,
1425 Arc::new(|id| error::MailboxTimeoutSnafu { id }.fail()),
1426 )),
1427 Assertion::simple(assert_update_metadata_rollback, assert_no_persist),
1428 ),
1429 Step::next(
1431 "Should be the region migration abort",
1432 None,
1433 Assertion::simple(assert_region_migration_abort, assert_no_persist),
1434 ),
1435 Step::next(
1437 "Should throw an error",
1438 None,
1439 Assertion::error(|error| {
1440 assert!(!error.is_retryable());
1441 assert_matches!(error, error::Error::MigrationAbort { .. });
1442 }),
1443 ),
1444 ];
1445
1446 let setup_to_latest_persisted_state = Step::setup(
1447 "Sets state to OpenCandidateRegion",
1448 merge_before_test_fn(vec![
1449 setup_state(Arc::new(|| Box::new(OpenCandidateRegion))),
1450 Arc::new(reset_volatile_ctx),
1451 ]),
1452 );
1453
1454 let steps = [
1455 steps.clone(),
1456 vec![setup_to_latest_persisted_state.clone()],
1457 steps.clone()[1..].to_vec(),
1458 vec![setup_to_latest_persisted_state],
1459 steps.clone()[1..].to_vec(),
1460 ]
1461 .concat();
1462
1463 ProcedureMigrationSuiteRunner::new(suite)
1465 .steps(steps.clone())
1466 .run_once()
1467 .await;
1468 }
1469
1470 #[tokio::test]
1471 async fn test_procedure_flow_upgrade_candidate_with_retry() {
1472 common_telemetry::init_default_ut_logging();
1473
1474 let persistent_context = test_util::new_persistent_context(1, 2, RegionId::new(1024, 1));
1475 let state = Box::new(RegionMigrationStart);
1476
1477 let to_peer_id = persistent_context.to_peer.id;
1479 let from_peer_id = persistent_context.from_peer.id;
1480 let from_peer = persistent_context.from_peer.clone();
1481 let region_id = persistent_context.region_ids[0];
1482 let table_info = new_test_table_info(1024);
1483 let region_routes = vec![RegionRoute {
1484 region: Region::new_test(region_id),
1485 leader_peer: Some(from_peer),
1486 follower_peers: vec![],
1487 ..Default::default()
1488 }];
1489
1490 let suite = ProcedureMigrationTestSuite::new(persistent_context, state);
1491 suite.init_table_metadata(table_info, region_routes).await;
1492
1493 let steps = vec![
1494 Step::next(
1496 "Should be the open candidate region",
1497 None,
1498 Assertion::simple(assert_open_candidate_region, assert_need_persist),
1499 ),
1500 Step::next(
1502 "Should be throwing a retryable error",
1503 Some(mock_datanode_reply(
1504 to_peer_id,
1505 Arc::new(|id| {
1506 Ok(new_open_region_reply(
1507 id,
1508 false,
1509 Some("mock retryable open region error".to_string()),
1510 ))
1511 }),
1512 )),
1513 Assertion::error(|error| assert!(error.is_retryable(), "err: {error:?}")),
1514 ),
1515 Step::next(
1517 "Should be the update metadata for downgrading",
1518 Some(mock_datanode_reply(
1519 to_peer_id,
1520 Arc::new(|id| Ok(new_open_region_reply(id, true, None))),
1521 )),
1522 Assertion::simple(assert_flush_leader_region, assert_no_persist),
1523 ),
1524 Step::next(
1526 "Should be the flush leader region",
1527 Some(mock_datanode_reply(
1528 from_peer_id,
1529 Arc::new(move |id| {
1530 Ok(new_flush_region_reply_for_region(id, region_id, true, None))
1531 }),
1532 )),
1533 Assertion::simple(assert_update_metadata_downgrade, assert_no_persist),
1534 ),
1535 Step::next(
1537 "Should be the downgrade leader region",
1538 None,
1539 Assertion::simple(assert_downgrade_leader_region, assert_no_persist),
1540 ),
1541 Step::next(
1543 "Should be the upgrade candidate region",
1544 Some(mock_datanode_reply(
1545 from_peer_id,
1546 merge_mailbox_messages(vec![
1547 Arc::new(|id| error::MailboxTimeoutSnafu { id }.fail()),
1548 Arc::new(|id| Ok(new_downgrade_region_reply(id, None, true, None))),
1549 ]),
1550 )),
1551 Assertion::simple(assert_upgrade_candidate_region, assert_no_persist),
1552 ),
1553 Step::next(
1555 "Should be the update metadata for upgrading",
1556 Some(mock_datanode_reply(
1557 to_peer_id,
1558 merge_mailbox_messages(vec![
1559 Arc::new(|id| error::MailboxTimeoutSnafu { id }.fail()),
1560 Arc::new(|id| Ok(new_upgrade_region_reply(id, true, true, None))),
1561 ]),
1562 )),
1563 Assertion::simple(assert_update_metadata_upgrade, assert_no_persist),
1564 ),
1565 Step::next(
1567 "Should be the close downgraded region",
1568 None,
1569 Assertion::simple(assert_close_downgraded_region, assert_no_persist),
1570 ),
1571 Step::next(
1573 "Should be the region migration end",
1574 None,
1575 Assertion::simple(assert_region_migration_end, assert_done),
1576 ),
1577 Step::next(
1579 "Should be the region migration end again",
1580 None,
1581 Assertion::simple(assert_region_migration_end, assert_done),
1582 ),
1583 Step::setup(
1585 "Sets state to RegionMigrationStart",
1586 merge_before_test_fn(vec![
1587 setup_state(Arc::new(|| Box::new(RegionMigrationStart))),
1588 Arc::new(reset_volatile_ctx),
1589 ]),
1590 ),
1591 Step::next(
1595 "Should be the region migration end(has been migrated)",
1596 None,
1597 Assertion::simple(assert_region_migration_end, assert_done),
1598 ),
1599 ];
1600
1601 let steps = [steps.clone()].concat();
1602 let timer = Instant::now();
1603
1604 let runner = ProcedureMigrationSuiteRunner::new(suite)
1606 .steps(steps.clone())
1607 .run_once()
1608 .await;
1609
1610 let region_lease = default_distributed_time_constants().region_lease.as_secs();
1611 assert!(timer.elapsed().as_secs() < region_lease);
1613 runner.suite.verify_table_metadata().await;
1614 }
1615}