meta_srv/procedure/region_migration.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub(crate) mod close_downgraded_region;
16pub(crate) mod downgrade_leader_region;
17pub(crate) mod flush_leader_region;
18pub(crate) mod manager;
19pub(crate) mod migration_abort;
20pub(crate) mod migration_end;
21pub(crate) mod migration_start;
22pub(crate) mod open_candidate_region;
23#[cfg(test)]
24pub mod test_util;
25pub(crate) mod update_metadata;
26pub(crate) mod upgrade_candidate_region;
27
28use std::any::Any;
29use std::fmt::{Debug, Display};
30use std::time::Duration;
31
32use common_error::ext::BoxedError;
33use common_meta::cache_invalidator::CacheInvalidatorRef;
34use common_meta::ddl::RegionFailureDetectorControllerRef;
35use common_meta::instruction::CacheIdent;
36use common_meta::key::datanode_table::{DatanodeTableKey, DatanodeTableValue};
37use common_meta::key::table_info::TableInfoValue;
38use common_meta::key::table_route::TableRouteValue;
39use common_meta::key::{DeserializedValueWithBytes, TableMetadataManagerRef};
40use common_meta::kv_backend::ResettableKvBackendRef;
41use common_meta::lock_key::{CatalogLock, RegionLock, SchemaLock};
42use common_meta::peer::Peer;
43use common_meta::region_keeper::{MemoryRegionKeeperRef, OperatingRegionGuard};
44use common_procedure::error::{
45    Error as ProcedureError, FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu,
46};
47use common_procedure::{Context as ProcedureContext, LockKey, Procedure, Status, StringKey};
48use common_telemetry::{error, info};
49use manager::RegionMigrationProcedureGuard;
50pub use manager::{
51    RegionMigrationManagerRef, RegionMigrationProcedureTask, RegionMigrationProcedureTracker,
52};
53use serde::{Deserialize, Serialize};
54use snafu::{OptionExt, ResultExt};
55use store_api::storage::RegionId;
56use tokio::time::Instant;
57
58use self::migration_start::RegionMigrationStart;
59use crate::error::{self, Result};
60use crate::metrics::{
61    METRIC_META_REGION_MIGRATION_ERROR, METRIC_META_REGION_MIGRATION_EXECUTE,
62    METRIC_META_REGION_MIGRATION_STAGE_ELAPSED,
63};
64use crate::service::mailbox::MailboxRef;
65
/// The default overall timeout for a region migration procedure (120 seconds).
pub const DEFAULT_REGION_MIGRATION_TIMEOUT: Duration = Duration::from_secs(120);
68
/// Persistent state of the region migration procedure, shared by each step and
/// available even after recovering (it is serialized by [Procedure::dump]).
///
/// It will only be updated/stored after the Red node has succeeded.
/// (NOTE(review): "Red node" looks like a typo — confirm the intended term.)
///
/// **Notes: Stores with too large data in the context might incur replication overhead.**
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct PersistentContext {
    /// The table catalog.
    catalog: String,
    /// The table schema.
    schema: String,
    /// The [Peer] of migration source.
    from_peer: Peer,
    /// The [Peer] of migration destination.
    to_peer: Peer,
    /// The [RegionId] of migration region.
    region_id: RegionId,
    /// The timeout for downgrading leader region and upgrading candidate region operations.
    ///
    /// Serialized in humantime form (e.g. "10s"); falls back to [default_timeout]
    /// when absent, keeping older serialized procedures deserializable.
    #[serde(with = "humantime_serde", default = "default_timeout")]
    timeout: Duration,
}
90
/// The fallback value for [PersistentContext::timeout] when the serialized
/// form predates the field.
fn default_timeout() -> Duration {
    const DEFAULT_OPERATION_TIMEOUT_SECS: u64 = 10;
    Duration::from_secs(DEFAULT_OPERATION_TIMEOUT_SECS)
}
94
95impl PersistentContext {
96    pub fn lock_key(&self) -> Vec<StringKey> {
97        let region_id = self.region_id;
98        let lock_key = vec![
99            CatalogLock::Read(&self.catalog).into(),
100            SchemaLock::read(&self.catalog, &self.schema).into(),
101            RegionLock::Write(region_id).into(),
102        ];
103
104        lock_key
105    }
106}
107
/// Metrics of region migration.
///
/// Stage timings are reported to Prometheus when the value is dropped
/// (see the [Drop] impl below).
#[derive(Debug, Clone, Default)]
pub struct Metrics {
    /// Elapsed time of downgrading region and upgrading region.
    /// Counted against [PersistentContext::timeout]; not part of the stage totals.
    operations_elapsed: Duration,
    /// Elapsed time of flushing leader region.
    flush_leader_region_elapsed: Duration,
    /// Elapsed time of downgrading leader region.
    downgrade_leader_region_elapsed: Duration,
    /// Elapsed time of open candidate region.
    open_candidate_region_elapsed: Duration,
    /// Elapsed time of upgrade candidate region.
    upgrade_candidate_region_elapsed: Duration,
}
122
123impl Display for Metrics {
124    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
125        let total = self.flush_leader_region_elapsed
126            + self.downgrade_leader_region_elapsed
127            + self.open_candidate_region_elapsed
128            + self.upgrade_candidate_region_elapsed;
129        write!(
130            f,
131            "total: {:?}, flush_leader_region_elapsed: {:?}, downgrade_leader_region_elapsed: {:?}, open_candidate_region_elapsed: {:?}, upgrade_candidate_region_elapsed: {:?}",
132            total,
133            self.flush_leader_region_elapsed,
134            self.downgrade_leader_region_elapsed,
135            self.open_candidate_region_elapsed,
136            self.upgrade_candidate_region_elapsed
137        )
138    }
139}
140
141impl Metrics {
142    /// Updates the elapsed time of downgrading region and upgrading region.
143    pub fn update_operations_elapsed(&mut self, elapsed: Duration) {
144        self.operations_elapsed += elapsed;
145    }
146
147    /// Updates the elapsed time of flushing leader region.
148    pub fn update_flush_leader_region_elapsed(&mut self, elapsed: Duration) {
149        self.flush_leader_region_elapsed += elapsed;
150    }
151
152    /// Updates the elapsed time of downgrading leader region.
153    pub fn update_downgrade_leader_region_elapsed(&mut self, elapsed: Duration) {
154        self.downgrade_leader_region_elapsed += elapsed;
155    }
156
157    /// Updates the elapsed time of open candidate region.
158    pub fn update_open_candidate_region_elapsed(&mut self, elapsed: Duration) {
159        self.open_candidate_region_elapsed += elapsed;
160    }
161
162    /// Updates the elapsed time of upgrade candidate region.
163    pub fn update_upgrade_candidate_region_elapsed(&mut self, elapsed: Duration) {
164        self.upgrade_candidate_region_elapsed += elapsed;
165    }
166}
167
168impl Drop for Metrics {
169    fn drop(&mut self) {
170        let total = self.flush_leader_region_elapsed
171            + self.downgrade_leader_region_elapsed
172            + self.open_candidate_region_elapsed
173            + self.upgrade_candidate_region_elapsed;
174        METRIC_META_REGION_MIGRATION_STAGE_ELAPSED
175            .with_label_values(&["total"])
176            .observe(total.as_secs_f64());
177
178        if !self.flush_leader_region_elapsed.is_zero() {
179            METRIC_META_REGION_MIGRATION_STAGE_ELAPSED
180                .with_label_values(&["flush_leader_region"])
181                .observe(self.flush_leader_region_elapsed.as_secs_f64());
182        }
183
184        if !self.downgrade_leader_region_elapsed.is_zero() {
185            METRIC_META_REGION_MIGRATION_STAGE_ELAPSED
186                .with_label_values(&["downgrade_leader_region"])
187                .observe(self.downgrade_leader_region_elapsed.as_secs_f64());
188        }
189
190        if !self.open_candidate_region_elapsed.is_zero() {
191            METRIC_META_REGION_MIGRATION_STAGE_ELAPSED
192                .with_label_values(&["open_candidate_region"])
193                .observe(self.open_candidate_region_elapsed.as_secs_f64());
194        }
195
196        if !self.upgrade_candidate_region_elapsed.is_zero() {
197            METRIC_META_REGION_MIGRATION_STAGE_ELAPSED
198                .with_label_values(&["upgrade_candidate_region"])
199                .observe(self.upgrade_candidate_region_elapsed.as_secs_f64());
200        }
201    }
202}
203
/// In-memory state shared by each step and available while executing
/// (including retrying).
///
/// It will be dropped if the procedure runner crashes.
///
/// The additional remote fetches are only required in the worst cases.
#[derive(Debug, Clone, Default)]
pub struct VolatileContext {
    /// `opening_region_guard` will be set after the
    /// [OpenCandidateRegion](crate::procedure::region_migration::open_candidate_region::OpenCandidateRegion) step.
    ///
    /// `opening_region_guard` should be consumed after
    /// the corresponding [RegionRoute](common_meta::rpc::router::RegionRoute) of the opening region
    /// was written into [TableRouteValue](common_meta::key::table_route::TableRouteValue).
    opening_region_guard: Option<OperatingRegionGuard>,
    /// `table_route` is stored via previous steps for future use.
    /// Cached by [Context::get_table_route_value]; cleared by [Context::remove_table_route_value].
    table_route: Option<DeserializedValueWithBytes<TableRouteValue>>,
    /// `datanode_table` is stored via previous steps for future use.
    from_peer_datanode_table: Option<DatanodeTableValue>,
    /// `table_info` is stored via previous steps for future use.
    ///
    /// `table_info` should remain unchanged during the procedure;
    /// no other DDL procedure executed concurrently for the current table.
    table_info: Option<DeserializedValueWithBytes<TableInfoValue>>,
    /// The deadline of leader region lease.
    leader_region_lease_deadline: Option<Instant>,
    /// The last_entry_id of leader region.
    leader_region_last_entry_id: Option<u64>,
    /// The last_entry_id of leader metadata region (Only used for metric engine).
    leader_region_metadata_last_entry_id: Option<u64>,
    /// Metrics of region migration; reported to Prometheus on drop.
    metrics: Metrics,
}
236
237impl VolatileContext {
238    /// Sets the `leader_region_lease_deadline` if it does not exist.
239    pub fn set_leader_region_lease_deadline(&mut self, lease_timeout: Duration) {
240        if self.leader_region_lease_deadline.is_none() {
241            self.leader_region_lease_deadline = Some(Instant::now() + lease_timeout);
242        }
243    }
244
245    /// Resets the `leader_region_lease_deadline`.
246    pub fn reset_leader_region_lease_deadline(&mut self) {
247        self.leader_region_lease_deadline = None;
248    }
249
250    /// Sets the `leader_region_last_entry_id`.
251    pub fn set_last_entry_id(&mut self, last_entry_id: u64) {
252        self.leader_region_last_entry_id = Some(last_entry_id)
253    }
254
255    /// Sets the `leader_region_metadata_last_entry_id`.
256    pub fn set_metadata_last_entry_id(&mut self, last_entry_id: u64) {
257        self.leader_region_metadata_last_entry_id = Some(last_entry_id);
258    }
259}
260
/// Used to generate new [Context].
pub trait ContextFactory {
    /// Consumes the factory and builds a [Context] around the given
    /// persistent state.
    fn new_context(self, persistent_ctx: PersistentContext) -> Context;
}
265
/// Default [ContextFactory] implementation; holds every dependency that a
/// [Context] needs, and hands them over in [ContextFactory::new_context].
#[derive(Clone)]
pub struct DefaultContextFactory {
    /// Fresh volatile state for the new context.
    volatile_ctx: VolatileContext,
    /// In-memory kv-backend handle.
    in_memory_key: ResettableKvBackendRef,
    /// Table metadata (route/info/datanode-table) accessor.
    table_metadata_manager: TableMetadataManagerRef,
    /// Keeper tracking regions currently being opened.
    opening_region_keeper: MemoryRegionKeeperRef,
    /// Controller for (de)registering region failure detectors.
    region_failure_detector_controller: RegionFailureDetectorControllerRef,
    /// Mailbox for datanode communication.
    mailbox: MailboxRef,
    /// Address of this meta server.
    server_addr: String,
    /// Broadcaster for cache invalidation messages.
    cache_invalidator: CacheInvalidatorRef,
}
278
279impl DefaultContextFactory {
280    /// Returns an [`DefaultContextFactory`].
281    pub fn new(
282        in_memory_key: ResettableKvBackendRef,
283        table_metadata_manager: TableMetadataManagerRef,
284        opening_region_keeper: MemoryRegionKeeperRef,
285        region_failure_detector_controller: RegionFailureDetectorControllerRef,
286        mailbox: MailboxRef,
287        server_addr: String,
288        cache_invalidator: CacheInvalidatorRef,
289    ) -> Self {
290        Self {
291            volatile_ctx: VolatileContext::default(),
292            in_memory_key,
293            table_metadata_manager,
294            opening_region_keeper,
295            region_failure_detector_controller,
296            mailbox,
297            server_addr,
298            cache_invalidator,
299        }
300    }
301}
302
303impl ContextFactory for DefaultContextFactory {
304    fn new_context(self, persistent_ctx: PersistentContext) -> Context {
305        Context {
306            persistent_ctx,
307            volatile_ctx: self.volatile_ctx,
308            in_memory: self.in_memory_key,
309            table_metadata_manager: self.table_metadata_manager,
310            opening_region_keeper: self.opening_region_keeper,
311            region_failure_detector_controller: self.region_failure_detector_controller,
312            mailbox: self.mailbox,
313            server_addr: self.server_addr,
314            cache_invalidator: self.cache_invalidator,
315        }
316    }
317}
318
/// The context of procedure execution.
pub struct Context {
    /// Durable state, persisted with the procedure (see [Procedure::dump]).
    persistent_ctx: PersistentContext,
    /// In-process state, lost if the procedure runner crashes.
    volatile_ctx: VolatileContext,
    /// In-memory kv-backend handle.
    in_memory: ResettableKvBackendRef,
    /// Table metadata (route/info/datanode-table) accessor.
    table_metadata_manager: TableMetadataManagerRef,
    /// Keeper tracking regions currently being opened.
    opening_region_keeper: MemoryRegionKeeperRef,
    /// Controller for (de)registering region failure detectors.
    region_failure_detector_controller: RegionFailureDetectorControllerRef,
    /// Mailbox for datanode communication.
    mailbox: MailboxRef,
    /// Address of this meta server.
    server_addr: String,
    /// Broadcaster for cache invalidation messages.
    cache_invalidator: CacheInvalidatorRef,
}
331
impl Context {
    /// Returns the remaining timeout budget for the next downgrade/upgrade
    /// operation, or `None` when the budget is already exhausted.
    pub fn next_operation_timeout(&self) -> Option<Duration> {
        self.persistent_ctx
            .timeout
            .checked_sub(self.volatile_ctx.metrics.operations_elapsed)
    }

    /// Adds `instant.elapsed()` to the operations elapsed time
    /// consumed by [Self::next_operation_timeout].
    pub fn update_operations_elapsed(&mut self, instant: Instant) {
        self.volatile_ctx
            .metrics
            .update_operations_elapsed(instant.elapsed());
    }

    /// Adds `instant.elapsed()` to the time spent flushing the leader region.
    pub fn update_flush_leader_region_elapsed(&mut self, instant: Instant) {
        self.volatile_ctx
            .metrics
            .update_flush_leader_region_elapsed(instant.elapsed());
    }

    /// Adds `instant.elapsed()` to the time spent downgrading the leader region.
    pub fn update_downgrade_leader_region_elapsed(&mut self, instant: Instant) {
        self.volatile_ctx
            .metrics
            .update_downgrade_leader_region_elapsed(instant.elapsed());
    }

    /// Adds `instant.elapsed()` to the time spent opening the candidate region.
    pub fn update_open_candidate_region_elapsed(&mut self, instant: Instant) {
        self.volatile_ctx
            .metrics
            .update_open_candidate_region_elapsed(instant.elapsed());
    }

    /// Adds `instant.elapsed()` to the time spent upgrading the candidate region.
    pub fn update_upgrade_candidate_region_elapsed(&mut self, instant: Instant) {
        self.volatile_ctx
            .metrics
            .update_upgrade_candidate_region_elapsed(instant.elapsed());
    }

    /// Returns address of meta server.
    pub fn server_addr(&self) -> &str {
        &self.server_addr
    }

    /// Returns the `table_route` of [VolatileContext] if any.
    /// Otherwise, retrieves it from remote storage and caches it in the
    /// volatile context for subsequent calls.
    ///
    /// Retry:
    /// - Failed to retrieve the metadata of table.
    pub async fn get_table_route_value(
        &mut self,
    ) -> Result<&DeserializedValueWithBytes<TableRouteValue>> {
        let table_route_value = &mut self.volatile_ctx.table_route;

        if table_route_value.is_none() {
            let table_id = self.persistent_ctx.region_id.table_id();
            let table_route = self
                .table_metadata_manager
                .table_route_manager()
                .table_route_storage()
                .get_with_raw_bytes(table_id)
                .await
                .context(error::TableMetadataManagerSnafu)
                .map_err(BoxedError::new)
                // Transient metadata errors are retried instead of aborting the procedure.
                .with_context(|_| error::RetryLaterWithSourceSnafu {
                    reason: format!("Failed to get TableRoute: {table_id}"),
                })?
                .context(error::TableRouteNotFoundSnafu { table_id })?;

            *table_route_value = Some(table_route);
        }

        // Safety: populated above if it was `None`.
        Ok(table_route_value.as_ref().unwrap())
    }

    /// Notifies the RegionSupervisor to register failure detectors of failed region.
    ///
    /// The original failure detector was removed once the procedure was triggered.
    /// Now, we need to register the failure detector for the failed region again.
    pub async fn register_failure_detectors(&self) {
        let datanode_id = self.persistent_ctx.from_peer.id;
        let region_id = self.persistent_ctx.region_id;

        self.region_failure_detector_controller
            .register_failure_detectors(vec![(datanode_id, region_id)])
            .await;
    }

    /// Notifies the RegionSupervisor to deregister failure detectors.
    ///
    /// The original failure detectors won't be removed once the procedure was triggered.
    /// We need to deregister the failure detectors for the original region if the procedure is finished.
    pub async fn deregister_failure_detectors(&self) {
        let datanode_id = self.persistent_ctx.from_peer.id;
        let region_id = self.persistent_ctx.region_id;

        self.region_failure_detector_controller
            .deregister_failure_detectors(vec![(datanode_id, region_id)])
            .await;
    }

    /// Notifies the RegionSupervisor to deregister failure detectors for the candidate region on the destination peer.
    ///
    /// The candidate region may be created on the destination peer,
    /// so we need to deregister the failure detectors for the candidate region if the procedure is aborted.
    pub async fn deregister_failure_detectors_for_candidate_region(&self) {
        let to_peer_id = self.persistent_ctx.to_peer.id;
        let region_id = self.persistent_ctx.region_id;

        self.region_failure_detector_controller
            .deregister_failure_detectors(vec![(to_peer_id, region_id)])
            .await;
    }

    /// Removes the cached `table_route` of [VolatileContext]; returns true if
    /// a cached value was present. The next [Self::get_table_route_value] call
    /// will re-fetch from remote storage.
    pub fn remove_table_route_value(&mut self) -> bool {
        let value = self.volatile_ctx.table_route.take();
        value.is_some()
    }

    /// Returns the `table_info` of [VolatileContext] if any.
    /// Otherwise, retrieves it from remote storage and caches it in the
    /// volatile context for subsequent calls.
    ///
    /// Retry:
    /// - Failed to retrieve the metadata of table.
    pub async fn get_table_info_value(
        &mut self,
    ) -> Result<&DeserializedValueWithBytes<TableInfoValue>> {
        let table_info_value = &mut self.volatile_ctx.table_info;

        if table_info_value.is_none() {
            let table_id = self.persistent_ctx.region_id.table_id();
            let table_info = self
                .table_metadata_manager
                .table_info_manager()
                .get(table_id)
                .await
                .context(error::TableMetadataManagerSnafu)
                .map_err(BoxedError::new)
                // Transient metadata errors are retried instead of aborting the procedure.
                .with_context(|_| error::RetryLaterWithSourceSnafu {
                    reason: format!("Failed to get TableInfo: {table_id}"),
                })?
                .context(error::TableInfoNotFoundSnafu { table_id })?;

            *table_info_value = Some(table_info);
        }

        // Safety: populated above if it was `None`.
        Ok(table_info_value.as_ref().unwrap())
    }

    /// Returns the `from_peer_datanode_table` of [VolatileContext] if any.
    /// Otherwise, retrieves it from remote storage and caches it in the
    /// volatile context for subsequent calls.
    ///
    /// Retry:
    /// - Failed to retrieve the metadata of datanode.
    pub async fn get_from_peer_datanode_table_value(&mut self) -> Result<&DatanodeTableValue> {
        let datanode_value = &mut self.volatile_ctx.from_peer_datanode_table;

        if datanode_value.is_none() {
            let table_id = self.persistent_ctx.region_id.table_id();
            let datanode_id = self.persistent_ctx.from_peer.id;

            let datanode_table = self
                .table_metadata_manager
                .datanode_table_manager()
                .get(&DatanodeTableKey {
                    datanode_id,
                    table_id,
                })
                .await
                .context(error::TableMetadataManagerSnafu)
                .map_err(BoxedError::new)
                // Transient metadata errors are retried instead of aborting the procedure.
                .with_context(|_| error::RetryLaterWithSourceSnafu {
                    reason: format!("Failed to get DatanodeTable: ({datanode_id},{table_id})"),
                })?
                .context(error::DatanodeTableNotFoundSnafu {
                    table_id,
                    datanode_id,
                })?;

            *datanode_value = Some(datanode_table);
        }

        // Safety: populated above if it was `None`.
        Ok(datanode_value.as_ref().unwrap())
    }

    /// Returns the [RegionId] of the migrating region.
    pub fn region_id(&self) -> RegionId {
        self.persistent_ctx.region_id
    }

    /// Broadcasts the invalidate table cache message.
    ///
    /// Best-effort: the invalidation result is deliberately ignored.
    pub async fn invalidate_table_cache(&self) -> Result<()> {
        let table_id = self.region_id().table_id();
        // ignore the result
        let ctx = common_meta::cache_invalidator::Context::default();
        let _ = self
            .cache_invalidator
            .invalidate(&ctx, &[CacheIdent::TableId(table_id)])
            .await;
        Ok(())
    }
}
539
/// A single state of the region migration state machine.
///
/// The `typetag` attribute tags each implementor with
/// `"region_migration_state"`, so a `Box<dyn State>` can be (de)serialized
/// when the procedure is persisted/recovered.
#[async_trait::async_trait]
#[typetag::serde(tag = "region_migration_state")]
pub(crate) trait State: Sync + Send + Debug {
    /// Returns the short type name (the last `::` segment), used as the
    /// metric label in [Procedure::execute].
    fn name(&self) -> &'static str {
        let type_name = std::any::type_name::<Self>();
        // short name
        type_name.split("::").last().unwrap_or(type_name)
    }

    /// Yields the next [State] and [Status].
    async fn next(
        &mut self,
        ctx: &mut Context,
        procedure_ctx: &ProcedureContext,
    ) -> Result<(Box<dyn State>, Status)>;

    /// Returns as [Any](std::any::Any), enabling downcasting in tests.
    fn as_any(&self) -> &dyn Any;
}
559
/// Persistent data of [RegionMigrationProcedure] — owned variant, used when
/// deserializing a recovered procedure (see `RegionMigrationProcedure::from_json`).
#[derive(Debug, Serialize, Deserialize)]
pub struct RegionMigrationDataOwned {
    /// The persisted migration context.
    persistent_ctx: PersistentContext,
    /// The state the procedure was in when it was persisted.
    state: Box<dyn State>,
}
566
/// Persistent data of [RegionMigrationProcedure] — borrowed variant, used for
/// serialization in [Procedure::dump] without cloning the context/state.
#[derive(Debug, Serialize)]
pub struct RegionMigrationData<'a> {
    /// The persisted migration context.
    persistent_ctx: &'a PersistentContext,
    /// The current state of the procedure.
    state: &'a dyn State,
}
573
/// The region migration procedure: drives a [State] machine from
/// [RegionMigrationStart] until it ends or aborts.
pub(crate) struct RegionMigrationProcedure {
    /// The current state; replaced with the next state after each step.
    state: Box<dyn State>,
    /// Execution context shared by all states.
    context: Context,
    /// Keeps this migration registered in the tracker for the procedure's lifetime.
    _guard: Option<RegionMigrationProcedureGuard>,
}
579
impl RegionMigrationProcedure {
    const TYPE_NAME: &'static str = "metasrv-procedure::RegionMigration";

    /// Creates a procedure starting from the [RegionMigrationStart] state.
    pub fn new(
        persistent_context: PersistentContext,
        context_factory: impl ContextFactory,
        guard: Option<RegionMigrationProcedureGuard>,
    ) -> Self {
        let state = Box::new(RegionMigrationStart {});
        Self::new_inner(state, persistent_context, context_factory, guard)
    }

    /// Builds a procedure from an explicit initial state (also used by tests).
    fn new_inner(
        state: Box<dyn State>,
        persistent_context: PersistentContext,
        context_factory: impl ContextFactory,
        guard: Option<RegionMigrationProcedureGuard>,
    ) -> Self {
        Self {
            state,
            context: context_factory.new_context(persistent_context),
            _guard: guard,
        }
    }

    /// Restores a procedure from its serialized form (see [Procedure::dump])
    /// and re-registers the running task in the tracker.
    fn from_json(
        json: &str,
        context_factory: impl ContextFactory,
        tracker: RegionMigrationProcedureTracker,
    ) -> ProcedureResult<Self> {
        let RegionMigrationDataOwned {
            persistent_ctx,
            state,
        } = serde_json::from_str(json).context(FromJsonSnafu)?;

        // Re-register so concurrent migration requests for the same region are rejected.
        let guard = tracker.insert_running_procedure(&RegionMigrationProcedureTask {
            region_id: persistent_ctx.region_id,
            from_peer: persistent_ctx.from_peer.clone(),
            to_peer: persistent_ctx.to_peer.clone(),
            timeout: persistent_ctx.timeout,
        });
        let context = context_factory.new_context(persistent_ctx);

        Ok(Self {
            state,
            context,
            _guard: guard,
        })
    }

    /// Rolls back a failed migration: clears any leader-downgrading mark on the
    /// region's table route and re-registers the region's failure detectors.
    async fn rollback_inner(&mut self) -> Result<()> {
        let _timer = METRIC_META_REGION_MIGRATION_EXECUTE
            .with_label_values(&["rollback"])
            .start_timer();

        let table_id = self.context.region_id().table_id();
        let region_id = self.context.region_id();
        // Drop the cached route so the check below sees the latest remote state.
        self.context.remove_table_route_value();
        let table_metadata_manager = self.context.table_metadata_manager.clone();
        let table_route = self.context.get_table_route_value().await?;

        // Safety: It must be a physical table route.
        let downgraded = table_route
            .region_routes()
            .unwrap()
            .iter()
            .filter(|route| route.region.id == region_id)
            .any(|route| route.is_leader_downgrading());

        if downgraded {
            info!("Rollbacking downgraded region leader table route, region: {region_id}");
            // `Some(None)` clears the leader status for the matching region;
            // `None` leaves other regions untouched.
            table_metadata_manager
                    .update_leader_region_status(table_id, table_route, |route| {
                        if route.region.id == region_id {
                            Some(None)
                        } else {
                            None
                        }
                    })
                    .await
                    .context(error::TableMetadataManagerSnafu)
                    .map_err(BoxedError::new)
                    .with_context(|_| error::RetryLaterWithSourceSnafu {
                        reason: format!("Failed to update the table route during the rollback downgraded leader region: {region_id}"),
                    })?;
        }

        // The detectors were removed when the migration was triggered; restore them.
        self.context.register_failure_detectors().await;

        Ok(())
    }
}
672
#[async_trait::async_trait]
impl Procedure for RegionMigrationProcedure {
    fn type_name(&self) -> &str {
        Self::TYPE_NAME
    }

    /// Rolls back a failed migration (see [Self::rollback_inner]).
    async fn rollback(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<()> {
        self.rollback_inner()
            .await
            .map_err(ProcedureError::external)
    }

    fn rollback_supported(&self) -> bool {
        true
    }

    /// Runs the current state once and advances to the state it yields.
    ///
    /// Retryable errors become [ProcedureError::retry_later]; other errors
    /// abort the procedure after releasing the opening-region guard and
    /// deregistering the candidate region's failure detectors.
    async fn execute(&mut self, ctx: &ProcedureContext) -> ProcedureResult<Status> {
        let state = &mut self.state;

        // The state's short name doubles as the metric label.
        let name = state.name();
        let _timer = METRIC_META_REGION_MIGRATION_EXECUTE
            .with_label_values(&[name])
            .start_timer();
        match state.next(&mut self.context, ctx).await {
            Ok((next, status)) => {
                *state = next;
                Ok(status)
            }
            Err(e) => {
                if e.is_retryable() {
                    METRIC_META_REGION_MIGRATION_ERROR
                        .with_label_values(&[name, "retryable"])
                        .inc();
                    Err(ProcedureError::retry_later(e))
                } else {
                    // Consumes the opening region guard before deregistering the failure detectors.
                    self.context.volatile_ctx.opening_region_guard.take();
                    self.context
                        .deregister_failure_detectors_for_candidate_region()
                        .await;
                    error!(
                        e;
                        "Region migration procedure failed, region_id: {}, from_peer: {}, to_peer: {}, {}",
                        self.context.region_id(),
                        self.context.persistent_ctx.from_peer,
                        self.context.persistent_ctx.to_peer,
                        self.context.volatile_ctx.metrics,
                    );
                    METRIC_META_REGION_MIGRATION_ERROR
                        .with_label_values(&[name, "external"])
                        .inc();
                    Err(ProcedureError::external(e))
                }
            }
        }
    }

    /// Serializes the persistent context and current state to JSON
    /// (restored later by `Self::from_json`).
    fn dump(&self) -> ProcedureResult<String> {
        let data = RegionMigrationData {
            state: self.state.as_ref(),
            persistent_ctx: &self.context.persistent_ctx,
        };
        serde_json::to_string(&data).context(ToJsonSnafu)
    }

    fn lock_key(&self) -> LockKey {
        LockKey::new(self.context.persistent_ctx.lock_key())
    }
}
742
743#[cfg(test)]
744mod tests {
745    use std::assert_matches::assert_matches;
746    use std::sync::Arc;
747
748    use common_meta::distributed_time_constants::REGION_LEASE_SECS;
749    use common_meta::instruction::Instruction;
750    use common_meta::key::test_utils::new_test_table_info;
751    use common_meta::rpc::router::{Region, RegionRoute};
752
753    use super::update_metadata::UpdateMetadata;
754    use super::*;
755    use crate::handler::HeartbeatMailbox;
756    use crate::procedure::region_migration::open_candidate_region::OpenCandidateRegion;
757    use crate::procedure::region_migration::test_util::*;
758    use crate::procedure::test_util::{
759        new_downgrade_region_reply, new_flush_region_reply, new_open_region_reply,
760        new_upgrade_region_reply,
761    };
762    use crate::service::mailbox::Channel;
763
764    fn new_persistent_context() -> PersistentContext {
765        test_util::new_persistent_context(1, 2, RegionId::new(1024, 1))
766    }
767
768    #[test]
769    fn test_lock_key() {
770        let persistent_context = new_persistent_context();
771        let expected_keys = persistent_context.lock_key();
772
773        let env = TestingEnv::new();
774        let context = env.context_factory();
775
776        let procedure = RegionMigrationProcedure::new(persistent_context, context, None);
777
778        let key = procedure.lock_key();
779        let keys = key.keys_to_lock().cloned().collect::<Vec<_>>();
780
781        for key in expected_keys {
782            assert!(keys.contains(&key));
783        }
784    }
785
786    #[test]
787    fn test_data_serialization() {
788        let persistent_context = new_persistent_context();
789
790        let env = TestingEnv::new();
791        let context = env.context_factory();
792
793        let procedure = RegionMigrationProcedure::new(persistent_context, context, None);
794
795        let serialized = procedure.dump().unwrap();
796        let expected = r#"{"persistent_ctx":{"catalog":"greptime","schema":"public","from_peer":{"id":1,"addr":""},"to_peer":{"id":2,"addr":""},"region_id":4398046511105,"timeout":"10s"},"state":{"region_migration_state":"RegionMigrationStart"}}"#;
797        assert_eq!(expected, serialized);
798    }
799
800    #[test]
801    fn test_backward_compatibility() {
802        let persistent_ctx = test_util::new_persistent_context(1, 2, RegionId::new(1024, 1));
803        // NOTES: Changes it will break backward compatibility.
804        let serialized = r#"{"catalog":"greptime","schema":"public","from_peer":{"id":1,"addr":""},"to_peer":{"id":2,"addr":""},"region_id":4398046511105}"#;
805        let deserialized: PersistentContext = serde_json::from_str(serialized).unwrap();
806
807        assert_eq!(persistent_ctx, deserialized);
808    }
809
    /// A trivial [`State`] used to drive procedure execution in tests without
    /// touching any region metadata.
    #[derive(Debug, Serialize, Deserialize, Default)]
    pub struct MockState;
812
    #[async_trait::async_trait]
    #[typetag::serde]
    impl State for MockState {
        // Always transitions to another `MockState` and reports the procedure
        // as done, so the state machine terminates on the first execution.
        async fn next(
            &mut self,
            _ctx: &mut Context,
            _procedure_ctx: &ProcedureContext,
        ) -> Result<(Box<dyn State>, Status)> {
            Ok((Box::new(MockState), Status::done()))
        }

        fn as_any(&self) -> &dyn Any {
            self
        }
    }
828
829    #[tokio::test]
830    async fn test_execution_after_deserialized() {
831        let env = TestingEnv::new();
832
833        fn new_mock_procedure(env: &TestingEnv) -> RegionMigrationProcedure {
834            let persistent_context = new_persistent_context();
835            let context_factory = env.context_factory();
836            let state = Box::<MockState>::default();
837            RegionMigrationProcedure::new_inner(state, persistent_context, context_factory, None)
838        }
839
840        let ctx = TestingEnv::procedure_context();
841        let mut procedure = new_mock_procedure(&env);
842        let mut status = None;
843        for _ in 0..3 {
844            status = Some(procedure.execute(&ctx).await.unwrap());
845        }
846        assert!(status.unwrap().is_done());
847
848        let ctx = TestingEnv::procedure_context();
849        let mut procedure = new_mock_procedure(&env);
850
851        status = Some(procedure.execute(&ctx).await.unwrap());
852
853        let serialized = procedure.dump().unwrap();
854
855        let context_factory = env.context_factory();
856        let tracker = env.tracker();
857        let mut procedure =
858            RegionMigrationProcedure::from_json(&serialized, context_factory, tracker.clone())
859                .unwrap();
860        assert!(tracker.contains(procedure.context.persistent_ctx.region_id));
861
862        for _ in 1..3 {
863            status = Some(procedure.execute(&ctx).await.unwrap());
864        }
865        assert!(status.unwrap().is_done());
866    }
867
868    #[tokio::test]
869    async fn test_broadcast_invalidate_table_cache() {
870        let mut env = TestingEnv::new();
871        let persistent_context = test_util::new_persistent_context(1, 2, RegionId::new(1024, 1));
872        let ctx = env.context_factory().new_context(persistent_context);
873        let mailbox_ctx = env.mailbox_context();
874
875        // No receivers.
876        ctx.invalidate_table_cache().await.unwrap();
877
878        let (tx, mut rx) = tokio::sync::mpsc::channel(1);
879
880        mailbox_ctx
881            .insert_heartbeat_response_receiver(Channel::Frontend(1), tx)
882            .await;
883
884        ctx.invalidate_table_cache().await.unwrap();
885
886        let resp = rx.recv().await.unwrap().unwrap();
887        let msg = resp.mailbox_message.unwrap();
888
889        let instruction = HeartbeatMailbox::json_instruction(&msg).unwrap();
890        assert_eq!(
891            instruction,
892            Instruction::InvalidateCaches(vec![CacheIdent::TableId(1024)])
893        );
894    }
895
896    fn procedure_flow_steps(from_peer_id: u64, to_peer_id: u64) -> Vec<Step> {
897        vec![
898            // MigrationStart
899            Step::next(
900                "Should be the update metadata for downgrading",
901                None,
902                Assertion::simple(assert_update_metadata_downgrade, assert_need_persist),
903            ),
904            // UpdateMetadata::Downgrade
905            Step::next(
906                "Should be the downgrade leader region",
907                None,
908                Assertion::simple(assert_downgrade_leader_region, assert_no_persist),
909            ),
910            // Downgrade Candidate
911            Step::next(
912                "Should be the upgrade candidate region",
913                Some(mock_datanode_reply(
914                    from_peer_id,
915                    Arc::new(|id| Ok(new_downgrade_region_reply(id, None, true, None))),
916                )),
917                Assertion::simple(assert_upgrade_candidate_region, assert_no_persist),
918            ),
919            // Upgrade Candidate
920            Step::next(
921                "Should be the update metadata for upgrading",
922                Some(mock_datanode_reply(
923                    to_peer_id,
924                    Arc::new(|id| Ok(new_upgrade_region_reply(id, true, true, None))),
925                )),
926                Assertion::simple(assert_update_metadata_upgrade, assert_no_persist),
927            ),
928            // UpdateMetadata::Upgrade
929            Step::next(
930                "Should be the close downgraded region",
931                None,
932                Assertion::simple(assert_close_downgraded_region, assert_no_persist),
933            ),
934            // CloseDowngradedRegion
935            Step::next(
936                "Should be the region migration end",
937                None,
938                Assertion::simple(assert_region_migration_end, assert_done),
939            ),
940            // RegionMigrationEnd
941            Step::next(
942                "Should be the region migration end again",
943                None,
944                Assertion::simple(assert_region_migration_end, assert_done),
945            ),
946        ]
947    }
948
949    #[tokio::test]
950    async fn test_procedure_flow() {
951        common_telemetry::init_default_ut_logging();
952
953        let persistent_context = test_util::new_persistent_context(1, 2, RegionId::new(1024, 1));
954        let state = Box::new(RegionMigrationStart);
955
956        // The table metadata.
957        let from_peer_id = persistent_context.from_peer.id;
958        let to_peer_id = persistent_context.to_peer.id;
959        let from_peer = persistent_context.from_peer.clone();
960        let to_peer = persistent_context.to_peer.clone();
961        let region_id = persistent_context.region_id;
962        let table_info = new_test_table_info(1024, vec![1]).into();
963        let region_routes = vec![RegionRoute {
964            region: Region::new_test(region_id),
965            leader_peer: Some(from_peer),
966            follower_peers: vec![to_peer],
967            ..Default::default()
968        }];
969
970        let suite = ProcedureMigrationTestSuite::new(persistent_context, state);
971        suite.init_table_metadata(table_info, region_routes).await;
972
973        let steps = procedure_flow_steps(from_peer_id, to_peer_id);
974        let timer = Instant::now();
975
976        // Run the table tests.
977        let runner = ProcedureMigrationSuiteRunner::new(suite)
978            .steps(steps)
979            .run_once()
980            .await;
981
982        // Ensure it didn't run into the slow path.
983        assert!(timer.elapsed().as_secs() < REGION_LEASE_SECS / 2);
984
985        runner.suite.verify_table_metadata().await;
986    }
987
988    #[tokio::test]
989    async fn test_procedure_flow_idempotent() {
990        common_telemetry::init_default_ut_logging();
991
992        let persistent_context = test_util::new_persistent_context(1, 2, RegionId::new(1024, 1));
993        let state = Box::new(RegionMigrationStart);
994
995        // The table metadata.
996        let from_peer_id = persistent_context.from_peer.id;
997        let to_peer_id = persistent_context.to_peer.id;
998        let from_peer = persistent_context.from_peer.clone();
999        let to_peer = persistent_context.to_peer.clone();
1000        let region_id = persistent_context.region_id;
1001        let table_info = new_test_table_info(1024, vec![1]).into();
1002        let region_routes = vec![RegionRoute {
1003            region: Region::new_test(region_id),
1004            leader_peer: Some(from_peer),
1005            follower_peers: vec![to_peer],
1006            ..Default::default()
1007        }];
1008
1009        let suite = ProcedureMigrationTestSuite::new(persistent_context, state);
1010        suite.init_table_metadata(table_info, region_routes).await;
1011
1012        let steps = procedure_flow_steps(from_peer_id, to_peer_id);
1013        let setup_to_latest_persisted_state = Step::setup(
1014            "Sets state to UpdateMetadata::Downgrade",
1015            merge_before_test_fn(vec![
1016                setup_state(Arc::new(|| Box::new(UpdateMetadata::Downgrade))),
1017                Arc::new(reset_volatile_ctx),
1018            ]),
1019        );
1020
1021        let steps = [
1022            steps.clone(),
1023            vec![setup_to_latest_persisted_state.clone()],
1024            steps.clone()[1..].to_vec(),
1025            vec![setup_to_latest_persisted_state],
1026            steps.clone()[1..].to_vec(),
1027        ]
1028        .concat();
1029        let timer = Instant::now();
1030
1031        // Run the table tests.
1032        let runner = ProcedureMigrationSuiteRunner::new(suite)
1033            .steps(steps.clone())
1034            .run_once()
1035            .await;
1036
1037        // Ensure it didn't run into the slow path.
1038        assert!(timer.elapsed().as_secs() < REGION_LEASE_SECS / 2);
1039
1040        runner.suite.verify_table_metadata().await;
1041    }
1042
1043    #[tokio::test]
1044    async fn test_procedure_flow_open_candidate_region_retryable_error() {
1045        common_telemetry::init_default_ut_logging();
1046
1047        let persistent_context = test_util::new_persistent_context(1, 2, RegionId::new(1024, 1));
1048        let state = Box::new(RegionMigrationStart);
1049
1050        // The table metadata.
1051        let to_peer_id = persistent_context.to_peer.id;
1052        let from_peer = persistent_context.from_peer.clone();
1053        let region_id = persistent_context.region_id;
1054        let table_info = new_test_table_info(1024, vec![1]).into();
1055        let region_routes = vec![RegionRoute {
1056            region: Region::new_test(region_id),
1057            leader_peer: Some(from_peer),
1058            follower_peers: vec![],
1059            ..Default::default()
1060        }];
1061
1062        let suite = ProcedureMigrationTestSuite::new(persistent_context, state);
1063        suite.init_table_metadata(table_info, region_routes).await;
1064
1065        let steps = vec![
1066            // Migration Start
1067            Step::next(
1068                "Should be the open candidate region",
1069                None,
1070                Assertion::simple(assert_open_candidate_region, assert_need_persist),
1071            ),
1072            // OpenCandidateRegion
1073            Step::next(
1074                "Should be throwing a non-retry error",
1075                Some(mock_datanode_reply(
1076                    to_peer_id,
1077                    Arc::new(|id| error::MailboxTimeoutSnafu { id }.fail()),
1078                )),
1079                Assertion::error(|error| assert!(error.is_retryable())),
1080            ),
1081            // OpenCandidateRegion
1082            Step::next(
1083                "Should be throwing a non-retry error again",
1084                Some(mock_datanode_reply(
1085                    to_peer_id,
1086                    Arc::new(|id| error::MailboxTimeoutSnafu { id }.fail()),
1087                )),
1088                Assertion::error(|error| assert!(error.is_retryable())),
1089            ),
1090        ];
1091
1092        let setup_to_latest_persisted_state = Step::setup(
1093            "Sets state to UpdateMetadata::Downgrade",
1094            merge_before_test_fn(vec![
1095                setup_state(Arc::new(|| Box::new(OpenCandidateRegion))),
1096                Arc::new(reset_volatile_ctx),
1097            ]),
1098        );
1099
1100        let steps = [
1101            steps.clone(),
1102            // Mocks the volatile ctx lost(i.g., Meta leader restarts).
1103            vec![setup_to_latest_persisted_state.clone()],
1104            steps.clone()[1..].to_vec(),
1105            vec![setup_to_latest_persisted_state],
1106            steps.clone()[1..].to_vec(),
1107        ]
1108        .concat();
1109
1110        // Run the table tests.
1111        let runner = ProcedureMigrationSuiteRunner::new(suite)
1112            .steps(steps.clone())
1113            .run_once()
1114            .await;
1115
1116        let table_routes_version = runner
1117            .env()
1118            .table_metadata_manager()
1119            .table_route_manager()
1120            .table_route_storage()
1121            .get(region_id.table_id())
1122            .await
1123            .unwrap()
1124            .unwrap()
1125            .version();
1126        // Should be unchanged.
1127        assert_eq!(table_routes_version.unwrap(), 0);
1128    }
1129
1130    #[tokio::test]
1131    async fn test_procedure_flow_upgrade_candidate_with_retry_and_failed() {
1132        common_telemetry::init_default_ut_logging();
1133
1134        let persistent_context = test_util::new_persistent_context(1, 2, RegionId::new(1024, 1));
1135        let state = Box::new(RegionMigrationStart);
1136
1137        // The table metadata.
1138        let from_peer_id = persistent_context.from_peer.id;
1139        let to_peer_id = persistent_context.to_peer.id;
1140        let from_peer = persistent_context.from_peer.clone();
1141        let to_peer = persistent_context.to_peer.clone();
1142        let region_id = persistent_context.region_id;
1143        let table_info = new_test_table_info(1024, vec![1]).into();
1144        let region_routes = vec![RegionRoute {
1145            region: Region::new_test(region_id),
1146            leader_peer: Some(from_peer),
1147            follower_peers: vec![to_peer],
1148            ..Default::default()
1149        }];
1150
1151        let suite = ProcedureMigrationTestSuite::new(persistent_context, state);
1152        suite.init_table_metadata(table_info, region_routes).await;
1153
1154        let steps = vec![
1155            // MigrationStart
1156            Step::next(
1157                "Should be the update metadata for downgrading",
1158                None,
1159                Assertion::simple(assert_update_metadata_downgrade, assert_need_persist),
1160            ),
1161            // UpdateMetadata::Downgrade
1162            Step::next(
1163                "Should be the downgrade leader region",
1164                None,
1165                Assertion::simple(assert_downgrade_leader_region, assert_no_persist),
1166            ),
1167            // Downgrade Candidate
1168            Step::next(
1169                "Should be the upgrade candidate region",
1170                Some(mock_datanode_reply(
1171                    from_peer_id,
1172                    Arc::new(|id| Ok(new_downgrade_region_reply(id, None, true, None))),
1173                )),
1174                Assertion::simple(assert_upgrade_candidate_region, assert_no_persist),
1175            ),
1176            // Upgrade Candidate
1177            Step::next(
1178                "Should be the rollback metadata",
1179                Some(mock_datanode_reply(
1180                    to_peer_id,
1181                    Arc::new(|id| error::MailboxTimeoutSnafu { id }.fail()),
1182                )),
1183                Assertion::simple(assert_update_metadata_rollback, assert_no_persist),
1184            ),
1185            // UpdateMetadata::Rollback
1186            Step::next(
1187                "Should be the region migration abort",
1188                None,
1189                Assertion::simple(assert_region_migration_abort, assert_no_persist),
1190            ),
1191            // RegionMigrationAbort
1192            Step::next(
1193                "Should throw an error",
1194                None,
1195                Assertion::error(|error| {
1196                    assert!(!error.is_retryable());
1197                    assert_matches!(error, error::Error::MigrationAbort { .. });
1198                }),
1199            ),
1200        ];
1201
1202        let setup_to_latest_persisted_state = Step::setup(
1203            "Sets state to UpdateMetadata::Downgrade",
1204            merge_before_test_fn(vec![
1205                setup_state(Arc::new(|| Box::new(UpdateMetadata::Downgrade))),
1206                Arc::new(reset_volatile_ctx),
1207            ]),
1208        );
1209
1210        let steps = [
1211            steps.clone(),
1212            vec![setup_to_latest_persisted_state.clone()],
1213            steps.clone()[1..].to_vec(),
1214            vec![setup_to_latest_persisted_state],
1215            steps.clone()[1..].to_vec(),
1216        ]
1217        .concat();
1218
1219        // Run the table tests.
1220        ProcedureMigrationSuiteRunner::new(suite)
1221            .steps(steps.clone())
1222            .run_once()
1223            .await;
1224    }
1225
1226    #[tokio::test]
1227    async fn test_procedure_flow_upgrade_candidate_with_retry() {
1228        common_telemetry::init_default_ut_logging();
1229
1230        let persistent_context = test_util::new_persistent_context(1, 2, RegionId::new(1024, 1));
1231        let state = Box::new(RegionMigrationStart);
1232
1233        // The table metadata.
1234        let to_peer_id = persistent_context.to_peer.id;
1235        let from_peer_id = persistent_context.from_peer.id;
1236        let from_peer = persistent_context.from_peer.clone();
1237        let region_id = persistent_context.region_id;
1238        let table_info = new_test_table_info(1024, vec![1]).into();
1239        let region_routes = vec![RegionRoute {
1240            region: Region::new_test(region_id),
1241            leader_peer: Some(from_peer),
1242            follower_peers: vec![],
1243            ..Default::default()
1244        }];
1245
1246        let suite = ProcedureMigrationTestSuite::new(persistent_context, state);
1247        suite.init_table_metadata(table_info, region_routes).await;
1248
1249        let steps = vec![
1250            // Migration Start
1251            Step::next(
1252                "Should be the open candidate region",
1253                None,
1254                Assertion::simple(assert_open_candidate_region, assert_need_persist),
1255            ),
1256            // OpenCandidateRegion
1257            Step::next(
1258                "Should be throwing a retryable error",
1259                Some(mock_datanode_reply(
1260                    to_peer_id,
1261                    Arc::new(|id| Ok(new_open_region_reply(id, false, None))),
1262                )),
1263                Assertion::error(|error| assert!(error.is_retryable(), "err: {error:?}")),
1264            ),
1265            // OpenCandidateRegion
1266            Step::next(
1267                "Should be the update metadata for downgrading",
1268                Some(mock_datanode_reply(
1269                    to_peer_id,
1270                    Arc::new(|id| Ok(new_open_region_reply(id, true, None))),
1271                )),
1272                Assertion::simple(assert_flush_leader_region, assert_no_persist),
1273            ),
1274            // Flush Leader Region
1275            Step::next(
1276                "Should be the flush leader region",
1277                Some(mock_datanode_reply(
1278                    from_peer_id,
1279                    Arc::new(|id| Ok(new_flush_region_reply(id, true, None))),
1280                )),
1281                Assertion::simple(assert_update_metadata_downgrade, assert_no_persist),
1282            ),
1283            // UpdateMetadata::Downgrade
1284            Step::next(
1285                "Should be the downgrade leader region",
1286                None,
1287                Assertion::simple(assert_downgrade_leader_region, assert_no_persist),
1288            ),
1289            // Downgrade Leader
1290            Step::next(
1291                "Should be the upgrade candidate region",
1292                Some(mock_datanode_reply(
1293                    from_peer_id,
1294                    merge_mailbox_messages(vec![
1295                        Arc::new(|id| error::MailboxTimeoutSnafu { id }.fail()),
1296                        Arc::new(|id| Ok(new_downgrade_region_reply(id, None, true, None))),
1297                    ]),
1298                )),
1299                Assertion::simple(assert_upgrade_candidate_region, assert_no_persist),
1300            ),
1301            // Upgrade Candidate
1302            Step::next(
1303                "Should be the update metadata for upgrading",
1304                Some(mock_datanode_reply(
1305                    to_peer_id,
1306                    merge_mailbox_messages(vec![
1307                        Arc::new(|id| error::MailboxTimeoutSnafu { id }.fail()),
1308                        Arc::new(|id| Ok(new_upgrade_region_reply(id, true, true, None))),
1309                    ]),
1310                )),
1311                Assertion::simple(assert_update_metadata_upgrade, assert_no_persist),
1312            ),
1313            // UpdateMetadata::Upgrade
1314            Step::next(
1315                "Should be the close downgraded region",
1316                None,
1317                Assertion::simple(assert_close_downgraded_region, assert_no_persist),
1318            ),
1319            // CloseDowngradedRegion
1320            Step::next(
1321                "Should be the region migration end",
1322                None,
1323                Assertion::simple(assert_region_migration_end, assert_done),
1324            ),
1325            // RegionMigrationEnd
1326            Step::next(
1327                "Should be the region migration end again",
1328                None,
1329                Assertion::simple(assert_region_migration_end, assert_done),
1330            ),
1331            // RegionMigrationStart
1332            Step::setup(
1333                "Sets state to RegionMigrationStart",
1334                merge_before_test_fn(vec![
1335                    setup_state(Arc::new(|| Box::new(RegionMigrationStart))),
1336                    Arc::new(reset_volatile_ctx),
1337                ]),
1338            ),
1339            // RegionMigrationEnd
1340            // Note: We can't run this test multiple times;
1341            // the `peer_id`'s `DatanodeTable` will be removed after first-time migration success.
1342            Step::next(
1343                "Should be the region migration end(has been migrated)",
1344                None,
1345                Assertion::simple(assert_region_migration_end, assert_done),
1346            ),
1347        ];
1348
1349        let steps = [steps.clone()].concat();
1350        let timer = Instant::now();
1351
1352        // Run the table tests.
1353        let runner = ProcedureMigrationSuiteRunner::new(suite)
1354            .steps(steps.clone())
1355            .run_once()
1356            .await;
1357
1358        // Ensure it didn't run into the slow path.
1359        assert!(timer.elapsed().as_secs() < REGION_LEASE_SECS);
1360        runner.suite.verify_table_metadata().await;
1361    }
1362}