meta_srv/procedure/
region_migration.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub(crate) mod close_downgraded_region;
16pub(crate) mod downgrade_leader_region;
17pub(crate) mod flush_leader_region;
18pub(crate) mod manager;
19pub(crate) mod migration_abort;
20pub(crate) mod migration_end;
21pub(crate) mod migration_start;
22pub(crate) mod open_candidate_region;
23#[cfg(test)]
24pub mod test_util;
25pub(crate) mod update_metadata;
26pub(crate) mod upgrade_candidate_region;
27
28use std::any::Any;
29use std::fmt::{Debug, Display};
30use std::sync::Arc;
31use std::time::Duration;
32
33use common_error::ext::BoxedError;
34use common_event_recorder::{Event, Eventable};
35use common_meta::cache_invalidator::CacheInvalidatorRef;
36use common_meta::ddl::RegionFailureDetectorControllerRef;
37use common_meta::instruction::CacheIdent;
38use common_meta::key::datanode_table::{DatanodeTableKey, DatanodeTableValue};
39use common_meta::key::table_info::TableInfoValue;
40use common_meta::key::table_route::TableRouteValue;
41use common_meta::key::topic_region::{ReplayCheckpoint, TopicRegionKey};
42use common_meta::key::{DeserializedValueWithBytes, TableMetadataManagerRef};
43use common_meta::kv_backend::ResettableKvBackendRef;
44use common_meta::lock_key::{CatalogLock, RegionLock, SchemaLock};
45use common_meta::peer::Peer;
46use common_meta::region_keeper::{MemoryRegionKeeperRef, OperatingRegionGuard};
47use common_procedure::error::{
48    Error as ProcedureError, FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu,
49};
50use common_procedure::{
51    Context as ProcedureContext, LockKey, Procedure, Status, StringKey, UserMetadata,
52};
53use common_telemetry::{error, info};
54use manager::RegionMigrationProcedureGuard;
55pub use manager::{
56    RegionMigrationManagerRef, RegionMigrationProcedureTask, RegionMigrationProcedureTracker,
57    RegionMigrationTriggerReason,
58};
59use serde::{Deserialize, Serialize};
60use snafu::{OptionExt, ResultExt};
61use store_api::storage::RegionId;
62use tokio::time::Instant;
63
64use self::migration_start::RegionMigrationStart;
65use crate::error::{self, Result};
66use crate::events::region_migration_event::RegionMigrationEvent;
67use crate::metrics::{
68    METRIC_META_REGION_MIGRATION_ERROR, METRIC_META_REGION_MIGRATION_EXECUTE,
69    METRIC_META_REGION_MIGRATION_STAGE_ELAPSED,
70};
71use crate::service::mailbox::MailboxRef;
72
/// The default timeout for a region migration procedure (2 minutes).
pub const DEFAULT_REGION_MIGRATION_TIMEOUT: Duration = Duration::from_secs(120);
75
/// It's shared in each step and available even after recovering.
///
/// It will only be updated/stored after a step has succeeded.
/// (NOTE(review): original wording said "the Red node" — presumably a typo; confirm intent.)
///
/// **Notes: Stores with too large data in the context might incur replication overhead.**
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct PersistentContext {
    /// The table catalog.
    pub(crate) catalog: String,
    /// The table schema.
    pub(crate) schema: String,
    /// The [Peer] of migration source.
    pub(crate) from_peer: Peer,
    /// The [Peer] of migration destination.
    pub(crate) to_peer: Peer,
    /// The [RegionId] of migration region.
    pub(crate) region_id: RegionId,
    /// The timeout for downgrading leader region and upgrading candidate region operations.
    ///
    /// Falls back to [`default_timeout`] (10s) when the field is absent in
    /// previously persisted JSON, keeping old procedures deserializable.
    #[serde(with = "humantime_serde", default = "default_timeout")]
    pub(crate) timeout: Duration,
    /// The trigger reason of region migration.
    ///
    /// Defaults when absent in persisted JSON (backward compatibility).
    #[serde(default)]
    pub(crate) trigger_reason: RegionMigrationTriggerReason,
}
100
/// Fallback timeout used when deserializing a [PersistentContext] whose
/// persisted JSON predates the `timeout` field.
fn default_timeout() -> Duration {
    const DEFAULT_OPERATION_TIMEOUT_SECS: u64 = 10;
    Duration::from_secs(DEFAULT_OPERATION_TIMEOUT_SECS)
}
104
105impl PersistentContext {
106    pub fn lock_key(&self) -> Vec<StringKey> {
107        let region_id = self.region_id;
108        let lock_key = vec![
109            CatalogLock::Read(&self.catalog).into(),
110            SchemaLock::read(&self.catalog, &self.schema).into(),
111            RegionLock::Write(region_id).into(),
112        ];
113
114        lock_key
115    }
116}
117
impl Eventable for PersistentContext {
    /// Converts the persistent context into a [RegionMigrationEvent] for the
    /// event recorder; always returns `Some`.
    fn to_event(&self) -> Option<Box<dyn Event>> {
        Some(Box::new(RegionMigrationEvent::from_persistent_ctx(self)))
    }
}
123
/// Metrics of region migration.
///
/// Per-stage elapsed timings accumulated while the procedure runs.
/// Note that `operations_elapsed` tracks the timeout budget for the
/// downgrade/upgrade operations and is not included in the stage totals
/// reported by `Display` and `Drop`.
#[derive(Debug, Clone, Default)]
pub struct Metrics {
    /// Elapsed time of downgrading region and upgrading region.
    operations_elapsed: Duration,
    /// Elapsed time of flushing leader region.
    flush_leader_region_elapsed: Duration,
    /// Elapsed time of downgrading leader region.
    downgrade_leader_region_elapsed: Duration,
    /// Elapsed time of open candidate region.
    open_candidate_region_elapsed: Duration,
    /// Elapsed time of upgrade candidate region.
    upgrade_candidate_region_elapsed: Duration,
}
138
139impl Display for Metrics {
140    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
141        let total = self.flush_leader_region_elapsed
142            + self.downgrade_leader_region_elapsed
143            + self.open_candidate_region_elapsed
144            + self.upgrade_candidate_region_elapsed;
145        write!(
146            f,
147            "total: {:?}, flush_leader_region_elapsed: {:?}, downgrade_leader_region_elapsed: {:?}, open_candidate_region_elapsed: {:?}, upgrade_candidate_region_elapsed: {:?}",
148            total,
149            self.flush_leader_region_elapsed,
150            self.downgrade_leader_region_elapsed,
151            self.open_candidate_region_elapsed,
152            self.upgrade_candidate_region_elapsed
153        )
154    }
155}
156
157impl Metrics {
158    /// Updates the elapsed time of downgrading region and upgrading region.
159    pub fn update_operations_elapsed(&mut self, elapsed: Duration) {
160        self.operations_elapsed += elapsed;
161    }
162
163    /// Updates the elapsed time of flushing leader region.
164    pub fn update_flush_leader_region_elapsed(&mut self, elapsed: Duration) {
165        self.flush_leader_region_elapsed += elapsed;
166    }
167
168    /// Updates the elapsed time of downgrading leader region.
169    pub fn update_downgrade_leader_region_elapsed(&mut self, elapsed: Duration) {
170        self.downgrade_leader_region_elapsed += elapsed;
171    }
172
173    /// Updates the elapsed time of open candidate region.
174    pub fn update_open_candidate_region_elapsed(&mut self, elapsed: Duration) {
175        self.open_candidate_region_elapsed += elapsed;
176    }
177
178    /// Updates the elapsed time of upgrade candidate region.
179    pub fn update_upgrade_candidate_region_elapsed(&mut self, elapsed: Duration) {
180        self.upgrade_candidate_region_elapsed += elapsed;
181    }
182}
183
184impl Drop for Metrics {
185    fn drop(&mut self) {
186        let total = self.flush_leader_region_elapsed
187            + self.downgrade_leader_region_elapsed
188            + self.open_candidate_region_elapsed
189            + self.upgrade_candidate_region_elapsed;
190        METRIC_META_REGION_MIGRATION_STAGE_ELAPSED
191            .with_label_values(&["total"])
192            .observe(total.as_secs_f64());
193
194        if !self.flush_leader_region_elapsed.is_zero() {
195            METRIC_META_REGION_MIGRATION_STAGE_ELAPSED
196                .with_label_values(&["flush_leader_region"])
197                .observe(self.flush_leader_region_elapsed.as_secs_f64());
198        }
199
200        if !self.downgrade_leader_region_elapsed.is_zero() {
201            METRIC_META_REGION_MIGRATION_STAGE_ELAPSED
202                .with_label_values(&["downgrade_leader_region"])
203                .observe(self.downgrade_leader_region_elapsed.as_secs_f64());
204        }
205
206        if !self.open_candidate_region_elapsed.is_zero() {
207            METRIC_META_REGION_MIGRATION_STAGE_ELAPSED
208                .with_label_values(&["open_candidate_region"])
209                .observe(self.open_candidate_region_elapsed.as_secs_f64());
210        }
211
212        if !self.upgrade_candidate_region_elapsed.is_zero() {
213            METRIC_META_REGION_MIGRATION_STAGE_ELAPSED
214                .with_label_values(&["upgrade_candidate_region"])
215                .observe(self.upgrade_candidate_region_elapsed.as_secs_f64());
216        }
217    }
218}
219
/// It's shared in each step and available in executing (including retrying).
///
/// It will be dropped if the procedure runner crashes.
///
/// The additional remote fetches are only required in the worst cases.
#[derive(Debug, Clone, Default)]
pub struct VolatileContext {
    /// `opening_region_guard` will be set after the
    /// [OpenCandidateRegion](crate::procedure::region_migration::open_candidate_region::OpenCandidateRegion) step.
    ///
    /// `opening_region_guard` should be consumed after
    /// the corresponding [RegionRoute](common_meta::rpc::router::RegionRoute) of the opening region
    /// was written into [TableRouteValue](common_meta::key::table_route::TableRouteValue).
    opening_region_guard: Option<OperatingRegionGuard>,
    /// `table_route` is stored via previous steps for future use.
    ///
    /// Acts as a cache; `Context::get_table_route_value` fills it lazily and
    /// `Context::remove_table_route_value` invalidates it.
    table_route: Option<DeserializedValueWithBytes<TableRouteValue>>,
    /// `datanode_table` is stored via previous steps for future use.
    from_peer_datanode_table: Option<DatanodeTableValue>,
    /// `table_info` is stored via previous steps for future use.
    ///
    /// `table_info` should remain unchanged during the procedure;
    /// no other DDL procedure executed concurrently for the current table.
    table_info: Option<DeserializedValueWithBytes<TableInfoValue>>,
    /// The deadline of leader region lease.
    leader_region_lease_deadline: Option<Instant>,
    /// The last_entry_id of leader region.
    leader_region_last_entry_id: Option<u64>,
    /// The last_entry_id of leader metadata region (Only used for metric engine).
    leader_region_metadata_last_entry_id: Option<u64>,
    /// Metrics of region migration.
    metrics: Metrics,
}
252
253impl VolatileContext {
254    /// Sets the `leader_region_lease_deadline` if it does not exist.
255    pub fn set_leader_region_lease_deadline(&mut self, lease_timeout: Duration) {
256        if self.leader_region_lease_deadline.is_none() {
257            self.leader_region_lease_deadline = Some(Instant::now() + lease_timeout);
258        }
259    }
260
261    /// Resets the `leader_region_lease_deadline`.
262    pub fn reset_leader_region_lease_deadline(&mut self) {
263        self.leader_region_lease_deadline = None;
264    }
265
266    /// Sets the `leader_region_last_entry_id`.
267    pub fn set_last_entry_id(&mut self, last_entry_id: u64) {
268        self.leader_region_last_entry_id = Some(last_entry_id)
269    }
270
271    /// Sets the `leader_region_metadata_last_entry_id`.
272    pub fn set_metadata_last_entry_id(&mut self, last_entry_id: u64) {
273        self.leader_region_metadata_last_entry_id = Some(last_entry_id);
274    }
275}
276
/// Used to generate new [Context].
pub trait ContextFactory {
    /// Consumes the factory and builds a [Context] around the given
    /// [PersistentContext].
    fn new_context(self, persistent_ctx: PersistentContext) -> Context;
}
281
/// Default implementation.
///
/// Bundles the shared metasrv services that every [Context] needs;
/// cheap to clone since the services are reference-counted handles.
#[derive(Clone)]
pub struct DefaultContextFactory {
    volatile_ctx: VolatileContext,
    in_memory_key: ResettableKvBackendRef,
    table_metadata_manager: TableMetadataManagerRef,
    opening_region_keeper: MemoryRegionKeeperRef,
    region_failure_detector_controller: RegionFailureDetectorControllerRef,
    mailbox: MailboxRef,
    server_addr: String,
    cache_invalidator: CacheInvalidatorRef,
}
294
impl DefaultContextFactory {
    /// Returns a [`DefaultContextFactory`] with an empty [VolatileContext].
    pub fn new(
        in_memory_key: ResettableKvBackendRef,
        table_metadata_manager: TableMetadataManagerRef,
        opening_region_keeper: MemoryRegionKeeperRef,
        region_failure_detector_controller: RegionFailureDetectorControllerRef,
        mailbox: MailboxRef,
        server_addr: String,
        cache_invalidator: CacheInvalidatorRef,
    ) -> Self {
        Self {
            volatile_ctx: VolatileContext::default(),
            in_memory_key,
            table_metadata_manager,
            opening_region_keeper,
            region_failure_detector_controller,
            mailbox,
            server_addr,
            cache_invalidator,
        }
    }
}
318
impl ContextFactory for DefaultContextFactory {
    /// Moves the factory's services into a new [Context], wrapping the
    /// persistent context in an [Arc] so it can be shared (e.g. as user
    /// metadata of the procedure).
    fn new_context(self, persistent_ctx: PersistentContext) -> Context {
        Context {
            persistent_ctx: Arc::new(persistent_ctx),
            volatile_ctx: self.volatile_ctx,
            in_memory: self.in_memory_key,
            table_metadata_manager: self.table_metadata_manager,
            opening_region_keeper: self.opening_region_keeper,
            region_failure_detector_controller: self.region_failure_detector_controller,
            mailbox: self.mailbox,
            server_addr: self.server_addr,
            cache_invalidator: self.cache_invalidator,
        }
    }
}
334
/// The context of procedure execution.
///
/// Combines the serialized [PersistentContext], the in-memory
/// [VolatileContext], and the metasrv service handles the migration
/// steps operate on.
pub struct Context {
    persistent_ctx: Arc<PersistentContext>,
    volatile_ctx: VolatileContext,
    in_memory: ResettableKvBackendRef,
    table_metadata_manager: TableMetadataManagerRef,
    opening_region_keeper: MemoryRegionKeeperRef,
    region_failure_detector_controller: RegionFailureDetectorControllerRef,
    mailbox: MailboxRef,
    server_addr: String,
    cache_invalidator: CacheInvalidatorRef,
}
347
impl Context {
    /// Returns the next operation's timeout.
    ///
    /// `None` means the configured timeout budget has already been exhausted
    /// by previous downgrade/upgrade operations.
    pub fn next_operation_timeout(&self) -> Option<Duration> {
        self.persistent_ctx
            .timeout
            .checked_sub(self.volatile_ctx.metrics.operations_elapsed)
    }

    /// Updates operations elapsed.
    pub fn update_operations_elapsed(&mut self, instant: Instant) {
        self.volatile_ctx
            .metrics
            .update_operations_elapsed(instant.elapsed());
    }

    /// Updates the elapsed time of flushing leader region.
    pub fn update_flush_leader_region_elapsed(&mut self, instant: Instant) {
        self.volatile_ctx
            .metrics
            .update_flush_leader_region_elapsed(instant.elapsed());
    }

    /// Updates the elapsed time of downgrading leader region.
    pub fn update_downgrade_leader_region_elapsed(&mut self, instant: Instant) {
        self.volatile_ctx
            .metrics
            .update_downgrade_leader_region_elapsed(instant.elapsed());
    }

    /// Updates the elapsed time of open candidate region.
    pub fn update_open_candidate_region_elapsed(&mut self, instant: Instant) {
        self.volatile_ctx
            .metrics
            .update_open_candidate_region_elapsed(instant.elapsed());
    }

    /// Updates the elapsed time of upgrade candidate region.
    pub fn update_upgrade_candidate_region_elapsed(&mut self, instant: Instant) {
        self.volatile_ctx
            .metrics
            .update_upgrade_candidate_region_elapsed(instant.elapsed());
    }

    /// Returns address of meta server.
    pub fn server_addr(&self) -> &str {
        &self.server_addr
    }

    /// Returns the `table_route` of [VolatileContext] if any.
    /// Otherwise, returns the value retrieved from remote.
    ///
    /// The fetched value is cached in the volatile context for later steps.
    ///
    /// Retry:
    /// - Failed to retrieve the metadata of table.
    pub async fn get_table_route_value(
        &mut self,
    ) -> Result<&DeserializedValueWithBytes<TableRouteValue>> {
        let table_route_value = &mut self.volatile_ctx.table_route;

        if table_route_value.is_none() {
            let table_id = self.persistent_ctx.region_id.table_id();
            let table_route = self
                .table_metadata_manager
                .table_route_manager()
                .table_route_storage()
                .get_with_raw_bytes(table_id)
                .await
                .context(error::TableMetadataManagerSnafu)
                .map_err(BoxedError::new)
                // A fetch failure is transient, so ask the runner to retry later.
                .with_context(|_| error::RetryLaterWithSourceSnafu {
                    reason: format!("Failed to get TableRoute: {table_id}"),
                })?
                .context(error::TableRouteNotFoundSnafu { table_id })?;

            *table_route_value = Some(table_route);
        }

        // Safety of unwrap: the cache was just populated above if it was empty.
        Ok(table_route_value.as_ref().unwrap())
    }

    /// Notifies the RegionSupervisor to register failure detectors of failed region.
    ///
    /// The original failure detector was removed once the procedure was triggered.
    /// Now, we need to register the failure detector for the failed region again.
    pub async fn register_failure_detectors(&self) {
        let datanode_id = self.persistent_ctx.from_peer.id;
        let region_id = self.persistent_ctx.region_id;

        self.region_failure_detector_controller
            .register_failure_detectors(vec![(datanode_id, region_id)])
            .await;
    }

    /// Notifies the RegionSupervisor to deregister failure detectors.
    ///
    /// The original failure detectors won't be removed once the procedure was triggered.
    /// We need to deregister the failure detectors for the original region if the procedure is finished.
    pub async fn deregister_failure_detectors(&self) {
        let datanode_id = self.persistent_ctx.from_peer.id;
        let region_id = self.persistent_ctx.region_id;

        self.region_failure_detector_controller
            .deregister_failure_detectors(vec![(datanode_id, region_id)])
            .await;
    }

    /// Notifies the RegionSupervisor to deregister failure detectors for the candidate region on the destination peer.
    ///
    /// The candidate region may be created on the destination peer,
    /// so we need to deregister the failure detectors for the candidate region if the procedure is aborted.
    pub async fn deregister_failure_detectors_for_candidate_region(&self) {
        let to_peer_id = self.persistent_ctx.to_peer.id;
        let region_id = self.persistent_ctx.region_id;

        self.region_failure_detector_controller
            .deregister_failure_detectors(vec![(to_peer_id, region_id)])
            .await;
    }

    /// Removes the `table_route` of [VolatileContext], returns true if any.
    ///
    /// Used to invalidate the cache so the next `get_table_route_value`
    /// fetches a fresh value.
    pub fn remove_table_route_value(&mut self) -> bool {
        let value = self.volatile_ctx.table_route.take();
        value.is_some()
    }

    /// Returns the `table_info` of [VolatileContext] if any.
    /// Otherwise, returns the value retrieved from remote.
    ///
    /// The fetched value is cached in the volatile context for later steps.
    ///
    /// Retry:
    /// - Failed to retrieve the metadata of table.
    pub async fn get_table_info_value(
        &mut self,
    ) -> Result<&DeserializedValueWithBytes<TableInfoValue>> {
        let table_info_value = &mut self.volatile_ctx.table_info;

        if table_info_value.is_none() {
            let table_id = self.persistent_ctx.region_id.table_id();
            let table_info = self
                .table_metadata_manager
                .table_info_manager()
                .get(table_id)
                .await
                .context(error::TableMetadataManagerSnafu)
                .map_err(BoxedError::new)
                // A fetch failure is transient, so ask the runner to retry later.
                .with_context(|_| error::RetryLaterWithSourceSnafu {
                    reason: format!("Failed to get TableInfo: {table_id}"),
                })?
                .context(error::TableInfoNotFoundSnafu { table_id })?;

            *table_info_value = Some(table_info);
        }

        // Safety of unwrap: the cache was just populated above if it was empty.
        Ok(table_info_value.as_ref().unwrap())
    }

    /// Returns the `from_peer_datanode_table` of [VolatileContext] if any.
    /// Otherwise, returns the value retrieved from remote.
    ///
    /// The fetched value is cached in the volatile context for later steps.
    ///
    /// Retry:
    /// - Failed to retrieve the metadata of datanode.
    pub async fn get_from_peer_datanode_table_value(&mut self) -> Result<&DatanodeTableValue> {
        let datanode_value = &mut self.volatile_ctx.from_peer_datanode_table;

        if datanode_value.is_none() {
            let table_id = self.persistent_ctx.region_id.table_id();
            let datanode_id = self.persistent_ctx.from_peer.id;

            let datanode_table = self
                .table_metadata_manager
                .datanode_table_manager()
                .get(&DatanodeTableKey {
                    datanode_id,
                    table_id,
                })
                .await
                .context(error::TableMetadataManagerSnafu)
                .map_err(BoxedError::new)
                // A fetch failure is transient, so ask the runner to retry later.
                .with_context(|_| error::RetryLaterWithSourceSnafu {
                    reason: format!("Failed to get DatanodeTable: ({datanode_id},{table_id})"),
                })?
                .context(error::DatanodeTableNotFoundSnafu {
                    table_id,
                    datanode_id,
                })?;

            *datanode_value = Some(datanode_table);
        }

        // Safety of unwrap: the cache was just populated above if it was empty.
        Ok(datanode_value.as_ref().unwrap())
    }

    /// Fetches the replay checkpoint for the given topic.
    ///
    /// Returns `Ok(None)` when no entry exists for the (region, topic) pair
    /// or the entry carries no checkpoint.
    pub async fn fetch_replay_checkpoint(&self, topic: &str) -> Result<Option<ReplayCheckpoint>> {
        let region_id = self.region_id();
        let topic_region_key = TopicRegionKey::new(region_id, topic);
        let value = self
            .table_metadata_manager
            .topic_region_manager()
            .get(topic_region_key)
            .await
            .context(error::TableMetadataManagerSnafu)?;

        Ok(value.and_then(|value| value.checkpoint))
    }

    /// Returns the [RegionId].
    pub fn region_id(&self) -> RegionId {
        self.persistent_ctx.region_id
    }

    /// Broadcasts the invalidate table cache message.
    ///
    /// Best-effort: failures to broadcast are deliberately ignored.
    pub async fn invalidate_table_cache(&self) -> Result<()> {
        let table_id = self.region_id().table_id();
        // ignore the result
        let ctx = common_meta::cache_invalidator::Context::default();
        let _ = self
            .cache_invalidator
            .invalidate(&ctx, &[CacheIdent::TableId(table_id)])
            .await;
        Ok(())
    }

    /// Returns the [PersistentContext] of the procedure.
    pub fn persistent_ctx(&self) -> Arc<PersistentContext> {
        self.persistent_ctx.clone()
    }
}
574
/// A single step of the region migration state machine.
///
/// Implementations are serialized into the procedure's persistent data via
/// `typetag`, keyed by the `region_migration_state` tag.
#[async_trait::async_trait]
#[typetag::serde(tag = "region_migration_state")]
pub(crate) trait State: Sync + Send + Debug {
    /// Returns the short type name of the state (last `::` path segment),
    /// used as a metric label and in logs.
    fn name(&self) -> &'static str {
        let type_name = std::any::type_name::<Self>();
        // short name
        type_name.split("::").last().unwrap_or(type_name)
    }

    /// Yields the next [State] and [Status].
    async fn next(
        &mut self,
        ctx: &mut Context,
        procedure_ctx: &ProcedureContext,
    ) -> Result<(Box<dyn State>, Status)>;

    /// Returns as [Any](std::any::Any).
    fn as_any(&self) -> &dyn Any;
}
594
/// Persistent data of [RegionMigrationProcedure].
///
/// Owned variant used when deserializing a procedure from its dumped JSON.
#[derive(Debug, Serialize, Deserialize)]
pub struct RegionMigrationDataOwned {
    persistent_ctx: PersistentContext,
    state: Box<dyn State>,
}
601
/// Persistent data of [RegionMigrationProcedure].
///
/// Borrowed variant used when serializing (dumping) a running procedure,
/// avoiding a clone of the persistent context and state.
#[derive(Debug, Serialize)]
pub struct RegionMigrationData<'a> {
    persistent_ctx: &'a PersistentContext,
    state: &'a dyn State,
}
608
/// The region migration procedure: drives the [State] machine over a [Context].
pub(crate) struct RegionMigrationProcedure {
    // Current state of the migration state machine.
    state: Box<dyn State>,
    // Execution context shared across all states.
    context: Context,
    // Keeps the procedure registered in the tracker for its whole lifetime.
    _guard: Option<RegionMigrationProcedureGuard>,
}
614
impl RegionMigrationProcedure {
    const TYPE_NAME: &'static str = "metasrv-procedure::RegionMigration";

    /// Creates a procedure starting from the [RegionMigrationStart] state.
    pub fn new(
        persistent_context: PersistentContext,
        context_factory: impl ContextFactory,
        guard: Option<RegionMigrationProcedureGuard>,
    ) -> Self {
        let state = Box::new(RegionMigrationStart {});
        Self::new_inner(state, persistent_context, context_factory, guard)
    }

    /// Creates a procedure from an explicit initial state (used by `new` and
    /// by deserialization in tests).
    fn new_inner(
        state: Box<dyn State>,
        persistent_context: PersistentContext,
        context_factory: impl ContextFactory,
        guard: Option<RegionMigrationProcedureGuard>,
    ) -> Self {
        Self {
            state,
            context: context_factory.new_context(persistent_context),
            _guard: guard,
        }
    }

    /// Restores a procedure from its dumped JSON and re-registers it in the
    /// tracker so concurrent migrations of the same region are rejected.
    fn from_json(
        json: &str,
        context_factory: impl ContextFactory,
        tracker: RegionMigrationProcedureTracker,
    ) -> ProcedureResult<Self> {
        let RegionMigrationDataOwned {
            persistent_ctx,
            state,
        } = serde_json::from_str(json).context(FromJsonSnafu)?;

        let guard = tracker.insert_running_procedure(&RegionMigrationProcedureTask {
            region_id: persistent_ctx.region_id,
            from_peer: persistent_ctx.from_peer.clone(),
            to_peer: persistent_ctx.to_peer.clone(),
            timeout: persistent_ctx.timeout,
            trigger_reason: persistent_ctx.trigger_reason,
        });
        let context = context_factory.new_context(persistent_ctx);

        Ok(Self {
            state,
            context,
            _guard: guard,
        })
    }

    /// Rolls back a failed migration: clears any leader-downgrading marker
    /// from the table route and re-registers the failure detector for the
    /// source region.
    async fn rollback_inner(&mut self) -> Result<()> {
        let _timer = METRIC_META_REGION_MIGRATION_EXECUTE
            .with_label_values(&["rollback"])
            .start_timer();

        let table_id = self.context.region_id().table_id();
        let region_id = self.context.region_id();
        // Drop the cached route first so the check below uses a fresh value.
        self.context.remove_table_route_value();
        let table_metadata_manager = self.context.table_metadata_manager.clone();
        let table_route = self.context.get_table_route_value().await?;

        // Safety: It must be a physical table route.
        let downgraded = table_route
            .region_routes()
            .unwrap()
            .iter()
            .filter(|route| route.region.id == region_id)
            .any(|route| route.is_leader_downgrading());

        if downgraded {
            info!("Rollbacking downgraded region leader table route, region: {region_id}");
            table_metadata_manager
                    .update_leader_region_status(table_id, table_route, |route| {
                        if route.region.id == region_id {
                            Some(None)
                        } else {
                            None
                        }
                    })
                    .await
                    .context(error::TableMetadataManagerSnafu)
                    .map_err(BoxedError::new)
                    .with_context(|_| error::RetryLaterWithSourceSnafu {
                        reason: format!("Failed to update the table route during the rollback downgraded leader region: {region_id}"),
                    })?;
        }

        self.context.register_failure_detectors().await;

        Ok(())
    }
}
708
#[async_trait::async_trait]
impl Procedure for RegionMigrationProcedure {
    fn type_name(&self) -> &str {
        Self::TYPE_NAME
    }

    /// Delegates to [`Self::rollback_inner`], mapping errors into procedure
    /// errors.
    async fn rollback(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<()> {
        self.rollback_supported_inner_doc_placeholder_never_emitted();
        self.rollback_inner()
            .await
            .map_err(ProcedureError::external)
    }

    fn rollback_supported(&self) -> bool {
        true
    }

    /// Runs one state transition, recording per-state timing and error
    /// metrics. Retryable errors are surfaced as `retry_later`; fatal errors
    /// tear down the candidate-region bookkeeping before failing.
    async fn execute(&mut self, ctx: &ProcedureContext) -> ProcedureResult<Status> {
        let state = &mut self.state;

        let name = state.name();
        let _timer = METRIC_META_REGION_MIGRATION_EXECUTE
            .with_label_values(&[name])
            .start_timer();
        match state.next(&mut self.context, ctx).await {
            Ok((next, status)) => {
                *state = next;
                Ok(status)
            }
            Err(e) => {
                if e.is_retryable() {
                    METRIC_META_REGION_MIGRATION_ERROR
                        .with_label_values(&[name, "retryable"])
                        .inc();
                    Err(ProcedureError::retry_later(e))
                } else {
                    // Consumes the opening region guard before deregistering the failure detectors.
                    self.context.volatile_ctx.opening_region_guard.take();
                    self.context
                        .deregister_failure_detectors_for_candidate_region()
                        .await;
                    error!(
                        e;
                        "Region migration procedure failed, region_id: {}, from_peer: {}, to_peer: {}, {}",
                        self.context.region_id(),
                        self.context.persistent_ctx.from_peer,
                        self.context.persistent_ctx.to_peer,
                        self.context.volatile_ctx.metrics,
                    );
                    METRIC_META_REGION_MIGRATION_ERROR
                        .with_label_values(&[name, "external"])
                        .inc();
                    Err(ProcedureError::external(e))
                }
            }
        }
    }

    /// Serializes the current state and persistent context to JSON.
    fn dump(&self) -> ProcedureResult<String> {
        let data = RegionMigrationData {
            state: self.state.as_ref(),
            persistent_ctx: &self.context.persistent_ctx,
        };
        serde_json::to_string(&data).context(ToJsonSnafu)
    }

    fn lock_key(&self) -> LockKey {
        LockKey::new(self.context.persistent_ctx.lock_key())
    }

    /// Exposes the persistent context as user-visible procedure metadata.
    fn user_metadata(&self) -> Option<UserMetadata> {
        Some(UserMetadata::new(self.context.persistent_ctx()))
    }
}
782
783#[cfg(test)]
784mod tests {
785    use std::assert_matches::assert_matches;
786    use std::sync::Arc;
787
788    use common_meta::distributed_time_constants::REGION_LEASE_SECS;
789    use common_meta::instruction::Instruction;
790    use common_meta::key::test_utils::new_test_table_info;
791    use common_meta::rpc::router::{Region, RegionRoute};
792
793    use super::update_metadata::UpdateMetadata;
794    use super::*;
795    use crate::handler::HeartbeatMailbox;
796    use crate::procedure::region_migration::open_candidate_region::OpenCandidateRegion;
797    use crate::procedure::region_migration::test_util::*;
798    use crate::procedure::test_util::{
799        new_downgrade_region_reply, new_flush_region_reply_for_region, new_open_region_reply,
800        new_upgrade_region_reply,
801    };
802    use crate::service::mailbox::Channel;
803
804    fn new_persistent_context() -> PersistentContext {
805        test_util::new_persistent_context(1, 2, RegionId::new(1024, 1))
806    }
807
808    #[test]
809    fn test_lock_key() {
810        let persistent_context = new_persistent_context();
811        let expected_keys = persistent_context.lock_key();
812
813        let env = TestingEnv::new();
814        let context = env.context_factory();
815
816        let procedure = RegionMigrationProcedure::new(persistent_context, context, None);
817
818        let key = procedure.lock_key();
819        let keys = key.keys_to_lock().cloned().collect::<Vec<_>>();
820
821        for key in expected_keys {
822            assert!(keys.contains(&key));
823        }
824    }
825
826    #[test]
827    fn test_data_serialization() {
828        let persistent_context = new_persistent_context();
829
830        let env = TestingEnv::new();
831        let context = env.context_factory();
832
833        let procedure = RegionMigrationProcedure::new(persistent_context, context, None);
834
835        let serialized = procedure.dump().unwrap();
836        let expected = r#"{"persistent_ctx":{"catalog":"greptime","schema":"public","from_peer":{"id":1,"addr":""},"to_peer":{"id":2,"addr":""},"region_id":4398046511105,"timeout":"10s","trigger_reason":"Unknown"},"state":{"region_migration_state":"RegionMigrationStart"}}"#;
837        assert_eq!(expected, serialized);
838    }
839
840    #[test]
841    fn test_backward_compatibility() {
842        let persistent_ctx = test_util::new_persistent_context(1, 2, RegionId::new(1024, 1));
843        // NOTES: Changes it will break backward compatibility.
844        let serialized = r#"{"catalog":"greptime","schema":"public","from_peer":{"id":1,"addr":""},"to_peer":{"id":2,"addr":""},"region_id":4398046511105}"#;
845        let deserialized: PersistentContext = serde_json::from_str(serialized).unwrap();
846
847        assert_eq!(persistent_ctx, deserialized);
848    }
849
    /// A no-op state used to drive the procedure loop in tests; its `next`
    /// implementation completes immediately (see the `State` impl below).
    #[derive(Debug, Serialize, Deserialize, Default)]
    pub struct MockState;
852
853    #[async_trait::async_trait]
854    #[typetag::serde]
855    impl State for MockState {
856        async fn next(
857            &mut self,
858            _ctx: &mut Context,
859            _procedure_ctx: &ProcedureContext,
860        ) -> Result<(Box<dyn State>, Status)> {
861            Ok((Box::new(MockState), Status::done()))
862        }
863
864        fn as_any(&self) -> &dyn Any {
865            self
866        }
867    }
868
869    #[tokio::test]
870    async fn test_execution_after_deserialized() {
871        let env = TestingEnv::new();
872
873        fn new_mock_procedure(env: &TestingEnv) -> RegionMigrationProcedure {
874            let persistent_context = new_persistent_context();
875            let context_factory = env.context_factory();
876            let state = Box::<MockState>::default();
877            RegionMigrationProcedure::new_inner(state, persistent_context, context_factory, None)
878        }
879
880        let ctx = TestingEnv::procedure_context();
881        let mut procedure = new_mock_procedure(&env);
882        let mut status = None;
883        for _ in 0..3 {
884            status = Some(procedure.execute(&ctx).await.unwrap());
885        }
886        assert!(status.unwrap().is_done());
887
888        let ctx = TestingEnv::procedure_context();
889        let mut procedure = new_mock_procedure(&env);
890
891        status = Some(procedure.execute(&ctx).await.unwrap());
892
893        let serialized = procedure.dump().unwrap();
894
895        let context_factory = env.context_factory();
896        let tracker = env.tracker();
897        let mut procedure =
898            RegionMigrationProcedure::from_json(&serialized, context_factory, tracker.clone())
899                .unwrap();
900        assert!(tracker.contains(procedure.context.persistent_ctx.region_id));
901
902        for _ in 1..3 {
903            status = Some(procedure.execute(&ctx).await.unwrap());
904        }
905        assert!(status.unwrap().is_done());
906    }
907
908    #[tokio::test]
909    async fn test_broadcast_invalidate_table_cache() {
910        let mut env = TestingEnv::new();
911        let persistent_context = test_util::new_persistent_context(1, 2, RegionId::new(1024, 1));
912        let ctx = env.context_factory().new_context(persistent_context);
913        let mailbox_ctx = env.mailbox_context();
914
915        // No receivers.
916        ctx.invalidate_table_cache().await.unwrap();
917
918        let (tx, mut rx) = tokio::sync::mpsc::channel(1);
919
920        mailbox_ctx
921            .insert_heartbeat_response_receiver(Channel::Frontend(1), tx)
922            .await;
923
924        ctx.invalidate_table_cache().await.unwrap();
925
926        let resp = rx.recv().await.unwrap().unwrap();
927        let msg = resp.mailbox_message.unwrap();
928
929        let instruction = HeartbeatMailbox::json_instruction(&msg).unwrap();
930        assert_eq!(
931            instruction,
932            Instruction::InvalidateCaches(vec![CacheIdent::TableId(1024)])
933        );
934    }
935
    /// Builds the step sequence of a successful migration flow. The open
    /// candidate/flush phases are not exercised here (the callers set up the
    /// candidate as an existing follower); the flow ends with a second,
    /// idempotent `RegionMigrationEnd` step.
    fn procedure_flow_steps(from_peer_id: u64, to_peer_id: u64) -> Vec<Step> {
        vec![
            // MigrationStart
            Step::next(
                "Should be the update metadata for downgrading",
                None,
                Assertion::simple(assert_update_metadata_downgrade, assert_need_persist),
            ),
            // UpdateMetadata::Downgrade
            Step::next(
                "Should be the downgrade leader region",
                None,
                Assertion::simple(assert_downgrade_leader_region, assert_no_persist),
            ),
            // Downgrade Candidate
            Step::next(
                "Should be the upgrade candidate region",
                Some(mock_datanode_reply(
                    from_peer_id,
                    Arc::new(|id| Ok(new_downgrade_region_reply(id, None, true, None))),
                )),
                Assertion::simple(assert_upgrade_candidate_region, assert_no_persist),
            ),
            // Upgrade Candidate
            Step::next(
                "Should be the update metadata for upgrading",
                Some(mock_datanode_reply(
                    to_peer_id,
                    Arc::new(|id| Ok(new_upgrade_region_reply(id, true, true, None))),
                )),
                Assertion::simple(assert_update_metadata_upgrade, assert_no_persist),
            ),
            // UpdateMetadata::Upgrade
            Step::next(
                "Should be the close downgraded region",
                None,
                Assertion::simple(assert_close_downgraded_region, assert_no_persist),
            ),
            // CloseDowngradedRegion
            Step::next(
                "Should be the region migration end",
                None,
                Assertion::simple(assert_region_migration_end, assert_done),
            ),
            // RegionMigrationEnd
            Step::next(
                "Should be the region migration end again",
                None,
                Assertion::simple(assert_region_migration_end, assert_done),
            ),
        ]
    }
988
989    #[tokio::test]
990    async fn test_procedure_flow() {
991        common_telemetry::init_default_ut_logging();
992
993        let persistent_context = test_util::new_persistent_context(1, 2, RegionId::new(1024, 1));
994        let state = Box::new(RegionMigrationStart);
995
996        // The table metadata.
997        let from_peer_id = persistent_context.from_peer.id;
998        let to_peer_id = persistent_context.to_peer.id;
999        let from_peer = persistent_context.from_peer.clone();
1000        let to_peer = persistent_context.to_peer.clone();
1001        let region_id = persistent_context.region_id;
1002        let table_info = new_test_table_info(1024, vec![1]).into();
1003        let region_routes = vec![RegionRoute {
1004            region: Region::new_test(region_id),
1005            leader_peer: Some(from_peer),
1006            follower_peers: vec![to_peer],
1007            ..Default::default()
1008        }];
1009
1010        let suite = ProcedureMigrationTestSuite::new(persistent_context, state);
1011        suite.init_table_metadata(table_info, region_routes).await;
1012
1013        let steps = procedure_flow_steps(from_peer_id, to_peer_id);
1014        let timer = Instant::now();
1015
1016        // Run the table tests.
1017        let runner = ProcedureMigrationSuiteRunner::new(suite)
1018            .steps(steps)
1019            .run_once()
1020            .await;
1021
1022        // Ensure it didn't run into the slow path.
1023        assert!(timer.elapsed().as_secs() < REGION_LEASE_SECS / 2);
1024
1025        runner.suite.verify_table_metadata().await;
1026    }
1027
1028    #[tokio::test]
1029    async fn test_procedure_flow_idempotent() {
1030        common_telemetry::init_default_ut_logging();
1031
1032        let persistent_context = test_util::new_persistent_context(1, 2, RegionId::new(1024, 1));
1033        let state = Box::new(RegionMigrationStart);
1034
1035        // The table metadata.
1036        let from_peer_id = persistent_context.from_peer.id;
1037        let to_peer_id = persistent_context.to_peer.id;
1038        let from_peer = persistent_context.from_peer.clone();
1039        let to_peer = persistent_context.to_peer.clone();
1040        let region_id = persistent_context.region_id;
1041        let table_info = new_test_table_info(1024, vec![1]).into();
1042        let region_routes = vec![RegionRoute {
1043            region: Region::new_test(region_id),
1044            leader_peer: Some(from_peer),
1045            follower_peers: vec![to_peer],
1046            ..Default::default()
1047        }];
1048
1049        let suite = ProcedureMigrationTestSuite::new(persistent_context, state);
1050        suite.init_table_metadata(table_info, region_routes).await;
1051
1052        let steps = procedure_flow_steps(from_peer_id, to_peer_id);
1053        let setup_to_latest_persisted_state = Step::setup(
1054            "Sets state to UpdateMetadata::Downgrade",
1055            merge_before_test_fn(vec![
1056                setup_state(Arc::new(|| Box::new(UpdateMetadata::Downgrade))),
1057                Arc::new(reset_volatile_ctx),
1058            ]),
1059        );
1060
1061        let steps = [
1062            steps.clone(),
1063            vec![setup_to_latest_persisted_state.clone()],
1064            steps.clone()[1..].to_vec(),
1065            vec![setup_to_latest_persisted_state],
1066            steps.clone()[1..].to_vec(),
1067        ]
1068        .concat();
1069        let timer = Instant::now();
1070
1071        // Run the table tests.
1072        let runner = ProcedureMigrationSuiteRunner::new(suite)
1073            .steps(steps.clone())
1074            .run_once()
1075            .await;
1076
1077        // Ensure it didn't run into the slow path.
1078        assert!(timer.elapsed().as_secs() < REGION_LEASE_SECS / 2);
1079
1080        runner.suite.verify_table_metadata().await;
1081    }
1082
1083    #[tokio::test]
1084    async fn test_procedure_flow_open_candidate_region_retryable_error() {
1085        common_telemetry::init_default_ut_logging();
1086
1087        let persistent_context = test_util::new_persistent_context(1, 2, RegionId::new(1024, 1));
1088        let state = Box::new(RegionMigrationStart);
1089
1090        // The table metadata.
1091        let to_peer_id = persistent_context.to_peer.id;
1092        let from_peer = persistent_context.from_peer.clone();
1093        let region_id = persistent_context.region_id;
1094        let table_info = new_test_table_info(1024, vec![1]).into();
1095        let region_routes = vec![RegionRoute {
1096            region: Region::new_test(region_id),
1097            leader_peer: Some(from_peer),
1098            follower_peers: vec![],
1099            ..Default::default()
1100        }];
1101
1102        let suite = ProcedureMigrationTestSuite::new(persistent_context, state);
1103        suite.init_table_metadata(table_info, region_routes).await;
1104
1105        let steps = vec![
1106            // Migration Start
1107            Step::next(
1108                "Should be the open candidate region",
1109                None,
1110                Assertion::simple(assert_open_candidate_region, assert_need_persist),
1111            ),
1112            // OpenCandidateRegion
1113            Step::next(
1114                "Should be throwing a non-retry error",
1115                Some(mock_datanode_reply(
1116                    to_peer_id,
1117                    Arc::new(|id| error::MailboxTimeoutSnafu { id }.fail()),
1118                )),
1119                Assertion::error(|error| assert!(error.is_retryable())),
1120            ),
1121            // OpenCandidateRegion
1122            Step::next(
1123                "Should be throwing a non-retry error again",
1124                Some(mock_datanode_reply(
1125                    to_peer_id,
1126                    Arc::new(|id| error::MailboxTimeoutSnafu { id }.fail()),
1127                )),
1128                Assertion::error(|error| assert!(error.is_retryable())),
1129            ),
1130        ];
1131
1132        let setup_to_latest_persisted_state = Step::setup(
1133            "Sets state to UpdateMetadata::Downgrade",
1134            merge_before_test_fn(vec![
1135                setup_state(Arc::new(|| Box::new(OpenCandidateRegion))),
1136                Arc::new(reset_volatile_ctx),
1137            ]),
1138        );
1139
1140        let steps = [
1141            steps.clone(),
1142            // Mocks the volatile ctx lost(i.g., Meta leader restarts).
1143            vec![setup_to_latest_persisted_state.clone()],
1144            steps.clone()[1..].to_vec(),
1145            vec![setup_to_latest_persisted_state],
1146            steps.clone()[1..].to_vec(),
1147        ]
1148        .concat();
1149
1150        // Run the table tests.
1151        let runner = ProcedureMigrationSuiteRunner::new(suite)
1152            .steps(steps.clone())
1153            .run_once()
1154            .await;
1155
1156        let table_routes_version = runner
1157            .env()
1158            .table_metadata_manager()
1159            .table_route_manager()
1160            .table_route_storage()
1161            .get(region_id.table_id())
1162            .await
1163            .unwrap()
1164            .unwrap()
1165            .version();
1166        // Should be unchanged.
1167        assert_eq!(table_routes_version.unwrap(), 0);
1168    }
1169
1170    #[tokio::test]
1171    async fn test_procedure_flow_upgrade_candidate_with_retry_and_failed() {
1172        common_telemetry::init_default_ut_logging();
1173
1174        let persistent_context = test_util::new_persistent_context(1, 2, RegionId::new(1024, 1));
1175        let state = Box::new(RegionMigrationStart);
1176
1177        // The table metadata.
1178        let from_peer_id = persistent_context.from_peer.id;
1179        let to_peer_id = persistent_context.to_peer.id;
1180        let from_peer = persistent_context.from_peer.clone();
1181        let to_peer = persistent_context.to_peer.clone();
1182        let region_id = persistent_context.region_id;
1183        let table_info = new_test_table_info(1024, vec![1]).into();
1184        let region_routes = vec![RegionRoute {
1185            region: Region::new_test(region_id),
1186            leader_peer: Some(from_peer),
1187            follower_peers: vec![to_peer],
1188            ..Default::default()
1189        }];
1190
1191        let suite = ProcedureMigrationTestSuite::new(persistent_context, state);
1192        suite.init_table_metadata(table_info, region_routes).await;
1193
1194        let steps = vec![
1195            // MigrationStart
1196            Step::next(
1197                "Should be the update metadata for downgrading",
1198                None,
1199                Assertion::simple(assert_update_metadata_downgrade, assert_need_persist),
1200            ),
1201            // UpdateMetadata::Downgrade
1202            Step::next(
1203                "Should be the downgrade leader region",
1204                None,
1205                Assertion::simple(assert_downgrade_leader_region, assert_no_persist),
1206            ),
1207            // Downgrade Candidate
1208            Step::next(
1209                "Should be the upgrade candidate region",
1210                Some(mock_datanode_reply(
1211                    from_peer_id,
1212                    Arc::new(|id| Ok(new_downgrade_region_reply(id, None, true, None))),
1213                )),
1214                Assertion::simple(assert_upgrade_candidate_region, assert_no_persist),
1215            ),
1216            // Upgrade Candidate
1217            Step::next(
1218                "Should be the rollback metadata",
1219                Some(mock_datanode_reply(
1220                    to_peer_id,
1221                    Arc::new(|id| error::MailboxTimeoutSnafu { id }.fail()),
1222                )),
1223                Assertion::simple(assert_update_metadata_rollback, assert_no_persist),
1224            ),
1225            // UpdateMetadata::Rollback
1226            Step::next(
1227                "Should be the region migration abort",
1228                None,
1229                Assertion::simple(assert_region_migration_abort, assert_no_persist),
1230            ),
1231            // RegionMigrationAbort
1232            Step::next(
1233                "Should throw an error",
1234                None,
1235                Assertion::error(|error| {
1236                    assert!(!error.is_retryable());
1237                    assert_matches!(error, error::Error::MigrationAbort { .. });
1238                }),
1239            ),
1240        ];
1241
1242        let setup_to_latest_persisted_state = Step::setup(
1243            "Sets state to UpdateMetadata::Downgrade",
1244            merge_before_test_fn(vec![
1245                setup_state(Arc::new(|| Box::new(UpdateMetadata::Downgrade))),
1246                Arc::new(reset_volatile_ctx),
1247            ]),
1248        );
1249
1250        let steps = [
1251            steps.clone(),
1252            vec![setup_to_latest_persisted_state.clone()],
1253            steps.clone()[1..].to_vec(),
1254            vec![setup_to_latest_persisted_state],
1255            steps.clone()[1..].to_vec(),
1256        ]
1257        .concat();
1258
1259        // Run the table tests.
1260        ProcedureMigrationSuiteRunner::new(suite)
1261            .steps(steps.clone())
1262            .run_once()
1263            .await;
1264    }
1265
1266    #[tokio::test]
1267    async fn test_procedure_flow_upgrade_candidate_with_retry() {
1268        common_telemetry::init_default_ut_logging();
1269
1270        let persistent_context = test_util::new_persistent_context(1, 2, RegionId::new(1024, 1));
1271        let state = Box::new(RegionMigrationStart);
1272
1273        // The table metadata.
1274        let to_peer_id = persistent_context.to_peer.id;
1275        let from_peer_id = persistent_context.from_peer.id;
1276        let from_peer = persistent_context.from_peer.clone();
1277        let region_id = persistent_context.region_id;
1278        let table_info = new_test_table_info(1024, vec![1]).into();
1279        let region_routes = vec![RegionRoute {
1280            region: Region::new_test(region_id),
1281            leader_peer: Some(from_peer),
1282            follower_peers: vec![],
1283            ..Default::default()
1284        }];
1285
1286        let suite = ProcedureMigrationTestSuite::new(persistent_context, state);
1287        suite.init_table_metadata(table_info, region_routes).await;
1288
1289        let steps = vec![
1290            // Migration Start
1291            Step::next(
1292                "Should be the open candidate region",
1293                None,
1294                Assertion::simple(assert_open_candidate_region, assert_need_persist),
1295            ),
1296            // OpenCandidateRegion
1297            Step::next(
1298                "Should be throwing a retryable error",
1299                Some(mock_datanode_reply(
1300                    to_peer_id,
1301                    Arc::new(|id| Ok(new_open_region_reply(id, false, None))),
1302                )),
1303                Assertion::error(|error| assert!(error.is_retryable(), "err: {error:?}")),
1304            ),
1305            // OpenCandidateRegion
1306            Step::next(
1307                "Should be the update metadata for downgrading",
1308                Some(mock_datanode_reply(
1309                    to_peer_id,
1310                    Arc::new(|id| Ok(new_open_region_reply(id, true, None))),
1311                )),
1312                Assertion::simple(assert_flush_leader_region, assert_no_persist),
1313            ),
1314            // Flush Leader Region
1315            Step::next(
1316                "Should be the flush leader region",
1317                Some(mock_datanode_reply(
1318                    from_peer_id,
1319                    Arc::new(move |id| {
1320                        Ok(new_flush_region_reply_for_region(id, region_id, true, None))
1321                    }),
1322                )),
1323                Assertion::simple(assert_update_metadata_downgrade, assert_no_persist),
1324            ),
1325            // UpdateMetadata::Downgrade
1326            Step::next(
1327                "Should be the downgrade leader region",
1328                None,
1329                Assertion::simple(assert_downgrade_leader_region, assert_no_persist),
1330            ),
1331            // Downgrade Leader
1332            Step::next(
1333                "Should be the upgrade candidate region",
1334                Some(mock_datanode_reply(
1335                    from_peer_id,
1336                    merge_mailbox_messages(vec![
1337                        Arc::new(|id| error::MailboxTimeoutSnafu { id }.fail()),
1338                        Arc::new(|id| Ok(new_downgrade_region_reply(id, None, true, None))),
1339                    ]),
1340                )),
1341                Assertion::simple(assert_upgrade_candidate_region, assert_no_persist),
1342            ),
1343            // Upgrade Candidate
1344            Step::next(
1345                "Should be the update metadata for upgrading",
1346                Some(mock_datanode_reply(
1347                    to_peer_id,
1348                    merge_mailbox_messages(vec![
1349                        Arc::new(|id| error::MailboxTimeoutSnafu { id }.fail()),
1350                        Arc::new(|id| Ok(new_upgrade_region_reply(id, true, true, None))),
1351                    ]),
1352                )),
1353                Assertion::simple(assert_update_metadata_upgrade, assert_no_persist),
1354            ),
1355            // UpdateMetadata::Upgrade
1356            Step::next(
1357                "Should be the close downgraded region",
1358                None,
1359                Assertion::simple(assert_close_downgraded_region, assert_no_persist),
1360            ),
1361            // CloseDowngradedRegion
1362            Step::next(
1363                "Should be the region migration end",
1364                None,
1365                Assertion::simple(assert_region_migration_end, assert_done),
1366            ),
1367            // RegionMigrationEnd
1368            Step::next(
1369                "Should be the region migration end again",
1370                None,
1371                Assertion::simple(assert_region_migration_end, assert_done),
1372            ),
1373            // RegionMigrationStart
1374            Step::setup(
1375                "Sets state to RegionMigrationStart",
1376                merge_before_test_fn(vec![
1377                    setup_state(Arc::new(|| Box::new(RegionMigrationStart))),
1378                    Arc::new(reset_volatile_ctx),
1379                ]),
1380            ),
1381            // RegionMigrationEnd
1382            // Note: We can't run this test multiple times;
1383            // the `peer_id`'s `DatanodeTable` will be removed after first-time migration success.
1384            Step::next(
1385                "Should be the region migration end(has been migrated)",
1386                None,
1387                Assertion::simple(assert_region_migration_end, assert_done),
1388            ),
1389        ];
1390
1391        let steps = [steps.clone()].concat();
1392        let timer = Instant::now();
1393
1394        // Run the table tests.
1395        let runner = ProcedureMigrationSuiteRunner::new(suite)
1396            .steps(steps.clone())
1397            .run_once()
1398            .await;
1399
1400        // Ensure it didn't run into the slow path.
1401        assert!(timer.elapsed().as_secs() < REGION_LEASE_SECS);
1402        runner.suite.verify_table_metadata().await;
1403    }
1404}