meta_srv/procedure/region_migration.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub(crate) mod close_downgraded_region;
16pub(crate) mod downgrade_leader_region;
17pub(crate) mod flush_leader_region;
18pub(crate) mod manager;
19pub(crate) mod migration_abort;
20pub(crate) mod migration_end;
21pub(crate) mod migration_start;
22pub(crate) mod open_candidate_region;
23#[cfg(test)]
24pub mod test_util;
25pub(crate) mod update_metadata;
26pub(crate) mod upgrade_candidate_region;
27pub(crate) mod utils;
28
29use std::any::Any;
30use std::collections::{HashMap, HashSet};
31use std::fmt::{Debug, Display};
32use std::sync::Arc;
33use std::time::Duration;
34
35use common_error::ext::BoxedError;
36use common_event_recorder::{Event, Eventable};
37use common_meta::cache_invalidator::CacheInvalidatorRef;
38use common_meta::ddl::RegionFailureDetectorControllerRef;
39use common_meta::instruction::CacheIdent;
40use common_meta::key::datanode_table::{DatanodeTableKey, DatanodeTableValue};
41use common_meta::key::table_route::TableRouteValue;
42use common_meta::key::topic_region::{ReplayCheckpoint, TopicRegionKey};
43use common_meta::key::{DeserializedValueWithBytes, TableMetadataManagerRef};
44use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef};
45use common_meta::lock_key::{CatalogLock, RegionLock, SchemaLock, TableLock};
46use common_meta::peer::Peer;
47use common_meta::region_keeper::{MemoryRegionKeeperRef, OperatingRegionGuard};
48use common_procedure::error::{
49    Error as ProcedureError, FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu,
50};
51use common_procedure::{
52    Context as ProcedureContext, LockKey, Procedure, Status, StringKey, UserMetadata,
53};
54use common_telemetry::{error, info};
55use manager::RegionMigrationProcedureGuard;
56pub use manager::{
57    RegionMigrationManagerRef, RegionMigrationProcedureTask, RegionMigrationProcedureTracker,
58    RegionMigrationTriggerReason,
59};
60use serde::{Deserialize, Deserializer, Serialize};
61use snafu::{OptionExt, ResultExt};
62use store_api::storage::{RegionId, TableId};
63use tokio::time::Instant;
64
65use self::migration_start::RegionMigrationStart;
66use crate::error::{self, Result};
67use crate::events::region_migration_event::RegionMigrationEvent;
68use crate::metrics::{
69    METRIC_META_REGION_MIGRATION_ERROR, METRIC_META_REGION_MIGRATION_EXECUTE,
70    METRIC_META_REGION_MIGRATION_STAGE_ELAPSED,
71};
72use crate::service::mailbox::MailboxRef;
73
/// The default overall timeout for a region migration procedure (120 seconds).
///
/// Note: this is distinct from [`default_timeout`], which is the serde
/// default (10s) for the per-operation `timeout` field of [`PersistentContext`].
pub const DEFAULT_REGION_MIGRATION_TIMEOUT: Duration = Duration::from_secs(120);
76
/// Serde helper that accepts either a bare value or a list of values.
///
/// `#[serde(untagged)]` tries the variants in declaration order, so a scalar
/// deserializes as `Single` and a sequence as `Multiple`. Do not reorder the
/// variants: the order is part of the deserialization semantics.
#[derive(Debug, Deserialize)]
#[serde(untagged)]
enum SingleOrMultiple<T> {
    /// A single bare value.
    Single(T),
    /// A sequence of values.
    Multiple(Vec<T>),
}
83
84fn single_or_multiple_from<'de, D, T>(deserializer: D) -> std::result::Result<Vec<T>, D::Error>
85where
86    D: Deserializer<'de>,
87    T: Deserialize<'de>,
88{
89    let helper = SingleOrMultiple::<T>::deserialize(deserializer)?;
90    Ok(match helper {
91        SingleOrMultiple::Single(x) => vec![x],
92        SingleOrMultiple::Multiple(xs) => xs,
93    })
94}
95
/// It's shared in each step and available even after recovering.
///
/// It will only be updated/stored after the Red node has succeeded.
///
/// **Notes: Stores with too large data in the context might incur replication overhead.**
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct PersistentContext {
    /// The table catalog.
    ///
    /// Kept only so that old serialized procedures still deserialize.
    #[deprecated(note = "use `catalog_and_schema` instead")]
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub(crate) catalog: Option<String>,
    /// The table schema.
    ///
    /// Kept only so that old serialized procedures still deserialize.
    #[deprecated(note = "use `catalog_and_schema` instead")]
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub(crate) schema: Option<String>,
    /// The catalog and schema of the regions, as `(catalog, schema)` pairs.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub(crate) catalog_and_schema: Vec<(String, String)>,
    /// The [Peer] of migration source.
    pub(crate) from_peer: Peer,
    /// The [Peer] of migration destination.
    pub(crate) to_peer: Peer,
    /// The [RegionId]s of the migrating regions.
    ///
    /// Accepts either a list or a single value (the legacy `region_id` field)
    /// via [`single_or_multiple_from`].
    #[serde(deserialize_with = "single_or_multiple_from", alias = "region_id")]
    pub(crate) region_ids: Vec<RegionId>,
    /// The timeout for downgrading leader region and upgrading candidate region operations.
    ///
    /// Defaults to [`default_timeout`] (10s) when absent from the payload.
    #[serde(with = "humantime_serde", default = "default_timeout")]
    pub(crate) timeout: Duration,
    /// The trigger reason of region migration.
    #[serde(default)]
    pub(crate) trigger_reason: RegionMigrationTriggerReason,
}
128
129impl PersistentContext {
130    pub fn new(
131        catalog_and_schema: Vec<(String, String)>,
132        from_peer: Peer,
133        to_peer: Peer,
134        region_ids: Vec<RegionId>,
135        timeout: Duration,
136        trigger_reason: RegionMigrationTriggerReason,
137    ) -> Self {
138        #[allow(deprecated)]
139        Self {
140            catalog: None,
141            schema: None,
142            catalog_and_schema,
143            from_peer,
144            to_peer,
145            region_ids,
146            timeout,
147            trigger_reason,
148        }
149    }
150}
151
/// Serde default for [`PersistentContext::timeout`]: 10 seconds.
///
/// Note: this is the per-operation timeout default, distinct from the overall
/// [`DEFAULT_REGION_MIGRATION_TIMEOUT`] of the whole procedure.
fn default_timeout() -> Duration {
    const DEFAULT_OPERATION_TIMEOUT_SECS: u64 = 10;
    Duration::from_secs(DEFAULT_OPERATION_TIMEOUT_SECS)
}
155
156impl PersistentContext {
157    pub fn lock_key(&self) -> Vec<StringKey> {
158        let mut lock_keys =
159            Vec::with_capacity(self.region_ids.len() + 2 + self.catalog_and_schema.len() * 2);
160        #[allow(deprecated)]
161        if let (Some(catalog), Some(schema)) = (&self.catalog, &self.schema) {
162            lock_keys.push(CatalogLock::Read(catalog).into());
163            lock_keys.push(SchemaLock::read(catalog, schema).into());
164        }
165        for (catalog, schema) in self.catalog_and_schema.iter() {
166            lock_keys.push(CatalogLock::Read(catalog).into());
167            lock_keys.push(SchemaLock::read(catalog, schema).into());
168        }
169
170        // Sort the region ids to ensure the same order of region ids.
171        let mut region_ids = self.region_ids.clone();
172        region_ids.sort_unstable();
173        for region_id in region_ids {
174            lock_keys.push(RegionLock::Write(region_id).into());
175        }
176        lock_keys
177    }
178
179    /// Returns the table ids of the regions.
180    ///
181    /// The return value is a set of table ids.
182    pub fn region_table_ids(&self) -> Vec<TableId> {
183        self.region_ids
184            .iter()
185            .map(|region_id| region_id.table_id())
186            .collect::<HashSet<_>>()
187            .into_iter()
188            .collect()
189    }
190
191    /// Returns the table regions map.
192    ///
193    /// The key is the table id, the value is the region ids of the table.
194    pub fn table_regions(&self) -> HashMap<TableId, Vec<RegionId>> {
195        let mut table_regions = HashMap::new();
196        for region_id in &self.region_ids {
197            table_regions
198                .entry(region_id.table_id())
199                .or_insert_with(Vec::new)
200                .push(*region_id);
201        }
202        table_regions
203    }
204}
205
206impl Eventable for PersistentContext {
207    fn to_event(&self) -> Option<Box<dyn Event>> {
208        Some(Box::new(RegionMigrationEvent::from_persistent_ctx(self)))
209    }
210}
211
/// Metrics of region migration.
///
/// Stage timings are accumulated while the procedure runs and observed into
/// the `METRIC_META_REGION_MIGRATION_STAGE_ELAPSED` histogram when dropped.
#[derive(Debug, Clone, Default)]
pub struct Metrics {
    /// Elapsed time of downgrading region and upgrading region.
    /// Used for the operation timeout budget; NOT included in the "total"
    /// reported by [Display] or the drop-time metrics.
    operations_elapsed: Duration,
    /// Elapsed time of flushing leader region.
    flush_leader_region_elapsed: Duration,
    /// Elapsed time of downgrading leader region.
    downgrade_leader_region_elapsed: Duration,
    /// Elapsed time of open candidate region.
    open_candidate_region_elapsed: Duration,
    /// Elapsed time of upgrade candidate region.
    upgrade_candidate_region_elapsed: Duration,
}
226
227impl Display for Metrics {
228    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
229        let total = self.flush_leader_region_elapsed
230            + self.downgrade_leader_region_elapsed
231            + self.open_candidate_region_elapsed
232            + self.upgrade_candidate_region_elapsed;
233        write!(
234            f,
235            "total: {:?}, flush_leader_region_elapsed: {:?}, downgrade_leader_region_elapsed: {:?}, open_candidate_region_elapsed: {:?}, upgrade_candidate_region_elapsed: {:?}",
236            total,
237            self.flush_leader_region_elapsed,
238            self.downgrade_leader_region_elapsed,
239            self.open_candidate_region_elapsed,
240            self.upgrade_candidate_region_elapsed
241        )
242    }
243}
244
245impl Metrics {
246    /// Updates the elapsed time of downgrading region and upgrading region.
247    pub fn update_operations_elapsed(&mut self, elapsed: Duration) {
248        self.operations_elapsed += elapsed;
249    }
250
251    /// Updates the elapsed time of flushing leader region.
252    pub fn update_flush_leader_region_elapsed(&mut self, elapsed: Duration) {
253        self.flush_leader_region_elapsed += elapsed;
254    }
255
256    /// Updates the elapsed time of downgrading leader region.
257    pub fn update_downgrade_leader_region_elapsed(&mut self, elapsed: Duration) {
258        self.downgrade_leader_region_elapsed += elapsed;
259    }
260
261    /// Updates the elapsed time of open candidate region.
262    pub fn update_open_candidate_region_elapsed(&mut self, elapsed: Duration) {
263        self.open_candidate_region_elapsed += elapsed;
264    }
265
266    /// Updates the elapsed time of upgrade candidate region.
267    pub fn update_upgrade_candidate_region_elapsed(&mut self, elapsed: Duration) {
268        self.upgrade_candidate_region_elapsed += elapsed;
269    }
270}
271
272impl Drop for Metrics {
273    fn drop(&mut self) {
274        let total = self.flush_leader_region_elapsed
275            + self.downgrade_leader_region_elapsed
276            + self.open_candidate_region_elapsed
277            + self.upgrade_candidate_region_elapsed;
278        METRIC_META_REGION_MIGRATION_STAGE_ELAPSED
279            .with_label_values(&["total"])
280            .observe(total.as_secs_f64());
281
282        if !self.flush_leader_region_elapsed.is_zero() {
283            METRIC_META_REGION_MIGRATION_STAGE_ELAPSED
284                .with_label_values(&["flush_leader_region"])
285                .observe(self.flush_leader_region_elapsed.as_secs_f64());
286        }
287
288        if !self.downgrade_leader_region_elapsed.is_zero() {
289            METRIC_META_REGION_MIGRATION_STAGE_ELAPSED
290                .with_label_values(&["downgrade_leader_region"])
291                .observe(self.downgrade_leader_region_elapsed.as_secs_f64());
292        }
293
294        if !self.open_candidate_region_elapsed.is_zero() {
295            METRIC_META_REGION_MIGRATION_STAGE_ELAPSED
296                .with_label_values(&["open_candidate_region"])
297                .observe(self.open_candidate_region_elapsed.as_secs_f64());
298        }
299
300        if !self.upgrade_candidate_region_elapsed.is_zero() {
301            METRIC_META_REGION_MIGRATION_STAGE_ELAPSED
302                .with_label_values(&["upgrade_candidate_region"])
303                .observe(self.upgrade_candidate_region_elapsed.as_secs_f64());
304        }
305    }
306}
307
/// It's shared in each step and available in executing (including retrying).
///
/// It will be dropped if the procedure runner crashes.
///
/// The additional remote fetches are only required in the worst cases.
#[derive(Debug, Clone, Default)]
pub struct VolatileContext {
    /// `opening_region_guards` will be set after the
    /// [OpenCandidateRegion](crate::procedure::region_migration::open_candidate_region::OpenCandidateRegion) step.
    ///
    /// `opening_region_guards` should be consumed after
    /// the corresponding [RegionRoute](common_meta::rpc::router::RegionRoute) of the opening region
    /// was written into [TableRouteValue](common_meta::key::table_route::TableRouteValue).
    opening_region_guards: Vec<OperatingRegionGuard>,
    /// The deadline of leader region lease. Set at most once per procedure run
    /// (see [VolatileContext::set_leader_region_lease_deadline]).
    leader_region_lease_deadline: Option<Instant>,
    /// Cache of the `from_peer`'s datanode table values, keyed by table id.
    /// Populated lazily on first access.
    from_peer_datanode_table_values: Option<HashMap<TableId, DatanodeTableValue>>,
    /// The last_entry_ids of leader regions.
    leader_region_last_entry_ids: HashMap<RegionId, u64>,
    /// The last_entry_ids of leader metadata regions (Only used for metric engine).
    leader_region_metadata_last_entry_ids: HashMap<RegionId, u64>,
    /// Metrics of region migration; exported to Prometheus when dropped.
    metrics: Metrics,
}
333
334impl VolatileContext {
335    /// Sets the `leader_region_lease_deadline` if it does not exist.
336    pub fn set_leader_region_lease_deadline(&mut self, lease_timeout: Duration) {
337        if self.leader_region_lease_deadline.is_none() {
338            self.leader_region_lease_deadline = Some(Instant::now() + lease_timeout);
339        }
340    }
341
342    /// Resets the `leader_region_lease_deadline`.
343    pub fn reset_leader_region_lease_deadline(&mut self) {
344        self.leader_region_lease_deadline = None;
345    }
346
347    /// Sets the `leader_region_last_entry_id`.
348    pub fn set_last_entry_id(&mut self, region_id: RegionId, last_entry_id: u64) {
349        self.leader_region_last_entry_ids
350            .insert(region_id, last_entry_id);
351    }
352
353    /// Sets the `leader_region_metadata_last_entry_id`.
354    pub fn set_metadata_last_entry_id(&mut self, region_id: RegionId, last_entry_id: u64) {
355        self.leader_region_metadata_last_entry_ids
356            .insert(region_id, last_entry_id);
357    }
358}
359
/// Used to generate new [Context].
pub trait ContextFactory {
    /// Consumes the factory and builds a [Context] around `persistent_ctx`.
    fn new_context(self, persistent_ctx: PersistentContext) -> Context;
}
364
/// Default implementation of [ContextFactory].
///
/// Holds every shared handle a [Context] needs; [ContextFactory::new_context]
/// moves them into the context.
#[derive(Clone)]
pub struct DefaultContextFactory {
    /// Fresh volatile state handed to each new context.
    volatile_ctx: VolatileContext,
    /// In-memory kv-backend; becomes `Context::in_memory`.
    in_memory_key: ResettableKvBackendRef,
    /// Access to table/route/datanode metadata.
    table_metadata_manager: TableMetadataManagerRef,
    /// Keeper tracking regions being opened.
    opening_region_keeper: MemoryRegionKeeperRef,
    /// Controls register/deregister of region failure detectors.
    region_failure_detector_controller: RegionFailureDetectorControllerRef,
    /// Mailbox for communicating with datanodes.
    mailbox: MailboxRef,
    /// Address of this meta server.
    server_addr: String,
    /// Broadcasts cache-invalidation messages.
    cache_invalidator: CacheInvalidatorRef,
}
377
378impl DefaultContextFactory {
379    /// Returns an [`DefaultContextFactory`].
380    pub fn new(
381        in_memory_key: ResettableKvBackendRef,
382        table_metadata_manager: TableMetadataManagerRef,
383        opening_region_keeper: MemoryRegionKeeperRef,
384        region_failure_detector_controller: RegionFailureDetectorControllerRef,
385        mailbox: MailboxRef,
386        server_addr: String,
387        cache_invalidator: CacheInvalidatorRef,
388    ) -> Self {
389        Self {
390            volatile_ctx: VolatileContext::default(),
391            in_memory_key,
392            table_metadata_manager,
393            opening_region_keeper,
394            region_failure_detector_controller,
395            mailbox,
396            server_addr,
397            cache_invalidator,
398        }
399    }
400}
401
402impl ContextFactory for DefaultContextFactory {
403    fn new_context(self, persistent_ctx: PersistentContext) -> Context {
404        Context {
405            persistent_ctx,
406            volatile_ctx: self.volatile_ctx,
407            in_memory: self.in_memory_key,
408            table_metadata_manager: self.table_metadata_manager,
409            opening_region_keeper: self.opening_region_keeper,
410            region_failure_detector_controller: self.region_failure_detector_controller,
411            mailbox: self.mailbox,
412            server_addr: self.server_addr,
413            cache_invalidator: self.cache_invalidator,
414        }
415    }
416}
417
/// The context of procedure execution.
pub struct Context {
    /// State persisted across procedure restarts.
    persistent_ctx: PersistentContext,
    /// State lost if the procedure runner crashes (guards, caches, metrics).
    volatile_ctx: VolatileContext,
    /// In-memory kv-backend (the factory's `in_memory_key`).
    in_memory: KvBackendRef,
    /// Access to table/route/datanode metadata.
    table_metadata_manager: TableMetadataManagerRef,
    /// Keeper tracking regions being opened.
    opening_region_keeper: MemoryRegionKeeperRef,
    /// Registers/deregisters region failure detectors in the RegionSupervisor.
    region_failure_detector_controller: RegionFailureDetectorControllerRef,
    /// Mailbox for communicating with datanodes.
    mailbox: MailboxRef,
    /// Address of this meta server.
    server_addr: String,
    /// Broadcasts cache-invalidation messages.
    cache_invalidator: CacheInvalidatorRef,
}
430
impl Context {
    /// Returns the next operation's timeout: the configured timeout minus the
    /// operation time already spent, or `None` once the budget is exhausted.
    pub fn next_operation_timeout(&self) -> Option<Duration> {
        self.persistent_ctx
            .timeout
            .checked_sub(self.volatile_ctx.metrics.operations_elapsed)
    }

    /// Adds the time elapsed since `instant` to the operation timeout budget.
    pub fn update_operations_elapsed(&mut self, instant: Instant) {
        self.volatile_ctx
            .metrics
            .update_operations_elapsed(instant.elapsed());
    }

    /// Adds the time elapsed since `instant` to the leader-flush stage timing.
    pub fn update_flush_leader_region_elapsed(&mut self, instant: Instant) {
        self.volatile_ctx
            .metrics
            .update_flush_leader_region_elapsed(instant.elapsed());
    }

    /// Adds the time elapsed since `instant` to the leader-downgrade stage timing.
    pub fn update_downgrade_leader_region_elapsed(&mut self, instant: Instant) {
        self.volatile_ctx
            .metrics
            .update_downgrade_leader_region_elapsed(instant.elapsed());
    }

    /// Adds the time elapsed since `instant` to the candidate-open stage timing.
    pub fn update_open_candidate_region_elapsed(&mut self, instant: Instant) {
        self.volatile_ctx
            .metrics
            .update_open_candidate_region_elapsed(instant.elapsed());
    }

    /// Adds the time elapsed since `instant` to the candidate-upgrade stage timing.
    pub fn update_upgrade_candidate_region_elapsed(&mut self, instant: Instant) {
        self.volatile_ctx
            .metrics
            .update_upgrade_candidate_region_elapsed(instant.elapsed());
    }

    /// Returns address of meta server.
    pub fn server_addr(&self) -> &str {
        &self.server_addr
    }

    /// Returns the distinct table ids of the migrating regions
    /// (order unspecified, deduplicated via a HashSet).
    ///
    /// NOTE(review): duplicates [PersistentContext::region_table_ids] —
    /// consider delegating to `self.persistent_ctx.region_table_ids()`.
    pub fn region_table_ids(&self) -> Vec<TableId> {
        self.persistent_ctx
            .region_ids
            .iter()
            .map(|region_id| region_id.table_id())
            .collect::<HashSet<_>>()
            .into_iter()
            .collect()
    }

    /// Fetches the table route values of all involved tables from the table
    /// metadata manager (always remote; there is no volatile-context cache here).
    /// Tables whose route is missing are silently omitted from the result map.
    ///
    /// Retry:
    /// - Failed to retrieve the metadata of table.
    pub async fn get_table_route_values(
        &self,
    ) -> Result<HashMap<TableId, DeserializedValueWithBytes<TableRouteValue>>> {
        let table_ids = self.persistent_ctx.region_table_ids();
        let table_routes = self
            .table_metadata_manager
            .table_route_manager()
            .table_route_storage()
            .batch_get_with_raw_bytes(&table_ids)
            .await
            .context(error::TableMetadataManagerSnafu)
            .map_err(BoxedError::new)
            .with_context(|_| error::RetryLaterWithSourceSnafu {
                reason: format!("Failed to get table routes: {table_ids:?}"),
            })?;
        // batch_get returns one Option per requested id, in request order;
        // zip them back to ids and drop the missing ones.
        let table_routes = table_ids
            .into_iter()
            .zip(table_routes)
            .filter_map(|(table_id, table_route)| {
                table_route.map(|table_route| (table_id, table_route))
            })
            .collect::<HashMap<_, _>>();
        Ok(table_routes)
    }

    /// Fetches the table route of `table_id` from the table metadata manager
    /// (always remote; there is no volatile-context cache here).
    ///
    /// Retry:
    /// - Failed to retrieve the metadata of table.
    ///
    /// Returns a `TableRouteNotFound` error if the route does not exist.
    pub async fn get_table_route_value(
        &self,
        table_id: TableId,
    ) -> Result<DeserializedValueWithBytes<TableRouteValue>> {
        let table_route_value = self
            .table_metadata_manager
            .table_route_manager()
            .table_route_storage()
            .get_with_raw_bytes(table_id)
            .await
            .context(error::TableMetadataManagerSnafu)
            .map_err(BoxedError::new)
            .with_context(|_| error::RetryLaterWithSourceSnafu {
                reason: format!("Failed to get table routes: {table_id:}"),
            })?
            .context(error::TableRouteNotFoundSnafu { table_id })?;
        Ok(table_route_value)
    }

    /// Returns the `from_peer_datanode_table_values` of [VolatileContext] if any.
    /// Otherwise, fetches them from remote and caches them in the volatile
    /// context before returning.
    ///
    /// Retry:
    /// - Failed to retrieve the metadata of datanode table.
    pub async fn get_from_peer_datanode_table_values(
        &mut self,
    ) -> Result<&HashMap<TableId, DatanodeTableValue>> {
        let from_peer_datanode_table_values =
            &mut self.volatile_ctx.from_peer_datanode_table_values;
        if from_peer_datanode_table_values.is_none() {
            let table_ids = self.persistent_ctx.region_table_ids();
            let datanode_table_keys = table_ids
                .iter()
                .map(|table_id| DatanodeTableKey {
                    datanode_id: self.persistent_ctx.from_peer.id,
                    table_id: *table_id,
                })
                .collect::<Vec<_>>();
            let datanode_table_values = self
                .table_metadata_manager
                .datanode_table_manager()
                .batch_get(&datanode_table_keys)
                .await
                .context(error::TableMetadataManagerSnafu)
                .map_err(BoxedError::new)
                .with_context(|_| error::RetryLaterWithSourceSnafu {
                    reason: format!("Failed to get DatanodeTable: {table_ids:?}"),
                })?
                .into_iter()
                .map(|(k, v)| (k.table_id, v))
                .collect();
            *from_peer_datanode_table_values = Some(datanode_table_values);
        }
        // Safe: populated just above when it was None.
        Ok(from_peer_datanode_table_values.as_ref().unwrap())
    }

    /// Fetches the datanode table value of `table_id` on the `from_peer`
    /// directly from remote (unlike the batch variant, this does NOT consult
    /// or populate the volatile-context cache).
    ///
    /// Retry:
    /// - Failed to retrieve the metadata of datanode table.
    pub async fn get_from_peer_datanode_table_value(
        &self,
        table_id: TableId,
    ) -> Result<DatanodeTableValue> {
        let datanode_table_value = self
            .table_metadata_manager
            .datanode_table_manager()
            .get(&DatanodeTableKey {
                datanode_id: self.persistent_ctx.from_peer.id,
                table_id,
            })
            .await
            .context(error::TableMetadataManagerSnafu)
            .map_err(BoxedError::new)
            .with_context(|_| error::RetryLaterWithSourceSnafu {
                reason: format!("Failed to get DatanodeTable: {table_id}"),
            })?
            .context(error::DatanodeTableNotFoundSnafu {
                table_id,
                datanode_id: self.persistent_ctx.from_peer.id,
            })?;
        Ok(datanode_table_value)
    }

    /// Notifies the RegionSupervisor to register failure detectors of failed region.
    ///
    /// The original failure detector was removed once the procedure was triggered.
    /// Now, we need to register the failure detector for the failed region again.
    pub async fn register_failure_detectors(&self) {
        let datanode_id = self.persistent_ctx.from_peer.id;
        let region_ids = &self.persistent_ctx.region_ids;
        let detecting_regions = region_ids
            .iter()
            .map(|region_id| (datanode_id, *region_id))
            .collect::<Vec<_>>();
        self.region_failure_detector_controller
            .register_failure_detectors(detecting_regions)
            .await;
        info!(
            "Registered failure detectors after migration failures for datanode {}, regions {:?}",
            datanode_id, region_ids
        );
    }

    /// Notifies the RegionSupervisor to deregister failure detectors.
    ///
    /// The original failure detectors won't be removed once the procedure was triggered.
    /// We need to deregister the failure detectors for the original region if the procedure is finished.
    pub async fn deregister_failure_detectors(&self) {
        let datanode_id = self.persistent_ctx.from_peer.id;
        let region_ids = &self.persistent_ctx.region_ids;
        let detecting_regions = region_ids
            .iter()
            .map(|region_id| (datanode_id, *region_id))
            .collect::<Vec<_>>();

        self.region_failure_detector_controller
            .deregister_failure_detectors(detecting_regions)
            .await;
    }

    /// Notifies the RegionSupervisor to deregister failure detectors for the candidate region on the destination peer.
    ///
    /// The candidate region may be created on the destination peer,
    /// so we need to deregister the failure detectors for the candidate region if the procedure is aborted.
    pub async fn deregister_failure_detectors_for_candidate_region(&self) {
        let to_peer_id = self.persistent_ctx.to_peer.id;
        let region_ids = &self.persistent_ctx.region_ids;
        let detecting_regions = region_ids
            .iter()
            .map(|region_id| (to_peer_id, *region_id))
            .collect::<Vec<_>>();

        self.region_failure_detector_controller
            .deregister_failure_detectors(detecting_regions)
            .await;
    }

    /// Fetches the replay checkpoints for the given topic region keys.
    ///
    /// Regions that have no checkpoint stored are omitted from the result.
    pub async fn get_replay_checkpoints(
        &self,
        topic_region_keys: Vec<TopicRegionKey<'_>>,
    ) -> Result<HashMap<RegionId, ReplayCheckpoint>> {
        let topic_region_values = self
            .table_metadata_manager
            .topic_region_manager()
            .batch_get(topic_region_keys)
            .await
            .context(error::TableMetadataManagerSnafu)?;

        let replay_checkpoints = topic_region_values
            .into_iter()
            .flat_map(|(key, value)| value.checkpoint.map(|value| (key, value)))
            .collect::<HashMap<_, _>>();

        Ok(replay_checkpoints)
    }

    /// Broadcasts the invalidate table cache message for every involved table.
    ///
    /// Best-effort: failures of the broadcast are deliberately ignored.
    pub async fn invalidate_table_cache(&self) -> Result<()> {
        let table_ids = self.region_table_ids();
        let mut cache_idents = Vec::with_capacity(table_ids.len());
        for table_id in &table_ids {
            cache_idents.push(CacheIdent::TableId(*table_id));
        }
        // ignore the result
        let ctx = common_meta::cache_invalidator::Context::default();
        let _ = self.cache_invalidator.invalidate(&ctx, &cache_idents).await;
        Ok(())
    }

    /// Returns a clone of the [PersistentContext] of the procedure.
    pub fn persistent_ctx(&self) -> PersistentContext {
        self.persistent_ctx.clone()
    }
}
702
/// A state of the region-migration state machine.
///
/// Serialized via `typetag` under the `region_migration_state` tag so the
/// current state can be persisted and the procedure recovered mid-flight.
#[async_trait::async_trait]
#[typetag::serde(tag = "region_migration_state")]
pub(crate) trait State: Sync + Send + Debug {
    /// Returns the short name of the concrete state: the last `::`-separated
    /// segment of [std::any::type_name].
    fn name(&self) -> &'static str {
        let type_name = std::any::type_name::<Self>();
        // short name
        type_name.split("::").last().unwrap_or(type_name)
    }

    /// Yields the next [State] and [Status].
    async fn next(
        &mut self,
        ctx: &mut Context,
        procedure_ctx: &ProcedureContext,
    ) -> Result<(Box<dyn State>, Status)>;

    /// Returns as [Any](std::any::Any), enabling downcasts in tests/callers.
    fn as_any(&self) -> &dyn Any;
}
722
/// Persistent data of [RegionMigrationProcedure] (owned form, used when
/// deserializing a recovered procedure).
#[derive(Debug, Serialize, Deserialize)]
pub struct RegionMigrationDataOwned {
    /// The persistent part of the procedure context.
    persistent_ctx: PersistentContext,
    /// The state the procedure was in when it was persisted.
    state: Box<dyn State>,
}
729
/// Persistent data of [RegionMigrationProcedure] (borrowed form, used when
/// serializing a running procedure without cloning it).
#[derive(Debug, Serialize)]
pub struct RegionMigrationData<'a> {
    /// The persistent part of the procedure context.
    persistent_ctx: &'a PersistentContext,
    /// The current state of the procedure.
    state: &'a dyn State,
}
736
/// The procedure driving a region migration through its [State] machine.
pub(crate) struct RegionMigrationProcedure {
    /// The current state; advanced by [State::next].
    state: Box<dyn State>,
    /// Shared execution context (persistent + volatile parts).
    context: Context,
    /// Tracker guards for the migrating regions, held for the procedure's
    /// lifetime to mark them as having a running migration.
    _guards: Vec<RegionMigrationProcedureGuard>,
}
742
impl RegionMigrationProcedure {
    /// The procedure type name registered with the procedure framework.
    const TYPE_NAME: &'static str = "metasrv-procedure::RegionMigration";

    /// Creates a procedure starting at [RegionMigrationStart].
    pub fn new(
        persistent_context: PersistentContext,
        context_factory: impl ContextFactory,
        guards: Vec<RegionMigrationProcedureGuard>,
    ) -> Self {
        let state = Box::new(RegionMigrationStart {});
        Self::new_inner(state, persistent_context, context_factory, guards)
    }

    /// Builds the procedure from an arbitrary initial `state`
    /// (shared by [Self::new] and tests).
    fn new_inner(
        state: Box<dyn State>,
        persistent_context: PersistentContext,
        context_factory: impl ContextFactory,
        guards: Vec<RegionMigrationProcedureGuard>,
    ) -> Self {
        Self {
            state,
            context: context_factory.new_context(persistent_context),
            _guards: guards,
        }
    }

    /// Recovers a procedure from its persisted JSON, re-registering each
    /// region with the `tracker`.
    ///
    /// NOTE(review): regions for which `insert_running_procedure` yields no
    /// guard (presumably already tracked) are skipped by the `flat_map` —
    /// confirm the intended semantics in `manager`.
    fn from_json(
        json: &str,
        context_factory: impl ContextFactory,
        tracker: RegionMigrationProcedureTracker,
    ) -> ProcedureResult<Self> {
        let RegionMigrationDataOwned {
            persistent_ctx,
            state,
        } = serde_json::from_str(json).context(FromJsonSnafu)?;
        let guards = persistent_ctx
            .region_ids
            .iter()
            .flat_map(|region_id| {
                tracker.insert_running_procedure(&RegionMigrationProcedureTask {
                    region_id: *region_id,
                    from_peer: persistent_ctx.from_peer.clone(),
                    to_peer: persistent_ctx.to_peer.clone(),
                    timeout: persistent_ctx.timeout,
                    trigger_reason: persistent_ctx.trigger_reason,
                })
            })
            .collect::<Vec<_>>();

        let context = context_factory.new_context(persistent_ctx);

        Ok(Self {
            state,
            context,
            _guards: guards,
        })
    }

    /// Rolls back a failed migration: restores any downgraded leader region
    /// routes, then re-establishes failure detection on the source peer and
    /// removes it from the candidate (destination) peer.
    async fn rollback_inner(&mut self, procedure_ctx: &ProcedureContext) -> Result<()> {
        let _timer = METRIC_META_REGION_MIGRATION_EXECUTE
            .with_label_values(&["rollback"])
            .start_timer();
        let ctx = &self.context;
        let table_regions = ctx.persistent_ctx.table_regions();
        for (table_id, regions) in table_regions {
            // Take the table write lock so the route update below cannot race
            // with other procedures mutating the same table route.
            let table_lock = TableLock::Write(table_id).into();
            let _guard = procedure_ctx.provider.acquire_lock(&table_lock).await;
            let table_route = ctx.get_table_route_value(table_id).await?;
            let region_routes = table_route.region_routes().unwrap();
            // Only rewrite the route if at least one of our regions is still
            // marked as a downgrading leader.
            let downgraded = region_routes
                .iter()
                .filter(|route| regions.contains(&route.region.id))
                .any(|route| route.is_leader_downgrading());
            if downgraded {
                info!(
                    "Rollbacking downgraded region leader table route, table: {table_id}, regions: {regions:?}"
                );
                let table_metadata_manager = &ctx.table_metadata_manager;
                // `Some(None)` clears the leader status of our regions;
                // `None` leaves other regions' routes untouched.
                table_metadata_manager
                    .update_leader_region_status(table_id, &table_route, |route| {
                        if regions.contains(&route.region.id) {
                            Some(None)
                        } else {
                            None
                        }
                    })
                    .await
                    .context(error::TableMetadataManagerSnafu)
                    .map_err(BoxedError::new)
                    .with_context(|_| error::RetryLaterWithSourceSnafu {
                        reason: format!("Failed to update the table route during the rollback downgraded leader region: {regions:?}"),
                    })?;
            }
        }
        self.context
            .deregister_failure_detectors_for_candidate_region()
            .await;
        self.context.register_failure_detectors().await;

        Ok(())
    }
}
844
#[async_trait::async_trait]
impl Procedure for RegionMigrationProcedure {
    fn type_name(&self) -> &str {
        Self::TYPE_NAME
    }

    /// Rolls back metadata changes; any failure is surfaced as an external
    /// (non-retryable) procedure error.
    async fn rollback(&mut self, ctx: &ProcedureContext) -> ProcedureResult<()> {
        self.rollback_inner(ctx)
            .await
            .map_err(ProcedureError::external)
    }

    fn rollback_supported(&self) -> bool {
        true
    }

    /// Drives the migration state machine one step: delegates to the current
    /// state's `next`, then either advances to the returned state or maps the
    /// error to a retryable/external procedure error.
    #[tracing::instrument(skip_all, fields(
        state = %self.state.name(),
        region_count = self.context.persistent_ctx.region_ids.len(),
        from_peer = self.context.persistent_ctx.from_peer.id,
        to_peer = self.context.persistent_ctx.to_peer.id,
    ))]
    async fn execute(&mut self, ctx: &ProcedureContext) -> ProcedureResult<Status> {
        let state = &mut self.state;

        // The state name labels both the duration timer and error counters.
        let name = state.name();
        let _timer = METRIC_META_REGION_MIGRATION_EXECUTE
            .with_label_values(&[name])
            .start_timer();
        match state.next(&mut self.context, ctx).await {
            Ok((next, status)) => {
                *state = next;
                Ok(status)
            }
            Err(e) => {
                if e.is_retryable() {
                    METRIC_META_REGION_MIGRATION_ERROR
                        .with_label_values(&[name, "retryable"])
                        .inc();
                    Err(ProcedureError::retry_later(e))
                } else {
                    // Consumes the opening region guard before deregistering the failure detectors.
                    self.context.volatile_ctx.opening_region_guards.clear();
                    self.context
                        .deregister_failure_detectors_for_candidate_region()
                        .await;
                    error!(
                        e;
                        "Region migration procedure failed, regions: {:?}, from_peer: {}, to_peer: {}, {}",
                        self.context.persistent_ctx.region_ids,
                        self.context.persistent_ctx.from_peer,
                        self.context.persistent_ctx.to_peer,
                        self.context.volatile_ctx.metrics,
                    );
                    METRIC_META_REGION_MIGRATION_ERROR
                        .with_label_values(&[name, "external"])
                        .inc();
                    Err(ProcedureError::external(e))
                }
            }
        }
    }

    /// Serializes the current state plus the persistent context; this is what
    /// `from_json` restores from.
    fn dump(&self) -> ProcedureResult<String> {
        let data = RegionMigrationData {
            state: self.state.as_ref(),
            persistent_ctx: &self.context.persistent_ctx,
        };
        serde_json::to_string(&data).context(ToJsonSnafu)
    }

    fn lock_key(&self) -> LockKey {
        LockKey::new(self.context.persistent_ctx.lock_key())
    }

    fn user_metadata(&self) -> Option<UserMetadata> {
        Some(UserMetadata::new(Arc::new(self.context.persistent_ctx())))
    }
}
924
925#[cfg(test)]
926mod tests {
927    use std::assert_matches::assert_matches;
928    use std::sync::Arc;
929
930    use common_meta::distributed_time_constants::default_distributed_time_constants;
931    use common_meta::instruction::Instruction;
932    use common_meta::key::test_utils::new_test_table_info;
933    use common_meta::rpc::router::{Region, RegionRoute};
934
935    use super::*;
936    use crate::handler::HeartbeatMailbox;
937    use crate::procedure::region_migration::open_candidate_region::OpenCandidateRegion;
938    use crate::procedure::region_migration::test_util::*;
939    use crate::procedure::test_util::{
940        new_downgrade_region_reply, new_flush_region_reply_for_region, new_open_region_reply,
941        new_upgrade_region_reply,
942    };
943    use crate::service::mailbox::Channel;
944
945    fn new_persistent_context() -> PersistentContext {
946        test_util::new_persistent_context(1, 2, RegionId::new(1024, 1))
947    }
948
949    #[test]
950    fn test_lock_key() {
951        let persistent_context = new_persistent_context();
952        let expected_keys = persistent_context.lock_key();
953
954        let env = TestingEnv::new();
955        let context = env.context_factory();
956
957        let procedure = RegionMigrationProcedure::new(persistent_context, context, vec![]);
958
959        let key = procedure.lock_key();
960        let keys = key.keys_to_lock().cloned().collect::<Vec<_>>();
961
962        for key in expected_keys {
963            assert!(keys.contains(&key));
964        }
965    }
966
967    #[test]
968    fn test_data_serialization() {
969        let persistent_context = new_persistent_context();
970
971        let env = TestingEnv::new();
972        let context = env.context_factory();
973
974        let procedure = RegionMigrationProcedure::new(persistent_context, context, vec![]);
975
976        let serialized = procedure.dump().unwrap();
977        let expected = r#"{"persistent_ctx":{"catalog_and_schema":[["greptime","public"]],"from_peer":{"id":1,"addr":""},"to_peer":{"id":2,"addr":""},"region_ids":[4398046511105],"timeout":"10s","trigger_reason":"Unknown"},"state":{"region_migration_state":"RegionMigrationStart"}}"#;
978        assert_eq!(expected, serialized);
979    }
980
981    #[test]
982    fn test_backward_compatibility() {
983        let persistent_ctx = PersistentContext {
984            #[allow(deprecated)]
985            catalog: Some("greptime".into()),
986            #[allow(deprecated)]
987            schema: Some("public".into()),
988            catalog_and_schema: vec![],
989            from_peer: Peer::empty(1),
990            to_peer: Peer::empty(2),
991            region_ids: vec![RegionId::new(1024, 1)],
992            timeout: Duration::from_secs(10),
993            trigger_reason: RegionMigrationTriggerReason::default(),
994        };
995        // NOTES: Changes it will break backward compatibility.
996        let serialized = r#"{"catalog":"greptime","schema":"public","from_peer":{"id":1,"addr":""},"to_peer":{"id":2,"addr":""},"region_id":4398046511105}"#;
997        let deserialized: PersistentContext = serde_json::from_str(serialized).unwrap();
998
999        assert_eq!(persistent_ctx, deserialized);
1000    }
1001
    /// A no-op [`State`]: every `next` call reports `Status::done()` without
    /// touching any metadata, letting tests drive the procedure loop and its
    /// serialization round-trip in isolation.
    #[derive(Debug, Serialize, Deserialize, Default)]
    pub struct MockState;

    #[async_trait::async_trait]
    #[typetag::serde]
    impl State for MockState {
        async fn next(
            &mut self,
            _ctx: &mut Context,
            _procedure_ctx: &ProcedureContext,
        ) -> Result<(Box<dyn State>, Status)> {
            // Always "transition" to another MockState and finish immediately.
            Ok((Box::new(MockState), Status::done()))
        }

        fn as_any(&self) -> &dyn Any {
            self
        }
    }
1020
1021    #[tokio::test]
1022    async fn test_execution_after_deserialized() {
1023        let env = TestingEnv::new();
1024
1025        fn new_mock_procedure(env: &TestingEnv) -> RegionMigrationProcedure {
1026            let persistent_context = new_persistent_context();
1027            let context_factory = env.context_factory();
1028            let state = Box::<MockState>::default();
1029            RegionMigrationProcedure::new_inner(state, persistent_context, context_factory, vec![])
1030        }
1031
1032        let ctx = TestingEnv::procedure_context();
1033        let mut procedure = new_mock_procedure(&env);
1034        let mut status = None;
1035        for _ in 0..3 {
1036            status = Some(procedure.execute(&ctx).await.unwrap());
1037        }
1038        assert!(status.unwrap().is_done());
1039
1040        let ctx = TestingEnv::procedure_context();
1041        let mut procedure = new_mock_procedure(&env);
1042
1043        status = Some(procedure.execute(&ctx).await.unwrap());
1044
1045        let serialized = procedure.dump().unwrap();
1046
1047        let context_factory = env.context_factory();
1048        let tracker = env.tracker();
1049        let mut procedure =
1050            RegionMigrationProcedure::from_json(&serialized, context_factory, tracker.clone())
1051                .unwrap();
1052        for region_id in &procedure.context.persistent_ctx.region_ids {
1053            assert!(tracker.contains(*region_id));
1054        }
1055
1056        for _ in 1..3 {
1057            status = Some(procedure.execute(&ctx).await.unwrap());
1058        }
1059        assert!(status.unwrap().is_done());
1060    }
1061
1062    #[tokio::test]
1063    async fn test_broadcast_invalidate_table_cache() {
1064        let mut env = TestingEnv::new();
1065        let persistent_context = test_util::new_persistent_context(1, 2, RegionId::new(1024, 1));
1066        let ctx = env.context_factory().new_context(persistent_context);
1067        let mailbox_ctx = env.mailbox_context();
1068
1069        // No receivers.
1070        ctx.invalidate_table_cache().await.unwrap();
1071
1072        let (tx, mut rx) = tokio::sync::mpsc::channel(1);
1073
1074        mailbox_ctx
1075            .insert_heartbeat_response_receiver(Channel::Frontend(1), tx)
1076            .await;
1077
1078        ctx.invalidate_table_cache().await.unwrap();
1079
1080        let resp = rx.recv().await.unwrap().unwrap();
1081        let msg = resp.mailbox_message.unwrap();
1082
1083        let instruction = HeartbeatMailbox::json_instruction(&msg).unwrap();
1084        assert_eq!(
1085            instruction,
1086            Instruction::InvalidateCaches(vec![CacheIdent::TableId(1024)])
1087        );
1088    }
1089
1090    fn procedure_flow_steps(from_peer_id: u64, to_peer_id: u64) -> Vec<Step> {
1091        vec![
1092            // MigrationStart
1093            Step::next(
1094                "Should be the open candidate region",
1095                None,
1096                Assertion::simple(assert_open_candidate_region, assert_need_persist),
1097            ),
1098            // OpenCandidateRegion
1099            Step::next(
1100                "Should be the flush leader region",
1101                Some(mock_datanode_reply(
1102                    to_peer_id,
1103                    Arc::new(|id| Ok(new_open_region_reply(id, true, None))),
1104                )),
1105                Assertion::simple(assert_flush_leader_region, assert_no_persist),
1106            ),
1107            // Flush Leader Region
1108            Step::next(
1109                "Should be the flush leader region",
1110                Some(mock_datanode_reply(
1111                    from_peer_id,
1112                    Arc::new(move |id| {
1113                        Ok(new_flush_region_reply_for_region(
1114                            id,
1115                            RegionId::new(1024, 1),
1116                            true,
1117                            None,
1118                        ))
1119                    }),
1120                )),
1121                Assertion::simple(assert_update_metadata_downgrade, assert_no_persist),
1122            ),
1123            // UpdateMetadata::Downgrade
1124            Step::next(
1125                "Should be the downgrade leader region",
1126                None,
1127                Assertion::simple(assert_downgrade_leader_region, assert_no_persist),
1128            ),
1129            // Downgrade Candidate
1130            Step::next(
1131                "Should be the upgrade candidate region",
1132                Some(mock_datanode_reply(
1133                    from_peer_id,
1134                    Arc::new(|id| Ok(new_downgrade_region_reply(id, None, true, None))),
1135                )),
1136                Assertion::simple(assert_upgrade_candidate_region, assert_no_persist),
1137            ),
1138            // Upgrade Candidate
1139            Step::next(
1140                "Should be the update metadata for upgrading",
1141                Some(mock_datanode_reply(
1142                    to_peer_id,
1143                    Arc::new(|id| Ok(new_upgrade_region_reply(id, true, true, None))),
1144                )),
1145                Assertion::simple(assert_update_metadata_upgrade, assert_no_persist),
1146            ),
1147            // UpdateMetadata::Upgrade
1148            Step::next(
1149                "Should be the close downgraded region",
1150                None,
1151                Assertion::simple(assert_close_downgraded_region, assert_no_persist),
1152            ),
1153            // CloseDowngradedRegion
1154            Step::next(
1155                "Should be the region migration end",
1156                None,
1157                Assertion::simple(assert_region_migration_end, assert_done),
1158            ),
1159            // RegionMigrationEnd
1160            Step::next(
1161                "Should be the region migration end again",
1162                None,
1163                Assertion::simple(assert_region_migration_end, assert_done),
1164            ),
1165        ]
1166    }
1167
1168    #[tokio::test]
1169    async fn test_procedure_flow() {
1170        common_telemetry::init_default_ut_logging();
1171
1172        let persistent_context = test_util::new_persistent_context(1, 2, RegionId::new(1024, 1));
1173        let state = Box::new(RegionMigrationStart);
1174
1175        // The table metadata.
1176        let from_peer_id = persistent_context.from_peer.id;
1177        let to_peer_id = persistent_context.to_peer.id;
1178        let from_peer = persistent_context.from_peer.clone();
1179        let to_peer = persistent_context.to_peer.clone();
1180        let region_id = persistent_context.region_ids[0];
1181        let table_info = new_test_table_info(1024);
1182        let region_routes = vec![RegionRoute {
1183            region: Region::new_test(region_id),
1184            leader_peer: Some(from_peer),
1185            follower_peers: vec![to_peer],
1186            ..Default::default()
1187        }];
1188
1189        let suite = ProcedureMigrationTestSuite::new(persistent_context, state);
1190        suite.init_table_metadata(table_info, region_routes).await;
1191
1192        let steps = procedure_flow_steps(from_peer_id, to_peer_id);
1193        let timer = Instant::now();
1194
1195        // Run the table tests.
1196        let runner = ProcedureMigrationSuiteRunner::new(suite)
1197            .steps(steps)
1198            .run_once()
1199            .await;
1200
1201        let region_lease = default_distributed_time_constants().region_lease.as_secs();
1202
1203        // Ensure it didn't run into the slow path.
1204        assert!(timer.elapsed().as_secs() < region_lease / 2);
1205
1206        runner.suite.verify_table_metadata().await;
1207    }
1208
1209    #[tokio::test]
1210    async fn test_procedure_flow_open_candidate_region_retryable_error() {
1211        common_telemetry::init_default_ut_logging();
1212
1213        let persistent_context = test_util::new_persistent_context(1, 2, RegionId::new(1024, 1));
1214        let state = Box::new(RegionMigrationStart);
1215
1216        // The table metadata.
1217        let to_peer_id = persistent_context.to_peer.id;
1218        let from_peer = persistent_context.from_peer.clone();
1219        let region_id = persistent_context.region_ids[0];
1220        let table_info = new_test_table_info(1024);
1221        let region_routes = vec![RegionRoute {
1222            region: Region::new_test(region_id),
1223            leader_peer: Some(from_peer),
1224            follower_peers: vec![],
1225            ..Default::default()
1226        }];
1227
1228        let suite = ProcedureMigrationTestSuite::new(persistent_context, state);
1229        suite.init_table_metadata(table_info, region_routes).await;
1230
1231        let steps = vec![
1232            // Migration Start
1233            Step::next(
1234                "Should be the open candidate region",
1235                None,
1236                Assertion::simple(assert_open_candidate_region, assert_need_persist),
1237            ),
1238            // OpenCandidateRegion
1239            Step::next(
1240                "Should be throwing a non-retry error",
1241                Some(mock_datanode_reply(
1242                    to_peer_id,
1243                    Arc::new(|id| error::MailboxTimeoutSnafu { id }.fail()),
1244                )),
1245                Assertion::error(|error| assert!(error.is_retryable())),
1246            ),
1247            // OpenCandidateRegion
1248            Step::next(
1249                "Should be throwing a non-retry error again",
1250                Some(mock_datanode_reply(
1251                    to_peer_id,
1252                    Arc::new(|id| error::MailboxTimeoutSnafu { id }.fail()),
1253                )),
1254                Assertion::error(|error| assert!(error.is_retryable())),
1255            ),
1256        ];
1257
1258        let setup_to_latest_persisted_state = Step::setup(
1259            "Sets state to UpdateMetadata::Downgrade",
1260            merge_before_test_fn(vec![
1261                setup_state(Arc::new(|| Box::new(OpenCandidateRegion))),
1262                Arc::new(reset_volatile_ctx),
1263            ]),
1264        );
1265
1266        let steps = [
1267            steps.clone(),
1268            // Mocks the volatile ctx lost(i.g., Meta leader restarts).
1269            vec![setup_to_latest_persisted_state.clone()],
1270            steps.clone()[1..].to_vec(),
1271            vec![setup_to_latest_persisted_state],
1272            steps.clone()[1..].to_vec(),
1273        ]
1274        .concat();
1275
1276        // Run the table tests.
1277        let runner = ProcedureMigrationSuiteRunner::new(suite)
1278            .steps(steps.clone())
1279            .run_once()
1280            .await;
1281
1282        let table_routes_version = runner
1283            .env()
1284            .table_metadata_manager()
1285            .table_route_manager()
1286            .table_route_storage()
1287            .get(region_id.table_id())
1288            .await
1289            .unwrap()
1290            .unwrap()
1291            .version();
1292        // Should be unchanged.
1293        assert_eq!(table_routes_version.unwrap(), 0);
1294    }
1295
1296    #[tokio::test]
1297    async fn test_procedure_flow_upgrade_candidate_with_retry_and_failed() {
1298        common_telemetry::init_default_ut_logging();
1299
1300        let persistent_context = test_util::new_persistent_context(1, 2, RegionId::new(1024, 1));
1301        let state = Box::new(RegionMigrationStart);
1302
1303        // The table metadata.
1304        let from_peer_id = persistent_context.from_peer.id;
1305        let to_peer_id = persistent_context.to_peer.id;
1306        let from_peer = persistent_context.from_peer.clone();
1307        let region_id = persistent_context.region_ids[0];
1308        let table_info = new_test_table_info(1024);
1309        let region_routes = vec![RegionRoute {
1310            region: Region::new_test(region_id),
1311            leader_peer: Some(from_peer),
1312            follower_peers: vec![],
1313            ..Default::default()
1314        }];
1315
1316        let suite = ProcedureMigrationTestSuite::new(persistent_context, state);
1317        suite.init_table_metadata(table_info, region_routes).await;
1318
1319        let steps = vec![
1320            // MigrationStart
1321            Step::next(
1322                "Should be the open candidate region",
1323                None,
1324                Assertion::simple(assert_open_candidate_region, assert_need_persist),
1325            ),
1326            // OpenCandidateRegion
1327            Step::next(
1328                "Should be the flush leader region",
1329                Some(mock_datanode_reply(
1330                    to_peer_id,
1331                    Arc::new(|id| Ok(new_open_region_reply(id, true, None))),
1332                )),
1333                Assertion::simple(assert_flush_leader_region, assert_no_persist),
1334            ),
1335            // Flush Leader Region
1336            Step::next(
1337                "Should be the flush leader region",
1338                Some(mock_datanode_reply(
1339                    from_peer_id,
1340                    Arc::new(move |id| {
1341                        Ok(new_flush_region_reply_for_region(
1342                            id,
1343                            RegionId::new(1024, 1),
1344                            true,
1345                            None,
1346                        ))
1347                    }),
1348                )),
1349                Assertion::simple(assert_update_metadata_downgrade, assert_no_persist),
1350            ),
1351            // UpdateMetadata::Downgrade
1352            Step::next(
1353                "Should be the downgrade leader region",
1354                None,
1355                Assertion::simple(assert_downgrade_leader_region, assert_no_persist),
1356            ),
1357            // Downgrade Candidate
1358            Step::next(
1359                "Should be the upgrade candidate region",
1360                Some(mock_datanode_reply(
1361                    from_peer_id,
1362                    Arc::new(|id| Ok(new_downgrade_region_reply(id, None, true, None))),
1363                )),
1364                Assertion::simple(assert_upgrade_candidate_region, assert_no_persist),
1365            ),
1366            // Upgrade Candidate
1367            Step::next(
1368                "Should be the rollback metadata",
1369                Some(mock_datanode_reply(
1370                    to_peer_id,
1371                    Arc::new(|id| error::MailboxTimeoutSnafu { id }.fail()),
1372                )),
1373                Assertion::simple(assert_update_metadata_rollback, assert_no_persist),
1374            ),
1375            // UpdateMetadata::Rollback
1376            Step::next(
1377                "Should be the region migration abort",
1378                None,
1379                Assertion::simple(assert_region_migration_abort, assert_no_persist),
1380            ),
1381            // RegionMigrationAbort
1382            Step::next(
1383                "Should throw an error",
1384                None,
1385                Assertion::error(|error| {
1386                    assert!(!error.is_retryable());
1387                    assert_matches!(error, error::Error::MigrationAbort { .. });
1388                }),
1389            ),
1390        ];
1391
1392        let setup_to_latest_persisted_state = Step::setup(
1393            "Sets state to OpenCandidateRegion",
1394            merge_before_test_fn(vec![
1395                setup_state(Arc::new(|| Box::new(OpenCandidateRegion))),
1396                Arc::new(reset_volatile_ctx),
1397            ]),
1398        );
1399
1400        let steps = [
1401            steps.clone(),
1402            vec![setup_to_latest_persisted_state.clone()],
1403            steps.clone()[1..].to_vec(),
1404            vec![setup_to_latest_persisted_state],
1405            steps.clone()[1..].to_vec(),
1406        ]
1407        .concat();
1408
1409        // Run the table tests.
1410        ProcedureMigrationSuiteRunner::new(suite)
1411            .steps(steps.clone())
1412            .run_once()
1413            .await;
1414    }
1415
1416    #[tokio::test]
1417    async fn test_procedure_flow_upgrade_candidate_with_retry() {
1418        common_telemetry::init_default_ut_logging();
1419
1420        let persistent_context = test_util::new_persistent_context(1, 2, RegionId::new(1024, 1));
1421        let state = Box::new(RegionMigrationStart);
1422
1423        // The table metadata.
1424        let to_peer_id = persistent_context.to_peer.id;
1425        let from_peer_id = persistent_context.from_peer.id;
1426        let from_peer = persistent_context.from_peer.clone();
1427        let region_id = persistent_context.region_ids[0];
1428        let table_info = new_test_table_info(1024);
1429        let region_routes = vec![RegionRoute {
1430            region: Region::new_test(region_id),
1431            leader_peer: Some(from_peer),
1432            follower_peers: vec![],
1433            ..Default::default()
1434        }];
1435
1436        let suite = ProcedureMigrationTestSuite::new(persistent_context, state);
1437        suite.init_table_metadata(table_info, region_routes).await;
1438
1439        let steps = vec![
1440            // Migration Start
1441            Step::next(
1442                "Should be the open candidate region",
1443                None,
1444                Assertion::simple(assert_open_candidate_region, assert_need_persist),
1445            ),
1446            // OpenCandidateRegion
1447            Step::next(
1448                "Should be throwing a retryable error",
1449                Some(mock_datanode_reply(
1450                    to_peer_id,
1451                    Arc::new(|id| Ok(new_open_region_reply(id, false, None))),
1452                )),
1453                Assertion::error(|error| assert!(error.is_retryable(), "err: {error:?}")),
1454            ),
1455            // OpenCandidateRegion
1456            Step::next(
1457                "Should be the update metadata for downgrading",
1458                Some(mock_datanode_reply(
1459                    to_peer_id,
1460                    Arc::new(|id| Ok(new_open_region_reply(id, true, None))),
1461                )),
1462                Assertion::simple(assert_flush_leader_region, assert_no_persist),
1463            ),
1464            // Flush Leader Region
1465            Step::next(
1466                "Should be the flush leader region",
1467                Some(mock_datanode_reply(
1468                    from_peer_id,
1469                    Arc::new(move |id| {
1470                        Ok(new_flush_region_reply_for_region(id, region_id, true, None))
1471                    }),
1472                )),
1473                Assertion::simple(assert_update_metadata_downgrade, assert_no_persist),
1474            ),
1475            // UpdateMetadata::Downgrade
1476            Step::next(
1477                "Should be the downgrade leader region",
1478                None,
1479                Assertion::simple(assert_downgrade_leader_region, assert_no_persist),
1480            ),
1481            // Downgrade Leader
1482            Step::next(
1483                "Should be the upgrade candidate region",
1484                Some(mock_datanode_reply(
1485                    from_peer_id,
1486                    merge_mailbox_messages(vec![
1487                        Arc::new(|id| error::MailboxTimeoutSnafu { id }.fail()),
1488                        Arc::new(|id| Ok(new_downgrade_region_reply(id, None, true, None))),
1489                    ]),
1490                )),
1491                Assertion::simple(assert_upgrade_candidate_region, assert_no_persist),
1492            ),
1493            // Upgrade Candidate
1494            Step::next(
1495                "Should be the update metadata for upgrading",
1496                Some(mock_datanode_reply(
1497                    to_peer_id,
1498                    merge_mailbox_messages(vec![
1499                        Arc::new(|id| error::MailboxTimeoutSnafu { id }.fail()),
1500                        Arc::new(|id| Ok(new_upgrade_region_reply(id, true, true, None))),
1501                    ]),
1502                )),
1503                Assertion::simple(assert_update_metadata_upgrade, assert_no_persist),
1504            ),
1505            // UpdateMetadata::Upgrade
1506            Step::next(
1507                "Should be the close downgraded region",
1508                None,
1509                Assertion::simple(assert_close_downgraded_region, assert_no_persist),
1510            ),
1511            // CloseDowngradedRegion
1512            Step::next(
1513                "Should be the region migration end",
1514                None,
1515                Assertion::simple(assert_region_migration_end, assert_done),
1516            ),
1517            // RegionMigrationEnd
1518            Step::next(
1519                "Should be the region migration end again",
1520                None,
1521                Assertion::simple(assert_region_migration_end, assert_done),
1522            ),
1523            // RegionMigrationStart
1524            Step::setup(
1525                "Sets state to RegionMigrationStart",
1526                merge_before_test_fn(vec![
1527                    setup_state(Arc::new(|| Box::new(RegionMigrationStart))),
1528                    Arc::new(reset_volatile_ctx),
1529                ]),
1530            ),
1531            // RegionMigrationEnd
1532            // Note: We can't run this test multiple times;
1533            // the `peer_id`'s `DatanodeTable` will be removed after first-time migration success.
1534            Step::next(
1535                "Should be the region migration end(has been migrated)",
1536                None,
1537                Assertion::simple(assert_region_migration_end, assert_done),
1538            ),
1539        ];
1540
1541        let steps = [steps.clone()].concat();
1542        let timer = Instant::now();
1543
1544        // Run the table tests.
1545        let runner = ProcedureMigrationSuiteRunner::new(suite)
1546            .steps(steps.clone())
1547            .run_once()
1548            .await;
1549
1550        let region_lease = default_distributed_time_constants().region_lease.as_secs();
1551        // Ensure it didn't run into the slow path.
1552        assert!(timer.elapsed().as_secs() < region_lease);
1553        runner.suite.verify_table_metadata().await;
1554    }
1555}