1pub(crate) mod close_downgraded_region;
16pub(crate) mod downgrade_leader_region;
17pub(crate) mod flush_leader_region;
18pub(crate) mod manager;
19pub(crate) mod migration_abort;
20pub(crate) mod migration_end;
21pub(crate) mod migration_start;
22pub(crate) mod open_candidate_region;
23#[cfg(test)]
24pub mod test_util;
25pub(crate) mod update_metadata;
26pub(crate) mod upgrade_candidate_region;
27pub(crate) mod utils;
28
29use std::any::Any;
30use std::collections::{HashMap, HashSet};
31use std::fmt::{Debug, Display};
32use std::sync::Arc;
33use std::time::Duration;
34
35use common_error::ext::BoxedError;
36use common_event_recorder::{Event, Eventable};
37use common_meta::cache_invalidator::CacheInvalidatorRef;
38use common_meta::ddl::RegionFailureDetectorControllerRef;
39use common_meta::instruction::CacheIdent;
40use common_meta::key::datanode_table::{DatanodeTableKey, DatanodeTableValue};
41use common_meta::key::table_route::TableRouteValue;
42use common_meta::key::topic_region::{ReplayCheckpoint, TopicRegionKey};
43use common_meta::key::{DeserializedValueWithBytes, TableMetadataManagerRef};
44use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef};
45use common_meta::lock_key::{CatalogLock, RegionLock, SchemaLock, TableLock};
46use common_meta::peer::Peer;
47use common_meta::region_keeper::{MemoryRegionKeeperRef, OperatingRegionGuard};
48use common_procedure::error::{
49 Error as ProcedureError, FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu,
50};
51use common_procedure::{
52 Context as ProcedureContext, LockKey, Procedure, Status, StringKey, UserMetadata,
53};
54use common_telemetry::{error, info};
55use manager::RegionMigrationProcedureGuard;
56pub use manager::{
57 RegionMigrationManagerRef, RegionMigrationProcedureTask, RegionMigrationProcedureTracker,
58 RegionMigrationTriggerReason,
59};
60use serde::{Deserialize, Deserializer, Serialize};
61use snafu::{OptionExt, ResultExt};
62use store_api::storage::{RegionId, TableId};
63use tokio::time::Instant;
64
65use self::migration_start::RegionMigrationStart;
66use crate::error::{self, Result};
67use crate::events::region_migration_event::RegionMigrationEvent;
68use crate::metrics::{
69 METRIC_META_REGION_MIGRATION_ERROR, METRIC_META_REGION_MIGRATION_EXECUTE,
70 METRIC_META_REGION_MIGRATION_STAGE_ELAPSED,
71};
72use crate::service::mailbox::MailboxRef;
73
/// Default overall timeout for a region migration task.
///
/// NOTE(review): this (120s) is distinct from `default_timeout()` (10s), which is
/// only the serde fallback for `PersistentContext::timeout`; confirm callers use
/// the intended one.
pub const DEFAULT_REGION_MIGRATION_TIMEOUT: Duration = Duration::from_secs(120);
76
/// Deserialization helper that accepts either a bare value or a list of values
/// for the same field (see [`single_or_multiple_from`]).
#[derive(Debug, Deserialize)]
#[serde(untagged)]
enum SingleOrMultiple<T> {
    /// A single bare value (e.g. the legacy scalar `region_id` form).
    Single(T),
    /// A list of values (e.g. the current `region_ids` form).
    Multiple(Vec<T>),
}
83
84fn single_or_multiple_from<'de, D, T>(deserializer: D) -> std::result::Result<Vec<T>, D::Error>
85where
86 D: Deserializer<'de>,
87 T: Deserialize<'de>,
88{
89 let helper = SingleOrMultiple::<T>::deserialize(deserializer)?;
90 Ok(match helper {
91 SingleOrMultiple::Single(x) => vec![x],
92 SingleOrMultiple::Multiple(xs) => xs,
93 })
94}
95
/// Persistent state of a region migration procedure.
///
/// This is serialized into the procedure store, so every change here must stay
/// backward compatible with previously persisted payloads.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct PersistentContext {
    /// Legacy single-catalog field, kept only so payloads written by older
    /// versions still deserialize. Superseded by `catalog_and_schema`.
    #[deprecated(note = "use `catalog_and_schema` instead")]
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub(crate) catalog: Option<String>,
    /// Legacy single-schema field; see `catalog` above.
    #[deprecated(note = "use `catalog_and_schema` instead")]
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub(crate) schema: Option<String>,
    /// `(catalog, schema)` pairs covered by the migrating regions.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub(crate) catalog_and_schema: Vec<(String, String)>,
    /// The datanode currently holding the leader regions.
    pub(crate) from_peer: Peer,
    /// The datanode that will receive the regions.
    pub(crate) to_peer: Peer,
    /// Regions to migrate. Accepts both the current list form and the legacy
    /// scalar `region_id` field via `single_or_multiple_from`.
    #[serde(deserialize_with = "single_or_multiple_from", alias = "region_id")]
    pub(crate) region_ids: Vec<RegionId>,
    /// Per-operation time budget; defaults to `default_timeout()` (10s) when
    /// missing from a persisted payload.
    #[serde(with = "humantime_serde", default = "default_timeout")]
    pub(crate) timeout: Duration,
    /// What triggered this migration (manual, failover, ...).
    #[serde(default)]
    pub(crate) trigger_reason: RegionMigrationTriggerReason,
}
128
129impl PersistentContext {
130 pub fn new(
131 catalog_and_schema: Vec<(String, String)>,
132 from_peer: Peer,
133 to_peer: Peer,
134 region_ids: Vec<RegionId>,
135 timeout: Duration,
136 trigger_reason: RegionMigrationTriggerReason,
137 ) -> Self {
138 #[allow(deprecated)]
139 Self {
140 catalog: None,
141 schema: None,
142 catalog_and_schema,
143 from_peer,
144 to_peer,
145 region_ids,
146 timeout,
147 trigger_reason,
148 }
149 }
150}
151
/// Serde fallback for `PersistentContext::timeout`, applied when a persisted
/// payload predates the `timeout` field.
fn default_timeout() -> Duration {
    // Ten seconds, matching the value used before the field existed.
    Duration::from_secs(10)
}
155
impl PersistentContext {
    /// Returns the procedure lock keys: catalog/schema read locks followed by
    /// a write lock for each migrating region.
    pub fn lock_key(&self) -> Vec<StringKey> {
        let mut lock_keys =
            Vec::with_capacity(self.region_ids.len() + 2 + self.catalog_and_schema.len() * 2);
        // Honor the deprecated single catalog/schema pair from legacy payloads.
        #[allow(deprecated)]
        if let (Some(catalog), Some(schema)) = (&self.catalog, &self.schema) {
            lock_keys.push(CatalogLock::Read(catalog).into());
            lock_keys.push(SchemaLock::read(catalog, schema).into());
        }
        for (catalog, schema) in self.catalog_and_schema.iter() {
            lock_keys.push(CatalogLock::Read(catalog).into());
            lock_keys.push(SchemaLock::read(catalog, schema).into());
        }

        // Sorted so region locks are emitted in a deterministic order.
        let mut region_ids = self.region_ids.clone();
        region_ids.sort_unstable();
        for region_id in region_ids {
            lock_keys.push(RegionLock::Write(region_id).into());
        }
        lock_keys
    }

    /// Returns the deduplicated table ids covered by `region_ids`.
    /// Order is unspecified (the ids pass through a `HashSet`).
    pub fn region_table_ids(&self) -> Vec<TableId> {
        self.region_ids
            .iter()
            .map(|region_id| region_id.table_id())
            .collect::<HashSet<_>>()
            .into_iter()
            .collect()
    }

    /// Groups `region_ids` by their table id.
    pub fn table_regions(&self) -> HashMap<TableId, Vec<RegionId>> {
        let mut table_regions = HashMap::new();
        for region_id in &self.region_ids {
            table_regions
                .entry(region_id.table_id())
                .or_insert_with(Vec::new)
                .push(*region_id);
        }
        table_regions
    }
}
205
impl Eventable for PersistentContext {
    /// Converts the migration state into a recordable [`RegionMigrationEvent`].
    fn to_event(&self) -> Option<Box<dyn Event>> {
        Some(Box::new(RegionMigrationEvent::from_persistent_ctx(self)))
    }
}
211
/// Per-stage elapsed-time accounting for a region migration.
///
/// On drop, each non-zero stage timer and the computed stage total are
/// observed into `METRIC_META_REGION_MIGRATION_STAGE_ELAPSED`.
#[derive(Debug, Clone, Default)]
pub struct Metrics {
    /// Accumulated time spent on remote operations; consumed by
    /// `Context::next_operation_timeout` and excluded from the stage total.
    operations_elapsed: Duration,
    /// Time spent in the "flush leader region" stage.
    flush_leader_region_elapsed: Duration,
    /// Time spent in the "downgrade leader region" stage.
    downgrade_leader_region_elapsed: Duration,
    /// Time spent in the "open candidate region" stage.
    open_candidate_region_elapsed: Duration,
    /// Time spent in the "upgrade candidate region" stage.
    upgrade_candidate_region_elapsed: Duration,
}
226
227impl Display for Metrics {
228 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
229 let total = self.flush_leader_region_elapsed
230 + self.downgrade_leader_region_elapsed
231 + self.open_candidate_region_elapsed
232 + self.upgrade_candidate_region_elapsed;
233 write!(
234 f,
235 "total: {:?}, flush_leader_region_elapsed: {:?}, downgrade_leader_region_elapsed: {:?}, open_candidate_region_elapsed: {:?}, upgrade_candidate_region_elapsed: {:?}",
236 total,
237 self.flush_leader_region_elapsed,
238 self.downgrade_leader_region_elapsed,
239 self.open_candidate_region_elapsed,
240 self.upgrade_candidate_region_elapsed
241 )
242 }
243}
244
245impl Metrics {
246 pub fn update_operations_elapsed(&mut self, elapsed: Duration) {
248 self.operations_elapsed += elapsed;
249 }
250
251 pub fn update_flush_leader_region_elapsed(&mut self, elapsed: Duration) {
253 self.flush_leader_region_elapsed += elapsed;
254 }
255
256 pub fn update_downgrade_leader_region_elapsed(&mut self, elapsed: Duration) {
258 self.downgrade_leader_region_elapsed += elapsed;
259 }
260
261 pub fn update_open_candidate_region_elapsed(&mut self, elapsed: Duration) {
263 self.open_candidate_region_elapsed += elapsed;
264 }
265
266 pub fn update_upgrade_candidate_region_elapsed(&mut self, elapsed: Duration) {
268 self.upgrade_candidate_region_elapsed += elapsed;
269 }
270}
271
272impl Drop for Metrics {
273 fn drop(&mut self) {
274 let total = self.flush_leader_region_elapsed
275 + self.downgrade_leader_region_elapsed
276 + self.open_candidate_region_elapsed
277 + self.upgrade_candidate_region_elapsed;
278 METRIC_META_REGION_MIGRATION_STAGE_ELAPSED
279 .with_label_values(&["total"])
280 .observe(total.as_secs_f64());
281
282 if !self.flush_leader_region_elapsed.is_zero() {
283 METRIC_META_REGION_MIGRATION_STAGE_ELAPSED
284 .with_label_values(&["flush_leader_region"])
285 .observe(self.flush_leader_region_elapsed.as_secs_f64());
286 }
287
288 if !self.downgrade_leader_region_elapsed.is_zero() {
289 METRIC_META_REGION_MIGRATION_STAGE_ELAPSED
290 .with_label_values(&["downgrade_leader_region"])
291 .observe(self.downgrade_leader_region_elapsed.as_secs_f64());
292 }
293
294 if !self.open_candidate_region_elapsed.is_zero() {
295 METRIC_META_REGION_MIGRATION_STAGE_ELAPSED
296 .with_label_values(&["open_candidate_region"])
297 .observe(self.open_candidate_region_elapsed.as_secs_f64());
298 }
299
300 if !self.upgrade_candidate_region_elapsed.is_zero() {
301 METRIC_META_REGION_MIGRATION_STAGE_ELAPSED
302 .with_label_values(&["upgrade_candidate_region"])
303 .observe(self.upgrade_candidate_region_elapsed.as_secs_f64());
304 }
305 }
306}
307
/// Volatile state of a region migration procedure.
///
/// Rebuilt from scratch whenever the procedure is (re)created; never persisted.
#[derive(Debug, Clone, Default)]
pub struct VolatileContext {
    /// Guards marking candidate regions as operating in the region keeper;
    /// cleared on terminal failure (see `Procedure::execute`).
    opening_region_guards: Vec<OperatingRegionGuard>,
    /// Deadline until which the old leader region's lease is honored; set at
    /// most once per run (see `set_leader_region_lease_deadline`).
    leader_region_lease_deadline: Option<Instant>,
    /// Lazily fetched cache of the `from_peer`'s `DatanodeTableValue`s, keyed
    /// by table id (see `Context::get_from_peer_datanode_table_values`).
    from_peer_datanode_table_values: Option<HashMap<TableId, DatanodeTableValue>>,
    /// Last entry ids recorded for the leader regions.
    leader_region_last_entry_ids: HashMap<RegionId, u64>,
    /// Last metadata entry ids recorded for the leader regions.
    leader_region_metadata_last_entry_ids: HashMap<RegionId, u64>,
    /// Stage timers; exported to Prometheus when dropped.
    metrics: Metrics,
}
333
334impl VolatileContext {
335 pub fn set_leader_region_lease_deadline(&mut self, lease_timeout: Duration) {
337 if self.leader_region_lease_deadline.is_none() {
338 self.leader_region_lease_deadline = Some(Instant::now() + lease_timeout);
339 }
340 }
341
342 pub fn reset_leader_region_lease_deadline(&mut self) {
344 self.leader_region_lease_deadline = None;
345 }
346
347 pub fn set_last_entry_id(&mut self, region_id: RegionId, last_entry_id: u64) {
349 self.leader_region_last_entry_ids
350 .insert(region_id, last_entry_id);
351 }
352
353 pub fn set_metadata_last_entry_id(&mut self, region_id: RegionId, last_entry_id: u64) {
355 self.leader_region_metadata_last_entry_ids
356 .insert(region_id, last_entry_id);
357 }
358}
359
/// Builds a [`Context`] around a [`PersistentContext`], supplying the volatile
/// state and service handles the procedure needs.
pub trait ContextFactory {
    fn new_context(self, persistent_ctx: PersistentContext) -> Context;
}
364
/// Default [`ContextFactory`] implementation backed by the meta server's
/// shared service handles.
#[derive(Clone)]
pub struct DefaultContextFactory {
    /// Fresh volatile state handed to each new context.
    volatile_ctx: VolatileContext,
    /// In-memory (resettable) kv backend.
    in_memory_key: ResettableKvBackendRef,
    /// Access to table/route/datanode metadata.
    table_metadata_manager: TableMetadataManagerRef,
    /// Keeper tracking regions that are currently being opened.
    opening_region_keeper: MemoryRegionKeeperRef,
    /// Controls registration of region failure detectors.
    region_failure_detector_controller: RegionFailureDetectorControllerRef,
    /// Mailbox used to talk to datanodes/frontends.
    mailbox: MailboxRef,
    /// Address of this meta server.
    server_addr: String,
    /// Broadcasts cache-invalidation instructions.
    cache_invalidator: CacheInvalidatorRef,
}
377
378impl DefaultContextFactory {
379 pub fn new(
381 in_memory_key: ResettableKvBackendRef,
382 table_metadata_manager: TableMetadataManagerRef,
383 opening_region_keeper: MemoryRegionKeeperRef,
384 region_failure_detector_controller: RegionFailureDetectorControllerRef,
385 mailbox: MailboxRef,
386 server_addr: String,
387 cache_invalidator: CacheInvalidatorRef,
388 ) -> Self {
389 Self {
390 volatile_ctx: VolatileContext::default(),
391 in_memory_key,
392 table_metadata_manager,
393 opening_region_keeper,
394 region_failure_detector_controller,
395 mailbox,
396 server_addr,
397 cache_invalidator,
398 }
399 }
400}
401
402impl ContextFactory for DefaultContextFactory {
403 fn new_context(self, persistent_ctx: PersistentContext) -> Context {
404 Context {
405 persistent_ctx,
406 volatile_ctx: self.volatile_ctx,
407 in_memory: self.in_memory_key,
408 table_metadata_manager: self.table_metadata_manager,
409 opening_region_keeper: self.opening_region_keeper,
410 region_failure_detector_controller: self.region_failure_detector_controller,
411 mailbox: self.mailbox,
412 server_addr: self.server_addr,
413 cache_invalidator: self.cache_invalidator,
414 }
415 }
416}
417
/// The context of a region migration procedure: persisted state plus the
/// volatile per-run state and the service handles used by each state.
pub struct Context {
    /// State persisted across restarts.
    persistent_ctx: PersistentContext,
    /// Per-run state (guards, caches, timers); never persisted.
    volatile_ctx: VolatileContext,
    /// In-memory kv backend (built from a resettable backend by the factory).
    in_memory: KvBackendRef,
    /// Access to table/route/datanode metadata.
    table_metadata_manager: TableMetadataManagerRef,
    /// Keeper tracking regions that are currently being opened.
    opening_region_keeper: MemoryRegionKeeperRef,
    /// Controls registration of region failure detectors.
    region_failure_detector_controller: RegionFailureDetectorControllerRef,
    /// Mailbox used to talk to datanodes/frontends.
    mailbox: MailboxRef,
    /// Address of this meta server.
    server_addr: String,
    /// Broadcasts cache-invalidation instructions.
    cache_invalidator: CacheInvalidatorRef,
}
430
431impl Context {
432 pub fn next_operation_timeout(&self) -> Option<Duration> {
434 self.persistent_ctx
435 .timeout
436 .checked_sub(self.volatile_ctx.metrics.operations_elapsed)
437 }
438
439 pub fn update_operations_elapsed(&mut self, instant: Instant) {
441 self.volatile_ctx
442 .metrics
443 .update_operations_elapsed(instant.elapsed());
444 }
445
446 pub fn update_flush_leader_region_elapsed(&mut self, instant: Instant) {
448 self.volatile_ctx
449 .metrics
450 .update_flush_leader_region_elapsed(instant.elapsed());
451 }
452
453 pub fn update_downgrade_leader_region_elapsed(&mut self, instant: Instant) {
455 self.volatile_ctx
456 .metrics
457 .update_downgrade_leader_region_elapsed(instant.elapsed());
458 }
459
460 pub fn update_open_candidate_region_elapsed(&mut self, instant: Instant) {
462 self.volatile_ctx
463 .metrics
464 .update_open_candidate_region_elapsed(instant.elapsed());
465 }
466
467 pub fn update_upgrade_candidate_region_elapsed(&mut self, instant: Instant) {
469 self.volatile_ctx
470 .metrics
471 .update_upgrade_candidate_region_elapsed(instant.elapsed());
472 }
473
474 pub fn server_addr(&self) -> &str {
476 &self.server_addr
477 }
478
479 pub fn region_table_ids(&self) -> Vec<TableId> {
481 self.persistent_ctx
482 .region_ids
483 .iter()
484 .map(|region_id| region_id.table_id())
485 .collect::<HashSet<_>>()
486 .into_iter()
487 .collect()
488 }
489
490 pub async fn get_table_route_values(
496 &self,
497 ) -> Result<HashMap<TableId, DeserializedValueWithBytes<TableRouteValue>>> {
498 let table_ids = self.persistent_ctx.region_table_ids();
499 let table_routes = self
500 .table_metadata_manager
501 .table_route_manager()
502 .table_route_storage()
503 .batch_get_with_raw_bytes(&table_ids)
504 .await
505 .context(error::TableMetadataManagerSnafu)
506 .map_err(BoxedError::new)
507 .with_context(|_| error::RetryLaterWithSourceSnafu {
508 reason: format!("Failed to get table routes: {table_ids:?}"),
509 })?;
510 let table_routes = table_ids
511 .into_iter()
512 .zip(table_routes)
513 .filter_map(|(table_id, table_route)| {
514 table_route.map(|table_route| (table_id, table_route))
515 })
516 .collect::<HashMap<_, _>>();
517 Ok(table_routes)
518 }
519
520 pub async fn get_table_route_value(
526 &self,
527 table_id: TableId,
528 ) -> Result<DeserializedValueWithBytes<TableRouteValue>> {
529 let table_route_value = self
530 .table_metadata_manager
531 .table_route_manager()
532 .table_route_storage()
533 .get_with_raw_bytes(table_id)
534 .await
535 .context(error::TableMetadataManagerSnafu)
536 .map_err(BoxedError::new)
537 .with_context(|_| error::RetryLaterWithSourceSnafu {
538 reason: format!("Failed to get table routes: {table_id:}"),
539 })?
540 .context(error::TableRouteNotFoundSnafu { table_id })?;
541 Ok(table_route_value)
542 }
543
544 pub async fn get_from_peer_datanode_table_values(
550 &mut self,
551 ) -> Result<&HashMap<TableId, DatanodeTableValue>> {
552 let from_peer_datanode_table_values =
553 &mut self.volatile_ctx.from_peer_datanode_table_values;
554 if from_peer_datanode_table_values.is_none() {
555 let table_ids = self.persistent_ctx.region_table_ids();
556 let datanode_table_keys = table_ids
557 .iter()
558 .map(|table_id| DatanodeTableKey {
559 datanode_id: self.persistent_ctx.from_peer.id,
560 table_id: *table_id,
561 })
562 .collect::<Vec<_>>();
563 let datanode_table_values = self
564 .table_metadata_manager
565 .datanode_table_manager()
566 .batch_get(&datanode_table_keys)
567 .await
568 .context(error::TableMetadataManagerSnafu)
569 .map_err(BoxedError::new)
570 .with_context(|_| error::RetryLaterWithSourceSnafu {
571 reason: format!("Failed to get DatanodeTable: {table_ids:?}"),
572 })?
573 .into_iter()
574 .map(|(k, v)| (k.table_id, v))
575 .collect();
576 *from_peer_datanode_table_values = Some(datanode_table_values);
577 }
578 Ok(from_peer_datanode_table_values.as_ref().unwrap())
579 }
580
581 pub async fn get_from_peer_datanode_table_value(
587 &self,
588 table_id: TableId,
589 ) -> Result<DatanodeTableValue> {
590 let datanode_table_value = self
591 .table_metadata_manager
592 .datanode_table_manager()
593 .get(&DatanodeTableKey {
594 datanode_id: self.persistent_ctx.from_peer.id,
595 table_id,
596 })
597 .await
598 .context(error::TableMetadataManagerSnafu)
599 .map_err(BoxedError::new)
600 .with_context(|_| error::RetryLaterWithSourceSnafu {
601 reason: format!("Failed to get DatanodeTable: {table_id}"),
602 })?
603 .context(error::DatanodeTableNotFoundSnafu {
604 table_id,
605 datanode_id: self.persistent_ctx.from_peer.id,
606 })?;
607 Ok(datanode_table_value)
608 }
609
610 pub async fn register_failure_detectors(&self) {
615 let datanode_id = self.persistent_ctx.from_peer.id;
616 let region_ids = &self.persistent_ctx.region_ids;
617 let detecting_regions = region_ids
618 .iter()
619 .map(|region_id| (datanode_id, *region_id))
620 .collect::<Vec<_>>();
621 self.region_failure_detector_controller
622 .register_failure_detectors(detecting_regions)
623 .await;
624 info!(
625 "Registered failure detectors after migration failures for datanode {}, regions {:?}",
626 datanode_id, region_ids
627 );
628 }
629
630 pub async fn deregister_failure_detectors(&self) {
635 let datanode_id = self.persistent_ctx.from_peer.id;
636 let region_ids = &self.persistent_ctx.region_ids;
637 let detecting_regions = region_ids
638 .iter()
639 .map(|region_id| (datanode_id, *region_id))
640 .collect::<Vec<_>>();
641
642 self.region_failure_detector_controller
643 .deregister_failure_detectors(detecting_regions)
644 .await;
645 }
646
647 pub async fn deregister_failure_detectors_for_candidate_region(&self) {
652 let to_peer_id = self.persistent_ctx.to_peer.id;
653 let region_ids = &self.persistent_ctx.region_ids;
654 let detecting_regions = region_ids
655 .iter()
656 .map(|region_id| (to_peer_id, *region_id))
657 .collect::<Vec<_>>();
658
659 self.region_failure_detector_controller
660 .deregister_failure_detectors(detecting_regions)
661 .await;
662 }
663
664 pub async fn get_replay_checkpoints(
666 &self,
667 topic_region_keys: Vec<TopicRegionKey<'_>>,
668 ) -> Result<HashMap<RegionId, ReplayCheckpoint>> {
669 let topic_region_values = self
670 .table_metadata_manager
671 .topic_region_manager()
672 .batch_get(topic_region_keys)
673 .await
674 .context(error::TableMetadataManagerSnafu)?;
675
676 let replay_checkpoints = topic_region_values
677 .into_iter()
678 .flat_map(|(key, value)| value.checkpoint.map(|value| (key, value)))
679 .collect::<HashMap<_, _>>();
680
681 Ok(replay_checkpoints)
682 }
683
684 pub async fn invalidate_table_cache(&self) -> Result<()> {
686 let table_ids = self.region_table_ids();
687 let mut cache_idents = Vec::with_capacity(table_ids.len());
688 for table_id in &table_ids {
689 cache_idents.push(CacheIdent::TableId(*table_id));
690 }
691 let ctx = common_meta::cache_invalidator::Context::default();
693 let _ = self.cache_invalidator.invalidate(&ctx, &cache_idents).await;
694 Ok(())
695 }
696
697 pub fn persistent_ctx(&self) -> PersistentContext {
699 self.persistent_ctx.clone()
700 }
701}
702
/// A state of the region migration state machine.
///
/// Implementations are serialized (via `typetag`) into the procedure payload
/// so a restarted meta server can resume from the persisted state.
#[async_trait::async_trait]
#[typetag::serde(tag = "region_migration_state")]
pub(crate) trait State: Sync + Send + Debug {
    /// Returns the short type name of the state (module path stripped),
    /// used as a metric label and log tag.
    fn name(&self) -> &'static str {
        let type_name = std::any::type_name::<Self>();
        type_name.split("::").last().unwrap_or(type_name)
    }

    /// Performs this state's work and yields the next state together with
    /// the procedure [`Status`].
    async fn next(
        &mut self,
        ctx: &mut Context,
        procedure_ctx: &ProcedureContext,
    ) -> Result<(Box<dyn State>, Status)>;

    /// Supports downcasting (used by tests to assert the concrete state).
    fn as_any(&self) -> &dyn Any;
}
722
/// Owned form of the procedure payload, used when deserializing a persisted
/// procedure.
#[derive(Debug, Serialize, Deserialize)]
pub struct RegionMigrationDataOwned {
    persistent_ctx: PersistentContext,
    state: Box<dyn State>,
}
729
/// Borrowed form of the procedure payload, used to serialize in `dump`
/// without cloning the persistent context.
#[derive(Debug, Serialize)]
pub struct RegionMigrationData<'a> {
    persistent_ctx: &'a PersistentContext,
    state: &'a dyn State,
}
736
/// Procedure migrating a set of regions from `from_peer` to `to_peer`,
/// driven by a [`State`] machine.
pub(crate) struct RegionMigrationProcedure {
    /// Current state; replaced after every successful transition.
    state: Box<dyn State>,
    /// Shared context passed to every state.
    context: Context,
    /// Tracker guards held for the procedure's lifetime — presumably they
    /// deregister the running tasks on drop; see `manager` for semantics.
    _guards: Vec<RegionMigrationProcedureGuard>,
}
742
743impl RegionMigrationProcedure {
744 const TYPE_NAME: &'static str = "metasrv-procedure::RegionMigration";
745
746 pub fn new(
747 persistent_context: PersistentContext,
748 context_factory: impl ContextFactory,
749 guards: Vec<RegionMigrationProcedureGuard>,
750 ) -> Self {
751 let state = Box::new(RegionMigrationStart {});
752 Self::new_inner(state, persistent_context, context_factory, guards)
753 }
754
755 fn new_inner(
756 state: Box<dyn State>,
757 persistent_context: PersistentContext,
758 context_factory: impl ContextFactory,
759 guards: Vec<RegionMigrationProcedureGuard>,
760 ) -> Self {
761 Self {
762 state,
763 context: context_factory.new_context(persistent_context),
764 _guards: guards,
765 }
766 }
767
768 fn from_json(
769 json: &str,
770 context_factory: impl ContextFactory,
771 tracker: RegionMigrationProcedureTracker,
772 ) -> ProcedureResult<Self> {
773 let RegionMigrationDataOwned {
774 persistent_ctx,
775 state,
776 } = serde_json::from_str(json).context(FromJsonSnafu)?;
777 let guards = persistent_ctx
778 .region_ids
779 .iter()
780 .flat_map(|region_id| {
781 tracker.insert_running_procedure(&RegionMigrationProcedureTask {
782 region_id: *region_id,
783 from_peer: persistent_ctx.from_peer.clone(),
784 to_peer: persistent_ctx.to_peer.clone(),
785 timeout: persistent_ctx.timeout,
786 trigger_reason: persistent_ctx.trigger_reason,
787 })
788 })
789 .collect::<Vec<_>>();
790
791 let context = context_factory.new_context(persistent_ctx);
792
793 Ok(Self {
794 state,
795 context,
796 _guards: guards,
797 })
798 }
799
800 async fn rollback_inner(&mut self, procedure_ctx: &ProcedureContext) -> Result<()> {
801 let _timer = METRIC_META_REGION_MIGRATION_EXECUTE
802 .with_label_values(&["rollback"])
803 .start_timer();
804 let ctx = &self.context;
805 let table_regions = ctx.persistent_ctx.table_regions();
806 for (table_id, regions) in table_regions {
807 let table_lock = TableLock::Write(table_id).into();
808 let _guard = procedure_ctx.provider.acquire_lock(&table_lock).await;
809 let table_route = ctx.get_table_route_value(table_id).await?;
810 let region_routes = table_route.region_routes().unwrap();
811 let downgraded = region_routes
812 .iter()
813 .filter(|route| regions.contains(&route.region.id))
814 .any(|route| route.is_leader_downgrading());
815 if downgraded {
816 info!(
817 "Rollbacking downgraded region leader table route, table: {table_id}, regions: {regions:?}"
818 );
819 let table_metadata_manager = &ctx.table_metadata_manager;
820 table_metadata_manager
821 .update_leader_region_status(table_id, &table_route, |route| {
822 if regions.contains(&route.region.id) {
823 Some(None)
824 } else {
825 None
826 }
827 })
828 .await
829 .context(error::TableMetadataManagerSnafu)
830 .map_err(BoxedError::new)
831 .with_context(|_| error::RetryLaterWithSourceSnafu {
832 reason: format!("Failed to update the table route during the rollback downgraded leader region: {regions:?}"),
833 })?;
834 }
835 }
836 self.context
837 .deregister_failure_detectors_for_candidate_region()
838 .await;
839 self.context.register_failure_detectors().await;
840
841 Ok(())
842 }
843}
844
#[async_trait::async_trait]
impl Procedure for RegionMigrationProcedure {
    fn type_name(&self) -> &str {
        Self::TYPE_NAME
    }

    /// Delegates to `rollback_inner`, converting failures to external
    /// procedure errors.
    async fn rollback(&mut self, ctx: &ProcedureContext) -> ProcedureResult<()> {
        self.rollback_inner(ctx)
            .await
            .map_err(ProcedureError::external)
    }

    fn rollback_supported(&self) -> bool {
        true
    }

    /// Executes one state transition per call, timing it under the state's
    /// name and classifying failures as retryable or external.
    async fn execute(&mut self, ctx: &ProcedureContext) -> ProcedureResult<Status> {
        let state = &mut self.state;

        let name = state.name();
        let _timer = METRIC_META_REGION_MIGRATION_EXECUTE
            .with_label_values(&[name])
            .start_timer();
        match state.next(&mut self.context, ctx).await {
            Ok((next, status)) => {
                // Advance the state machine only on success.
                *state = next;
                Ok(status)
            }
            Err(e) => {
                if e.is_retryable() {
                    METRIC_META_REGION_MIGRATION_ERROR
                        .with_label_values(&[name, "retryable"])
                        .inc();
                    Err(ProcedureError::retry_later(e))
                } else {
                    // Terminal failure: drop the "opening" guards of the
                    // candidate regions and remove their failure detectors.
                    self.context.volatile_ctx.opening_region_guards.clear();
                    self.context
                        .deregister_failure_detectors_for_candidate_region()
                        .await;
                    error!(
                        e;
                        "Region migration procedure failed, regions: {:?}, from_peer: {}, to_peer: {}, {}",
                        self.context.persistent_ctx.region_ids,
                        self.context.persistent_ctx.from_peer,
                        self.context.persistent_ctx.to_peer,
                        self.context.volatile_ctx.metrics,
                    );
                    METRIC_META_REGION_MIGRATION_ERROR
                        .with_label_values(&[name, "external"])
                        .inc();
                    Err(ProcedureError::external(e))
                }
            }
        }
    }

    /// Serializes the persistent context and current state; borrows to avoid
    /// cloning.
    fn dump(&self) -> ProcedureResult<String> {
        let data = RegionMigrationData {
            state: self.state.as_ref(),
            persistent_ctx: &self.context.persistent_ctx,
        };
        serde_json::to_string(&data).context(ToJsonSnafu)
    }

    fn lock_key(&self) -> LockKey {
        LockKey::new(self.context.persistent_ctx.lock_key())
    }

    fn user_metadata(&self) -> Option<UserMetadata> {
        Some(UserMetadata::new(Arc::new(self.context.persistent_ctx())))
    }
}
918
919#[cfg(test)]
920mod tests {
921 use std::assert_matches::assert_matches;
922 use std::sync::Arc;
923
924 use common_meta::distributed_time_constants::REGION_LEASE_SECS;
925 use common_meta::instruction::Instruction;
926 use common_meta::key::test_utils::new_test_table_info;
927 use common_meta::rpc::router::{Region, RegionRoute};
928
929 use super::*;
930 use crate::handler::HeartbeatMailbox;
931 use crate::procedure::region_migration::open_candidate_region::OpenCandidateRegion;
932 use crate::procedure::region_migration::test_util::*;
933 use crate::procedure::test_util::{
934 new_downgrade_region_reply, new_flush_region_reply_for_region, new_open_region_reply,
935 new_upgrade_region_reply,
936 };
937 use crate::service::mailbox::Channel;
938
939 fn new_persistent_context() -> PersistentContext {
940 test_util::new_persistent_context(1, 2, RegionId::new(1024, 1))
941 }
942
943 #[test]
944 fn test_lock_key() {
945 let persistent_context = new_persistent_context();
946 let expected_keys = persistent_context.lock_key();
947
948 let env = TestingEnv::new();
949 let context = env.context_factory();
950
951 let procedure = RegionMigrationProcedure::new(persistent_context, context, vec![]);
952
953 let key = procedure.lock_key();
954 let keys = key.keys_to_lock().cloned().collect::<Vec<_>>();
955
956 for key in expected_keys {
957 assert!(keys.contains(&key));
958 }
959 }
960
961 #[test]
962 fn test_data_serialization() {
963 let persistent_context = new_persistent_context();
964
965 let env = TestingEnv::new();
966 let context = env.context_factory();
967
968 let procedure = RegionMigrationProcedure::new(persistent_context, context, vec![]);
969
970 let serialized = procedure.dump().unwrap();
971 let expected = r#"{"persistent_ctx":{"catalog_and_schema":[["greptime","public"]],"from_peer":{"id":1,"addr":""},"to_peer":{"id":2,"addr":""},"region_ids":[4398046511105],"timeout":"10s","trigger_reason":"Unknown"},"state":{"region_migration_state":"RegionMigrationStart"}}"#;
972 assert_eq!(expected, serialized);
973 }
974
975 #[test]
976 fn test_backward_compatibility() {
977 let persistent_ctx = PersistentContext {
978 #[allow(deprecated)]
979 catalog: Some("greptime".into()),
980 #[allow(deprecated)]
981 schema: Some("public".into()),
982 catalog_and_schema: vec![],
983 from_peer: Peer::empty(1),
984 to_peer: Peer::empty(2),
985 region_ids: vec![RegionId::new(1024, 1)],
986 timeout: Duration::from_secs(10),
987 trigger_reason: RegionMigrationTriggerReason::default(),
988 };
989 let serialized = r#"{"catalog":"greptime","schema":"public","from_peer":{"id":1,"addr":""},"to_peer":{"id":2,"addr":""},"region_id":4398046511105}"#;
991 let deserialized: PersistentContext = serde_json::from_str(serialized).unwrap();
992
993 assert_eq!(persistent_ctx, deserialized);
994 }
995
996 #[derive(Debug, Serialize, Deserialize, Default)]
997 pub struct MockState;
998
999 #[async_trait::async_trait]
1000 #[typetag::serde]
1001 impl State for MockState {
1002 async fn next(
1003 &mut self,
1004 _ctx: &mut Context,
1005 _procedure_ctx: &ProcedureContext,
1006 ) -> Result<(Box<dyn State>, Status)> {
1007 Ok((Box::new(MockState), Status::done()))
1008 }
1009
1010 fn as_any(&self) -> &dyn Any {
1011 self
1012 }
1013 }
1014
1015 #[tokio::test]
1016 async fn test_execution_after_deserialized() {
1017 let env = TestingEnv::new();
1018
1019 fn new_mock_procedure(env: &TestingEnv) -> RegionMigrationProcedure {
1020 let persistent_context = new_persistent_context();
1021 let context_factory = env.context_factory();
1022 let state = Box::<MockState>::default();
1023 RegionMigrationProcedure::new_inner(state, persistent_context, context_factory, vec![])
1024 }
1025
1026 let ctx = TestingEnv::procedure_context();
1027 let mut procedure = new_mock_procedure(&env);
1028 let mut status = None;
1029 for _ in 0..3 {
1030 status = Some(procedure.execute(&ctx).await.unwrap());
1031 }
1032 assert!(status.unwrap().is_done());
1033
1034 let ctx = TestingEnv::procedure_context();
1035 let mut procedure = new_mock_procedure(&env);
1036
1037 status = Some(procedure.execute(&ctx).await.unwrap());
1038
1039 let serialized = procedure.dump().unwrap();
1040
1041 let context_factory = env.context_factory();
1042 let tracker = env.tracker();
1043 let mut procedure =
1044 RegionMigrationProcedure::from_json(&serialized, context_factory, tracker.clone())
1045 .unwrap();
1046 for region_id in &procedure.context.persistent_ctx.region_ids {
1047 assert!(tracker.contains(*region_id));
1048 }
1049
1050 for _ in 1..3 {
1051 status = Some(procedure.execute(&ctx).await.unwrap());
1052 }
1053 assert!(status.unwrap().is_done());
1054 }
1055
1056 #[tokio::test]
1057 async fn test_broadcast_invalidate_table_cache() {
1058 let mut env = TestingEnv::new();
1059 let persistent_context = test_util::new_persistent_context(1, 2, RegionId::new(1024, 1));
1060 let ctx = env.context_factory().new_context(persistent_context);
1061 let mailbox_ctx = env.mailbox_context();
1062
1063 ctx.invalidate_table_cache().await.unwrap();
1065
1066 let (tx, mut rx) = tokio::sync::mpsc::channel(1);
1067
1068 mailbox_ctx
1069 .insert_heartbeat_response_receiver(Channel::Frontend(1), tx)
1070 .await;
1071
1072 ctx.invalidate_table_cache().await.unwrap();
1073
1074 let resp = rx.recv().await.unwrap().unwrap();
1075 let msg = resp.mailbox_message.unwrap();
1076
1077 let instruction = HeartbeatMailbox::json_instruction(&msg).unwrap();
1078 assert_eq!(
1079 instruction,
1080 Instruction::InvalidateCaches(vec![CacheIdent::TableId(1024)])
1081 );
1082 }
1083
1084 fn procedure_flow_steps(from_peer_id: u64, to_peer_id: u64) -> Vec<Step> {
1085 vec![
1086 Step::next(
1088 "Should be the open candidate region",
1089 None,
1090 Assertion::simple(assert_open_candidate_region, assert_need_persist),
1091 ),
1092 Step::next(
1094 "Should be the flush leader region",
1095 Some(mock_datanode_reply(
1096 to_peer_id,
1097 Arc::new(|id| Ok(new_open_region_reply(id, true, None))),
1098 )),
1099 Assertion::simple(assert_flush_leader_region, assert_no_persist),
1100 ),
1101 Step::next(
1103 "Should be the flush leader region",
1104 Some(mock_datanode_reply(
1105 from_peer_id,
1106 Arc::new(move |id| {
1107 Ok(new_flush_region_reply_for_region(
1108 id,
1109 RegionId::new(1024, 1),
1110 true,
1111 None,
1112 ))
1113 }),
1114 )),
1115 Assertion::simple(assert_update_metadata_downgrade, assert_no_persist),
1116 ),
1117 Step::next(
1119 "Should be the downgrade leader region",
1120 None,
1121 Assertion::simple(assert_downgrade_leader_region, assert_no_persist),
1122 ),
1123 Step::next(
1125 "Should be the upgrade candidate region",
1126 Some(mock_datanode_reply(
1127 from_peer_id,
1128 Arc::new(|id| Ok(new_downgrade_region_reply(id, None, true, None))),
1129 )),
1130 Assertion::simple(assert_upgrade_candidate_region, assert_no_persist),
1131 ),
1132 Step::next(
1134 "Should be the update metadata for upgrading",
1135 Some(mock_datanode_reply(
1136 to_peer_id,
1137 Arc::new(|id| Ok(new_upgrade_region_reply(id, true, true, None))),
1138 )),
1139 Assertion::simple(assert_update_metadata_upgrade, assert_no_persist),
1140 ),
1141 Step::next(
1143 "Should be the close downgraded region",
1144 None,
1145 Assertion::simple(assert_close_downgraded_region, assert_no_persist),
1146 ),
1147 Step::next(
1149 "Should be the region migration end",
1150 None,
1151 Assertion::simple(assert_region_migration_end, assert_done),
1152 ),
1153 Step::next(
1155 "Should be the region migration end again",
1156 None,
1157 Assertion::simple(assert_region_migration_end, assert_done),
1158 ),
1159 ]
1160 }
1161
1162 #[tokio::test]
1163 async fn test_procedure_flow() {
1164 common_telemetry::init_default_ut_logging();
1165
1166 let persistent_context = test_util::new_persistent_context(1, 2, RegionId::new(1024, 1));
1167 let state = Box::new(RegionMigrationStart);
1168
1169 let from_peer_id = persistent_context.from_peer.id;
1171 let to_peer_id = persistent_context.to_peer.id;
1172 let from_peer = persistent_context.from_peer.clone();
1173 let to_peer = persistent_context.to_peer.clone();
1174 let region_id = persistent_context.region_ids[0];
1175 let table_info = new_test_table_info(1024, vec![1]).into();
1176 let region_routes = vec![RegionRoute {
1177 region: Region::new_test(region_id),
1178 leader_peer: Some(from_peer),
1179 follower_peers: vec![to_peer],
1180 ..Default::default()
1181 }];
1182
1183 let suite = ProcedureMigrationTestSuite::new(persistent_context, state);
1184 suite.init_table_metadata(table_info, region_routes).await;
1185
1186 let steps = procedure_flow_steps(from_peer_id, to_peer_id);
1187 let timer = Instant::now();
1188
1189 let runner = ProcedureMigrationSuiteRunner::new(suite)
1191 .steps(steps)
1192 .run_once()
1193 .await;
1194
1195 assert!(timer.elapsed().as_secs() < REGION_LEASE_SECS / 2);
1197
1198 runner.suite.verify_table_metadata().await;
1199 }
1200
1201 #[tokio::test]
1202 async fn test_procedure_flow_open_candidate_region_retryable_error() {
1203 common_telemetry::init_default_ut_logging();
1204
1205 let persistent_context = test_util::new_persistent_context(1, 2, RegionId::new(1024, 1));
1206 let state = Box::new(RegionMigrationStart);
1207
1208 let to_peer_id = persistent_context.to_peer.id;
1210 let from_peer = persistent_context.from_peer.clone();
1211 let region_id = persistent_context.region_ids[0];
1212 let table_info = new_test_table_info(1024, vec![1]).into();
1213 let region_routes = vec![RegionRoute {
1214 region: Region::new_test(region_id),
1215 leader_peer: Some(from_peer),
1216 follower_peers: vec![],
1217 ..Default::default()
1218 }];
1219
1220 let suite = ProcedureMigrationTestSuite::new(persistent_context, state);
1221 suite.init_table_metadata(table_info, region_routes).await;
1222
1223 let steps = vec![
1224 Step::next(
1226 "Should be the open candidate region",
1227 None,
1228 Assertion::simple(assert_open_candidate_region, assert_need_persist),
1229 ),
1230 Step::next(
1232 "Should be throwing a non-retry error",
1233 Some(mock_datanode_reply(
1234 to_peer_id,
1235 Arc::new(|id| error::MailboxTimeoutSnafu { id }.fail()),
1236 )),
1237 Assertion::error(|error| assert!(error.is_retryable())),
1238 ),
1239 Step::next(
1241 "Should be throwing a non-retry error again",
1242 Some(mock_datanode_reply(
1243 to_peer_id,
1244 Arc::new(|id| error::MailboxTimeoutSnafu { id }.fail()),
1245 )),
1246 Assertion::error(|error| assert!(error.is_retryable())),
1247 ),
1248 ];
1249
1250 let setup_to_latest_persisted_state = Step::setup(
1251 "Sets state to UpdateMetadata::Downgrade",
1252 merge_before_test_fn(vec![
1253 setup_state(Arc::new(|| Box::new(OpenCandidateRegion))),
1254 Arc::new(reset_volatile_ctx),
1255 ]),
1256 );
1257
1258 let steps = [
1259 steps.clone(),
1260 vec![setup_to_latest_persisted_state.clone()],
1262 steps.clone()[1..].to_vec(),
1263 vec![setup_to_latest_persisted_state],
1264 steps.clone()[1..].to_vec(),
1265 ]
1266 .concat();
1267
1268 let runner = ProcedureMigrationSuiteRunner::new(suite)
1270 .steps(steps.clone())
1271 .run_once()
1272 .await;
1273
1274 let table_routes_version = runner
1275 .env()
1276 .table_metadata_manager()
1277 .table_route_manager()
1278 .table_route_storage()
1279 .get(region_id.table_id())
1280 .await
1281 .unwrap()
1282 .unwrap()
1283 .version();
1284 assert_eq!(table_routes_version.unwrap(), 0);
1286 }
1287
1288 #[tokio::test]
1289 async fn test_procedure_flow_upgrade_candidate_with_retry_and_failed() {
1290 common_telemetry::init_default_ut_logging();
1291
1292 let persistent_context = test_util::new_persistent_context(1, 2, RegionId::new(1024, 1));
1293 let state = Box::new(RegionMigrationStart);
1294
1295 let from_peer_id = persistent_context.from_peer.id;
1297 let to_peer_id = persistent_context.to_peer.id;
1298 let from_peer = persistent_context.from_peer.clone();
1299 let region_id = persistent_context.region_ids[0];
1300 let table_info = new_test_table_info(1024, vec![1]).into();
1301 let region_routes = vec![RegionRoute {
1302 region: Region::new_test(region_id),
1303 leader_peer: Some(from_peer),
1304 follower_peers: vec![],
1305 ..Default::default()
1306 }];
1307
1308 let suite = ProcedureMigrationTestSuite::new(persistent_context, state);
1309 suite.init_table_metadata(table_info, region_routes).await;
1310
1311 let steps = vec![
1312 Step::next(
1314 "Should be the open candidate region",
1315 None,
1316 Assertion::simple(assert_open_candidate_region, assert_need_persist),
1317 ),
1318 Step::next(
1320 "Should be the flush leader region",
1321 Some(mock_datanode_reply(
1322 to_peer_id,
1323 Arc::new(|id| Ok(new_open_region_reply(id, true, None))),
1324 )),
1325 Assertion::simple(assert_flush_leader_region, assert_no_persist),
1326 ),
1327 Step::next(
1329 "Should be the flush leader region",
1330 Some(mock_datanode_reply(
1331 from_peer_id,
1332 Arc::new(move |id| {
1333 Ok(new_flush_region_reply_for_region(
1334 id,
1335 RegionId::new(1024, 1),
1336 true,
1337 None,
1338 ))
1339 }),
1340 )),
1341 Assertion::simple(assert_update_metadata_downgrade, assert_no_persist),
1342 ),
1343 Step::next(
1345 "Should be the downgrade leader region",
1346 None,
1347 Assertion::simple(assert_downgrade_leader_region, assert_no_persist),
1348 ),
1349 Step::next(
1351 "Should be the upgrade candidate region",
1352 Some(mock_datanode_reply(
1353 from_peer_id,
1354 Arc::new(|id| Ok(new_downgrade_region_reply(id, None, true, None))),
1355 )),
1356 Assertion::simple(assert_upgrade_candidate_region, assert_no_persist),
1357 ),
1358 Step::next(
1360 "Should be the rollback metadata",
1361 Some(mock_datanode_reply(
1362 to_peer_id,
1363 Arc::new(|id| error::MailboxTimeoutSnafu { id }.fail()),
1364 )),
1365 Assertion::simple(assert_update_metadata_rollback, assert_no_persist),
1366 ),
1367 Step::next(
1369 "Should be the region migration abort",
1370 None,
1371 Assertion::simple(assert_region_migration_abort, assert_no_persist),
1372 ),
1373 Step::next(
1375 "Should throw an error",
1376 None,
1377 Assertion::error(|error| {
1378 assert!(!error.is_retryable());
1379 assert_matches!(error, error::Error::MigrationAbort { .. });
1380 }),
1381 ),
1382 ];
1383
1384 let setup_to_latest_persisted_state = Step::setup(
1385 "Sets state to OpenCandidateRegion",
1386 merge_before_test_fn(vec![
1387 setup_state(Arc::new(|| Box::new(OpenCandidateRegion))),
1388 Arc::new(reset_volatile_ctx),
1389 ]),
1390 );
1391
1392 let steps = [
1393 steps.clone(),
1394 vec![setup_to_latest_persisted_state.clone()],
1395 steps.clone()[1..].to_vec(),
1396 vec![setup_to_latest_persisted_state],
1397 steps.clone()[1..].to_vec(),
1398 ]
1399 .concat();
1400
1401 ProcedureMigrationSuiteRunner::new(suite)
1403 .steps(steps.clone())
1404 .run_once()
1405 .await;
1406 }
1407
1408 #[tokio::test]
1409 async fn test_procedure_flow_upgrade_candidate_with_retry() {
1410 common_telemetry::init_default_ut_logging();
1411
1412 let persistent_context = test_util::new_persistent_context(1, 2, RegionId::new(1024, 1));
1413 let state = Box::new(RegionMigrationStart);
1414
1415 let to_peer_id = persistent_context.to_peer.id;
1417 let from_peer_id = persistent_context.from_peer.id;
1418 let from_peer = persistent_context.from_peer.clone();
1419 let region_id = persistent_context.region_ids[0];
1420 let table_info = new_test_table_info(1024, vec![1]).into();
1421 let region_routes = vec![RegionRoute {
1422 region: Region::new_test(region_id),
1423 leader_peer: Some(from_peer),
1424 follower_peers: vec![],
1425 ..Default::default()
1426 }];
1427
1428 let suite = ProcedureMigrationTestSuite::new(persistent_context, state);
1429 suite.init_table_metadata(table_info, region_routes).await;
1430
1431 let steps = vec![
1432 Step::next(
1434 "Should be the open candidate region",
1435 None,
1436 Assertion::simple(assert_open_candidate_region, assert_need_persist),
1437 ),
1438 Step::next(
1440 "Should be throwing a retryable error",
1441 Some(mock_datanode_reply(
1442 to_peer_id,
1443 Arc::new(|id| Ok(new_open_region_reply(id, false, None))),
1444 )),
1445 Assertion::error(|error| assert!(error.is_retryable(), "err: {error:?}")),
1446 ),
1447 Step::next(
1449 "Should be the update metadata for downgrading",
1450 Some(mock_datanode_reply(
1451 to_peer_id,
1452 Arc::new(|id| Ok(new_open_region_reply(id, true, None))),
1453 )),
1454 Assertion::simple(assert_flush_leader_region, assert_no_persist),
1455 ),
1456 Step::next(
1458 "Should be the flush leader region",
1459 Some(mock_datanode_reply(
1460 from_peer_id,
1461 Arc::new(move |id| {
1462 Ok(new_flush_region_reply_for_region(id, region_id, true, None))
1463 }),
1464 )),
1465 Assertion::simple(assert_update_metadata_downgrade, assert_no_persist),
1466 ),
1467 Step::next(
1469 "Should be the downgrade leader region",
1470 None,
1471 Assertion::simple(assert_downgrade_leader_region, assert_no_persist),
1472 ),
1473 Step::next(
1475 "Should be the upgrade candidate region",
1476 Some(mock_datanode_reply(
1477 from_peer_id,
1478 merge_mailbox_messages(vec![
1479 Arc::new(|id| error::MailboxTimeoutSnafu { id }.fail()),
1480 Arc::new(|id| Ok(new_downgrade_region_reply(id, None, true, None))),
1481 ]),
1482 )),
1483 Assertion::simple(assert_upgrade_candidate_region, assert_no_persist),
1484 ),
1485 Step::next(
1487 "Should be the update metadata for upgrading",
1488 Some(mock_datanode_reply(
1489 to_peer_id,
1490 merge_mailbox_messages(vec![
1491 Arc::new(|id| error::MailboxTimeoutSnafu { id }.fail()),
1492 Arc::new(|id| Ok(new_upgrade_region_reply(id, true, true, None))),
1493 ]),
1494 )),
1495 Assertion::simple(assert_update_metadata_upgrade, assert_no_persist),
1496 ),
1497 Step::next(
1499 "Should be the close downgraded region",
1500 None,
1501 Assertion::simple(assert_close_downgraded_region, assert_no_persist),
1502 ),
1503 Step::next(
1505 "Should be the region migration end",
1506 None,
1507 Assertion::simple(assert_region_migration_end, assert_done),
1508 ),
1509 Step::next(
1511 "Should be the region migration end again",
1512 None,
1513 Assertion::simple(assert_region_migration_end, assert_done),
1514 ),
1515 Step::setup(
1517 "Sets state to RegionMigrationStart",
1518 merge_before_test_fn(vec![
1519 setup_state(Arc::new(|| Box::new(RegionMigrationStart))),
1520 Arc::new(reset_volatile_ctx),
1521 ]),
1522 ),
1523 Step::next(
1527 "Should be the region migration end(has been migrated)",
1528 None,
1529 Assertion::simple(assert_region_migration_end, assert_done),
1530 ),
1531 ];
1532
1533 let steps = [steps.clone()].concat();
1534 let timer = Instant::now();
1535
1536 let runner = ProcedureMigrationSuiteRunner::new(suite)
1538 .steps(steps.clone())
1539 .run_once()
1540 .await;
1541
1542 assert!(timer.elapsed().as_secs() < REGION_LEASE_SECS);
1544 runner.suite.verify_table_metadata().await;
1545 }
1546}