meta_srv/procedure/
region_migration.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub(crate) mod close_downgraded_region;
16pub(crate) mod downgrade_leader_region;
17pub(crate) mod flush_leader_region;
18pub(crate) mod manager;
19pub(crate) mod migration_abort;
20pub(crate) mod migration_end;
21pub(crate) mod migration_start;
22pub(crate) mod open_candidate_region;
23#[cfg(test)]
24pub mod test_util;
25pub(crate) mod update_metadata;
26pub(crate) mod upgrade_candidate_region;
27pub(crate) mod utils;
28
29use std::any::Any;
30use std::collections::{HashMap, HashSet};
31use std::fmt::{Debug, Display};
32use std::sync::Arc;
33use std::time::Duration;
34
35use common_error::ext::BoxedError;
36use common_event_recorder::{Event, Eventable};
37use common_meta::cache_invalidator::CacheInvalidatorRef;
38use common_meta::ddl::RegionFailureDetectorControllerRef;
39use common_meta::instruction::CacheIdent;
40use common_meta::key::datanode_table::{DatanodeTableKey, DatanodeTableValue};
41use common_meta::key::table_route::TableRouteValue;
42use common_meta::key::topic_region::{ReplayCheckpoint, TopicRegionKey};
43use common_meta::key::{DeserializedValueWithBytes, TableMetadataManagerRef};
44use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef};
45use common_meta::lock_key::{CatalogLock, RegionLock, SchemaLock, TableLock};
46use common_meta::peer::Peer;
47use common_meta::region_keeper::{MemoryRegionKeeperRef, OperatingRegionGuard};
48use common_procedure::error::{
49    Error as ProcedureError, FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu,
50};
51use common_procedure::{
52    Context as ProcedureContext, LockKey, Procedure, Status, StringKey, UserMetadata,
53};
54use common_telemetry::{error, info};
55use manager::RegionMigrationProcedureGuard;
56pub use manager::{
57    RegionMigrationManagerRef, RegionMigrationProcedureTask, RegionMigrationProcedureTracker,
58    RegionMigrationTriggerReason,
59};
60use serde::{Deserialize, Deserializer, Serialize};
61use snafu::{OptionExt, ResultExt};
62use store_api::storage::{RegionId, TableId};
63use tokio::time::Instant;
64
65use self::migration_start::RegionMigrationStart;
66use crate::error::{self, Result};
67use crate::events::region_migration_event::RegionMigrationEvent;
68use crate::metrics::{
69    METRIC_META_REGION_MIGRATION_ERROR, METRIC_META_REGION_MIGRATION_EXECUTE,
70    METRIC_META_REGION_MIGRATION_STAGE_ELAPSED,
71};
72use crate::service::mailbox::MailboxRef;
73
/// The default timeout for region migration.
///
/// Note: this is distinct from [`default_timeout`] (10s), which is only the
/// serde fallback for [`PersistentContext::timeout`] when the field is absent.
pub const DEFAULT_REGION_MIGRATION_TIMEOUT: Duration = Duration::from_secs(120);
76
/// Serde helper that accepts either a single value or a list of values.
///
/// With `#[serde(untagged)]`, variants are tried in declaration order: a bare
/// scalar deserializes into `Single`, an array into `Multiple`.
#[derive(Debug, Deserialize)]
#[serde(untagged)]
enum SingleOrMultiple<T> {
    Single(T),
    Multiple(Vec<T>),
}
83
84fn single_or_multiple_from<'de, D, T>(deserializer: D) -> std::result::Result<Vec<T>, D::Error>
85where
86    D: Deserializer<'de>,
87    T: Deserialize<'de>,
88{
89    let helper = SingleOrMultiple::<T>::deserialize(deserializer)?;
90    Ok(match helper {
91        SingleOrMultiple::Single(x) => vec![x],
92        SingleOrMultiple::Multiple(xs) => xs,
93    })
94}
95
/// It's shared in each step and available even after recovering.
///
/// It will only be updated/stored after the Red node has succeeded.
/// NOTE(review): "Red node" presumably refers to the persisted states in the
/// migration state diagram — confirm against the design doc.
///
/// **Notes: Stores with too large data in the context might incur replication overhead.**
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct PersistentContext {
    /// The table catalog.
    ///
    /// Kept only to deserialize procedures persisted by older versions;
    /// new procedures always set this to `None`.
    #[deprecated(note = "use `catalog_and_schema` instead")]
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub(crate) catalog: Option<String>,
    /// The table schema.
    ///
    /// Kept only to deserialize procedures persisted by older versions;
    /// new procedures always set this to `None`.
    #[deprecated(note = "use `catalog_and_schema` instead")]
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub(crate) schema: Option<String>,
    /// The catalog and schema of the regions.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub(crate) catalog_and_schema: Vec<(String, String)>,
    /// The [Peer] of migration source.
    pub(crate) from_peer: Peer,
    /// The [Peer] of migration destination.
    pub(crate) to_peer: Peer,
    /// The [RegionId] of migration region.
    ///
    /// The `alias`/custom deserializer accept the legacy single `region_id`
    /// form as well as the current list form.
    #[serde(deserialize_with = "single_or_multiple_from", alias = "region_id")]
    pub(crate) region_ids: Vec<RegionId>,
    /// The timeout for downgrading leader region and upgrading candidate region operations.
    #[serde(with = "humantime_serde", default = "default_timeout")]
    pub(crate) timeout: Duration,
    /// The trigger reason of region migration.
    #[serde(default)]
    pub(crate) trigger_reason: RegionMigrationTriggerReason,
}
128
impl PersistentContext {
    /// Creates a new [PersistentContext].
    ///
    /// The deprecated `catalog`/`schema` fields are always initialized to
    /// `None`; new code carries catalog/schema pairs in `catalog_and_schema`.
    pub fn new(
        catalog_and_schema: Vec<(String, String)>,
        from_peer: Peer,
        to_peer: Peer,
        region_ids: Vec<RegionId>,
        timeout: Duration,
        trigger_reason: RegionMigrationTriggerReason,
    ) -> Self {
        // Needed to initialize the deprecated fields without a warning.
        #[allow(deprecated)]
        Self {
            catalog: None,
            schema: None,
            catalog_and_schema,
            from_peer,
            to_peer,
            region_ids,
            timeout,
            trigger_reason,
        }
    }
}
151
/// Serde fallback for [`PersistentContext::timeout`] when the serialized
/// procedure carries no explicit `timeout` field.
fn default_timeout() -> Duration {
    const DEFAULT_TIMEOUT_SECS: u64 = 10;
    Duration::from_secs(DEFAULT_TIMEOUT_SECS)
}
155
156impl PersistentContext {
157    pub fn lock_key(&self) -> Vec<StringKey> {
158        let mut lock_keys =
159            Vec::with_capacity(self.region_ids.len() + 2 + self.catalog_and_schema.len() * 2);
160        #[allow(deprecated)]
161        if let (Some(catalog), Some(schema)) = (&self.catalog, &self.schema) {
162            lock_keys.push(CatalogLock::Read(catalog).into());
163            lock_keys.push(SchemaLock::read(catalog, schema).into());
164        }
165        for (catalog, schema) in self.catalog_and_schema.iter() {
166            lock_keys.push(CatalogLock::Read(catalog).into());
167            lock_keys.push(SchemaLock::read(catalog, schema).into());
168        }
169
170        // Sort the region ids to ensure the same order of region ids.
171        let mut region_ids = self.region_ids.clone();
172        region_ids.sort_unstable();
173        for region_id in region_ids {
174            lock_keys.push(RegionLock::Write(region_id).into());
175        }
176        lock_keys
177    }
178
179    /// Returns the table ids of the regions.
180    ///
181    /// The return value is a set of table ids.
182    pub fn region_table_ids(&self) -> Vec<TableId> {
183        self.region_ids
184            .iter()
185            .map(|region_id| region_id.table_id())
186            .collect::<HashSet<_>>()
187            .into_iter()
188            .collect()
189    }
190
191    /// Returns the table regions map.
192    ///
193    /// The key is the table id, the value is the region ids of the table.
194    pub fn table_regions(&self) -> HashMap<TableId, Vec<RegionId>> {
195        let mut table_regions = HashMap::new();
196        for region_id in &self.region_ids {
197            table_regions
198                .entry(region_id.table_id())
199                .or_insert_with(Vec::new)
200                .push(*region_id);
201        }
202        table_regions
203    }
204}
205
// Exposes the migration context to the event recorder.
impl Eventable for PersistentContext {
    /// Converts this context into a [RegionMigrationEvent] for recording.
    fn to_event(&self) -> Option<Box<dyn Event>> {
        Some(Box::new(RegionMigrationEvent::from_persistent_ctx(self)))
    }
}
211
/// Metrics of region migration.
///
/// Stage timings are accumulated during the procedure and flushed to the
/// Prometheus histogram in the `Drop` impl below.
#[derive(Debug, Clone, Default)]
pub struct Metrics {
    /// Elapsed time of downgrading region and upgrading region.
    /// Used as the consumed budget in `Context::next_operation_timeout`;
    /// not included in the reported stage totals.
    operations_elapsed: Duration,
    /// Elapsed time of flushing leader region.
    flush_leader_region_elapsed: Duration,
    /// Elapsed time of downgrading leader region.
    downgrade_leader_region_elapsed: Duration,
    /// Elapsed time of open candidate region.
    open_candidate_region_elapsed: Duration,
    /// Elapsed time of upgrade candidate region.
    upgrade_candidate_region_elapsed: Duration,
}
226
227impl Display for Metrics {
228    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
229        let total = self.flush_leader_region_elapsed
230            + self.downgrade_leader_region_elapsed
231            + self.open_candidate_region_elapsed
232            + self.upgrade_candidate_region_elapsed;
233        write!(
234            f,
235            "total: {:?}, flush_leader_region_elapsed: {:?}, downgrade_leader_region_elapsed: {:?}, open_candidate_region_elapsed: {:?}, upgrade_candidate_region_elapsed: {:?}",
236            total,
237            self.flush_leader_region_elapsed,
238            self.downgrade_leader_region_elapsed,
239            self.open_candidate_region_elapsed,
240            self.upgrade_candidate_region_elapsed
241        )
242    }
243}
244
245impl Metrics {
246    /// Updates the elapsed time of downgrading region and upgrading region.
247    pub fn update_operations_elapsed(&mut self, elapsed: Duration) {
248        self.operations_elapsed += elapsed;
249    }
250
251    /// Updates the elapsed time of flushing leader region.
252    pub fn update_flush_leader_region_elapsed(&mut self, elapsed: Duration) {
253        self.flush_leader_region_elapsed += elapsed;
254    }
255
256    /// Updates the elapsed time of downgrading leader region.
257    pub fn update_downgrade_leader_region_elapsed(&mut self, elapsed: Duration) {
258        self.downgrade_leader_region_elapsed += elapsed;
259    }
260
261    /// Updates the elapsed time of open candidate region.
262    pub fn update_open_candidate_region_elapsed(&mut self, elapsed: Duration) {
263        self.open_candidate_region_elapsed += elapsed;
264    }
265
266    /// Updates the elapsed time of upgrade candidate region.
267    pub fn update_upgrade_candidate_region_elapsed(&mut self, elapsed: Duration) {
268        self.upgrade_candidate_region_elapsed += elapsed;
269    }
270}
271
272impl Drop for Metrics {
273    fn drop(&mut self) {
274        let total = self.flush_leader_region_elapsed
275            + self.downgrade_leader_region_elapsed
276            + self.open_candidate_region_elapsed
277            + self.upgrade_candidate_region_elapsed;
278        METRIC_META_REGION_MIGRATION_STAGE_ELAPSED
279            .with_label_values(&["total"])
280            .observe(total.as_secs_f64());
281
282        if !self.flush_leader_region_elapsed.is_zero() {
283            METRIC_META_REGION_MIGRATION_STAGE_ELAPSED
284                .with_label_values(&["flush_leader_region"])
285                .observe(self.flush_leader_region_elapsed.as_secs_f64());
286        }
287
288        if !self.downgrade_leader_region_elapsed.is_zero() {
289            METRIC_META_REGION_MIGRATION_STAGE_ELAPSED
290                .with_label_values(&["downgrade_leader_region"])
291                .observe(self.downgrade_leader_region_elapsed.as_secs_f64());
292        }
293
294        if !self.open_candidate_region_elapsed.is_zero() {
295            METRIC_META_REGION_MIGRATION_STAGE_ELAPSED
296                .with_label_values(&["open_candidate_region"])
297                .observe(self.open_candidate_region_elapsed.as_secs_f64());
298        }
299
300        if !self.upgrade_candidate_region_elapsed.is_zero() {
301            METRIC_META_REGION_MIGRATION_STAGE_ELAPSED
302                .with_label_values(&["upgrade_candidate_region"])
303                .observe(self.upgrade_candidate_region_elapsed.as_secs_f64());
304        }
305    }
306}
307
/// It's shared in each step and available in executing (including retrying).
///
/// It will be dropped if the procedure runner crashes.
///
/// The additional remote fetches are only required in the worst cases.
#[derive(Debug, Clone, Default)]
pub struct VolatileContext {
    /// `opening_region_guards` will be set after the
    /// [OpenCandidateRegion](crate::procedure::region_migration::open_candidate_region::OpenCandidateRegion) step.
    ///
    /// `opening_region_guards` should be consumed after
    /// the corresponding [RegionRoute](common_meta::rpc::router::RegionRoute) of the opening region
    /// was written into [TableRouteValue](common_meta::key::table_route::TableRouteValue).
    opening_region_guards: Vec<OperatingRegionGuard>,
    /// The deadline of leader region lease.
    leader_region_lease_deadline: Option<Instant>,
    /// The datanode table values, lazily fetched and cached by
    /// `Context::get_from_peer_datanode_table_values`.
    from_peer_datanode_table_values: Option<HashMap<TableId, DatanodeTableValue>>,
    /// The last_entry_ids of leader regions.
    leader_region_last_entry_ids: HashMap<RegionId, u64>,
    /// The last_entry_ids of leader metadata regions (Only used for metric engine).
    leader_region_metadata_last_entry_ids: HashMap<RegionId, u64>,
    /// Metrics of region migration.
    metrics: Metrics,
}
333
334impl VolatileContext {
335    /// Sets the `leader_region_lease_deadline` if it does not exist.
336    pub fn set_leader_region_lease_deadline(&mut self, lease_timeout: Duration) {
337        if self.leader_region_lease_deadline.is_none() {
338            self.leader_region_lease_deadline = Some(Instant::now() + lease_timeout);
339        }
340    }
341
342    /// Resets the `leader_region_lease_deadline`.
343    pub fn reset_leader_region_lease_deadline(&mut self) {
344        self.leader_region_lease_deadline = None;
345    }
346
347    /// Sets the `leader_region_last_entry_id`.
348    pub fn set_last_entry_id(&mut self, region_id: RegionId, last_entry_id: u64) {
349        self.leader_region_last_entry_ids
350            .insert(region_id, last_entry_id);
351    }
352
353    /// Sets the `leader_region_metadata_last_entry_id`.
354    pub fn set_metadata_last_entry_id(&mut self, region_id: RegionId, last_entry_id: u64) {
355        self.leader_region_metadata_last_entry_ids
356            .insert(region_id, last_entry_id);
357    }
358}
359
/// Used to generate new [Context].
pub trait ContextFactory {
    /// Consumes the factory and builds a [Context] around the given
    /// persistent state.
    fn new_context(self, persistent_ctx: PersistentContext) -> Context;
}
364
/// Default implementation of [ContextFactory].
///
/// Holds every shared dependency a [Context] needs, so a fresh context can be
/// produced per procedure run.
#[derive(Clone)]
pub struct DefaultContextFactory {
    // Volatile state handed to the new context; default-initialized in `new`.
    volatile_ctx: VolatileContext,
    in_memory_key: ResettableKvBackendRef,
    table_metadata_manager: TableMetadataManagerRef,
    opening_region_keeper: MemoryRegionKeeperRef,
    region_failure_detector_controller: RegionFailureDetectorControllerRef,
    mailbox: MailboxRef,
    server_addr: String,
    cache_invalidator: CacheInvalidatorRef,
}
377
impl DefaultContextFactory {
    /// Returns a [`DefaultContextFactory`] with an empty [VolatileContext].
    pub fn new(
        in_memory_key: ResettableKvBackendRef,
        table_metadata_manager: TableMetadataManagerRef,
        opening_region_keeper: MemoryRegionKeeperRef,
        region_failure_detector_controller: RegionFailureDetectorControllerRef,
        mailbox: MailboxRef,
        server_addr: String,
        cache_invalidator: CacheInvalidatorRef,
    ) -> Self {
        Self {
            volatile_ctx: VolatileContext::default(),
            in_memory_key,
            table_metadata_manager,
            opening_region_keeper,
            region_failure_detector_controller,
            mailbox,
            server_addr,
            cache_invalidator,
        }
    }
}
401
impl ContextFactory for DefaultContextFactory {
    /// Moves the factory's dependencies into a new [Context] bound to the
    /// given persistent state.
    fn new_context(self, persistent_ctx: PersistentContext) -> Context {
        Context {
            persistent_ctx,
            volatile_ctx: self.volatile_ctx,
            in_memory: self.in_memory_key,
            table_metadata_manager: self.table_metadata_manager,
            opening_region_keeper: self.opening_region_keeper,
            region_failure_detector_controller: self.region_failure_detector_controller,
            mailbox: self.mailbox,
            server_addr: self.server_addr,
            cache_invalidator: self.cache_invalidator,
        }
    }
}
417
/// The context of procedure execution.
///
/// Combines the persisted migration state with volatile, per-run state and
/// the shared meta-server services the procedure steps need.
pub struct Context {
    persistent_ctx: PersistentContext,
    volatile_ctx: VolatileContext,
    in_memory: KvBackendRef,
    table_metadata_manager: TableMetadataManagerRef,
    opening_region_keeper: MemoryRegionKeeperRef,
    region_failure_detector_controller: RegionFailureDetectorControllerRef,
    mailbox: MailboxRef,
    server_addr: String,
    cache_invalidator: CacheInvalidatorRef,
}
430
impl Context {
    /// Returns the next operation's timeout.
    ///
    /// `None` means the overall budget (`persistent_ctx.timeout`) has already
    /// been consumed by earlier operations (`checked_sub` underflowed).
    pub fn next_operation_timeout(&self) -> Option<Duration> {
        self.persistent_ctx
            .timeout
            .checked_sub(self.volatile_ctx.metrics.operations_elapsed)
    }

    /// Updates operations elapsed with the time since `instant`.
    pub fn update_operations_elapsed(&mut self, instant: Instant) {
        self.volatile_ctx
            .metrics
            .update_operations_elapsed(instant.elapsed());
    }

    /// Updates the elapsed time of flushing leader region.
    pub fn update_flush_leader_region_elapsed(&mut self, instant: Instant) {
        self.volatile_ctx
            .metrics
            .update_flush_leader_region_elapsed(instant.elapsed());
    }

    /// Updates the elapsed time of downgrading leader region.
    pub fn update_downgrade_leader_region_elapsed(&mut self, instant: Instant) {
        self.volatile_ctx
            .metrics
            .update_downgrade_leader_region_elapsed(instant.elapsed());
    }

    /// Updates the elapsed time of open candidate region.
    pub fn update_open_candidate_region_elapsed(&mut self, instant: Instant) {
        self.volatile_ctx
            .metrics
            .update_open_candidate_region_elapsed(instant.elapsed());
    }

    /// Updates the elapsed time of upgrade candidate region.
    pub fn update_upgrade_candidate_region_elapsed(&mut self, instant: Instant) {
        self.volatile_ctx
            .metrics
            .update_upgrade_candidate_region_elapsed(instant.elapsed());
    }

    /// Returns address of meta server.
    pub fn server_addr(&self) -> &str {
        &self.server_addr
    }

    /// Returns the deduplicated table ids of the regions.
    pub fn region_table_ids(&self) -> Vec<TableId> {
        self.persistent_ctx
            .region_ids
            .iter()
            .map(|region_id| region_id.table_id())
            .collect::<HashSet<_>>()
            .into_iter()
            .collect()
    }

    /// Fetches the table route values of all involved tables from the remote
    /// metadata store. Tables whose route is missing are silently omitted
    /// from the returned map.
    ///
    /// Retry:
    /// - Failed to retrieve the metadata of table.
    pub async fn get_table_route_values(
        &self,
    ) -> Result<HashMap<TableId, DeserializedValueWithBytes<TableRouteValue>>> {
        let table_ids = self.persistent_ctx.region_table_ids();
        let table_routes = self
            .table_metadata_manager
            .table_route_manager()
            .table_route_storage()
            .batch_get_with_raw_bytes(&table_ids)
            .await
            .context(error::TableMetadataManagerSnafu)
            .map_err(BoxedError::new)
            .with_context(|_| error::RetryLaterWithSourceSnafu {
                reason: format!("Failed to get table routes: {table_ids:?}"),
            })?;
        // `batch_get_with_raw_bytes` returns results positionally aligned with
        // the requested ids, so zip pairs each id with its (optional) route.
        let table_routes = table_ids
            .into_iter()
            .zip(table_routes)
            .filter_map(|(table_id, table_route)| {
                table_route.map(|table_route| (table_id, table_route))
            })
            .collect::<HashMap<_, _>>();
        Ok(table_routes)
    }

    /// Fetches the table route value of `table_id` from the remote metadata
    /// store.
    ///
    /// Retry:
    /// - Failed to retrieve the metadata of table.
    ///
    /// Errors with `TableRouteNotFound` (non-retryable path) when the route
    /// does not exist.
    pub async fn get_table_route_value(
        &self,
        table_id: TableId,
    ) -> Result<DeserializedValueWithBytes<TableRouteValue>> {
        let table_route_value = self
            .table_metadata_manager
            .table_route_manager()
            .table_route_storage()
            .get_with_raw_bytes(table_id)
            .await
            .context(error::TableMetadataManagerSnafu)
            .map_err(BoxedError::new)
            .with_context(|_| error::RetryLaterWithSourceSnafu {
                reason: format!("Failed to get table routes: {table_id:}"),
            })?
            .context(error::TableRouteNotFoundSnafu { table_id })?;
        Ok(table_route_value)
    }

    /// Returns the `from_peer_datanode_table_values` of [VolatileContext] if any.
    /// Otherwise, returns the value retrieved from remote and caches it.
    ///
    /// Retry:
    /// - Failed to retrieve the metadata of datanode table.
    pub async fn get_from_peer_datanode_table_values(
        &mut self,
    ) -> Result<&HashMap<TableId, DatanodeTableValue>> {
        let from_peer_datanode_table_values =
            &mut self.volatile_ctx.from_peer_datanode_table_values;
        if from_peer_datanode_table_values.is_none() {
            let table_ids = self.persistent_ctx.region_table_ids();
            let datanode_table_keys = table_ids
                .iter()
                .map(|table_id| DatanodeTableKey {
                    datanode_id: self.persistent_ctx.from_peer.id,
                    table_id: *table_id,
                })
                .collect::<Vec<_>>();
            let datanode_table_values = self
                .table_metadata_manager
                .datanode_table_manager()
                .batch_get(&datanode_table_keys)
                .await
                .context(error::TableMetadataManagerSnafu)
                .map_err(BoxedError::new)
                .with_context(|_| error::RetryLaterWithSourceSnafu {
                    reason: format!("Failed to get DatanodeTable: {table_ids:?}"),
                })?
                .into_iter()
                .map(|(k, v)| (k.table_id, v))
                .collect();
            *from_peer_datanode_table_values = Some(datanode_table_values);
        }
        // Safe: the branch above guarantees the option is populated.
        Ok(from_peer_datanode_table_values.as_ref().unwrap())
    }

    /// Fetches the datanode table value of (`from_peer`, `table_id`) from the
    /// remote metadata store (no volatile caching, unlike the batch variant).
    ///
    /// Retry:
    /// - Failed to retrieve the metadata of datanode table.
    pub async fn get_from_peer_datanode_table_value(
        &self,
        table_id: TableId,
    ) -> Result<DatanodeTableValue> {
        let datanode_table_value = self
            .table_metadata_manager
            .datanode_table_manager()
            .get(&DatanodeTableKey {
                datanode_id: self.persistent_ctx.from_peer.id,
                table_id,
            })
            .await
            .context(error::TableMetadataManagerSnafu)
            .map_err(BoxedError::new)
            .with_context(|_| error::RetryLaterWithSourceSnafu {
                reason: format!("Failed to get DatanodeTable: {table_id}"),
            })?
            .context(error::DatanodeTableNotFoundSnafu {
                table_id,
                datanode_id: self.persistent_ctx.from_peer.id,
            })?;
        Ok(datanode_table_value)
    }

    /// Notifies the RegionSupervisor to register failure detectors of failed region.
    ///
    /// The original failure detector was removed once the procedure was triggered.
    /// Now, we need to register the failure detector for the failed region again.
    pub async fn register_failure_detectors(&self) {
        let datanode_id = self.persistent_ctx.from_peer.id;
        let region_ids = &self.persistent_ctx.region_ids;
        let detecting_regions = region_ids
            .iter()
            .map(|region_id| (datanode_id, *region_id))
            .collect::<Vec<_>>();
        self.region_failure_detector_controller
            .register_failure_detectors(detecting_regions)
            .await;
        info!(
            "Registered failure detectors after migration failures for datanode {}, regions {:?}",
            datanode_id, region_ids
        );
    }

    /// Notifies the RegionSupervisor to deregister failure detectors.
    ///
    /// The original failure detectors won't be removed once the procedure was triggered.
    /// We need to deregister the failure detectors for the original region if the procedure is finished.
    pub async fn deregister_failure_detectors(&self) {
        let datanode_id = self.persistent_ctx.from_peer.id;
        let region_ids = &self.persistent_ctx.region_ids;
        let detecting_regions = region_ids
            .iter()
            .map(|region_id| (datanode_id, *region_id))
            .collect::<Vec<_>>();

        self.region_failure_detector_controller
            .deregister_failure_detectors(detecting_regions)
            .await;
    }

    /// Notifies the RegionSupervisor to deregister failure detectors for the candidate region on the destination peer.
    ///
    /// The candidate region may be created on the destination peer,
    /// so we need to deregister the failure detectors for the candidate region if the procedure is aborted.
    pub async fn deregister_failure_detectors_for_candidate_region(&self) {
        let to_peer_id = self.persistent_ctx.to_peer.id;
        let region_ids = &self.persistent_ctx.region_ids;
        let detecting_regions = region_ids
            .iter()
            .map(|region_id| (to_peer_id, *region_id))
            .collect::<Vec<_>>();

        self.region_failure_detector_controller
            .deregister_failure_detectors(detecting_regions)
            .await;
    }

    /// Fetches the replay checkpoints for the given topic region keys.
    ///
    /// Keys whose value has no checkpoint are dropped from the result.
    pub async fn get_replay_checkpoints(
        &self,
        topic_region_keys: Vec<TopicRegionKey<'_>>,
    ) -> Result<HashMap<RegionId, ReplayCheckpoint>> {
        let topic_region_values = self
            .table_metadata_manager
            .topic_region_manager()
            .batch_get(topic_region_keys)
            .await
            .context(error::TableMetadataManagerSnafu)?;

        let replay_checkpoints = topic_region_values
            .into_iter()
            .flat_map(|(key, value)| value.checkpoint.map(|value| (key, value)))
            .collect::<HashMap<_, _>>();

        Ok(replay_checkpoints)
    }

    /// Broadcasts the invalidate table cache message.
    ///
    /// Best-effort: failures of the broadcast are deliberately ignored.
    pub async fn invalidate_table_cache(&self) -> Result<()> {
        let table_ids = self.region_table_ids();
        let mut cache_idents = Vec::with_capacity(table_ids.len());
        for table_id in &table_ids {
            cache_idents.push(CacheIdent::TableId(*table_id));
        }
        // ignore the result
        let ctx = common_meta::cache_invalidator::Context::default();
        let _ = self.cache_invalidator.invalidate(&ctx, &cache_idents).await;
        Ok(())
    }

    /// Returns a clone of the [PersistentContext] of the procedure.
    pub fn persistent_ctx(&self) -> PersistentContext {
        self.persistent_ctx.clone()
    }
}
702
703#[async_trait::async_trait]
704#[typetag::serde(tag = "region_migration_state")]
705pub(crate) trait State: Sync + Send + Debug {
706    fn name(&self) -> &'static str {
707        let type_name = std::any::type_name::<Self>();
708        // short name
709        type_name.split("::").last().unwrap_or(type_name)
710    }
711
712    /// Yields the next [State] and [Status].
713    async fn next(
714        &mut self,
715        ctx: &mut Context,
716        procedure_ctx: &ProcedureContext,
717    ) -> Result<(Box<dyn State>, Status)>;
718
719    /// Returns as [Any](std::any::Any).
720    fn as_any(&self) -> &dyn Any;
721}
722
/// Persistent data of [RegionMigrationProcedure] (owned form, used when
/// deserializing a recovered procedure).
#[derive(Debug, Serialize, Deserialize)]
pub struct RegionMigrationDataOwned {
    persistent_ctx: PersistentContext,
    state: Box<dyn State>,
}
729
/// Persistent data of [RegionMigrationProcedure] (borrowed form, used when
/// serializing the running procedure without cloning its state).
#[derive(Debug, Serialize)]
pub struct RegionMigrationData<'a> {
    persistent_ctx: &'a PersistentContext,
    state: &'a dyn State,
}
736
/// The region migration procedure: a state machine driven by [State]
/// implementations, sharing a [Context] across steps.
pub(crate) struct RegionMigrationProcedure {
    // Current state of the migration state machine.
    state: Box<dyn State>,
    context: Context,
    // Guards keeping the per-region "running procedure" tracking alive for
    // the lifetime of this procedure.
    _guards: Vec<RegionMigrationProcedureGuard>,
}
742
impl RegionMigrationProcedure {
    const TYPE_NAME: &'static str = "metasrv-procedure::RegionMigration";

    /// Creates a procedure starting from the [RegionMigrationStart] state.
    pub fn new(
        persistent_context: PersistentContext,
        context_factory: impl ContextFactory,
        guards: Vec<RegionMigrationProcedureGuard>,
    ) -> Self {
        let state = Box::new(RegionMigrationStart {});
        Self::new_inner(state, persistent_context, context_factory, guards)
    }

    /// Creates a procedure from an explicit initial state.
    fn new_inner(
        state: Box<dyn State>,
        persistent_context: PersistentContext,
        context_factory: impl ContextFactory,
        guards: Vec<RegionMigrationProcedureGuard>,
    ) -> Self {
        Self {
            state,
            context: context_factory.new_context(persistent_context),
            _guards: guards,
        }
    }

    /// Restores a procedure from its persisted JSON, re-registering each
    /// migrating region with the tracker.
    fn from_json(
        json: &str,
        context_factory: impl ContextFactory,
        tracker: RegionMigrationProcedureTracker,
    ) -> ProcedureResult<Self> {
        let RegionMigrationDataOwned {
            persistent_ctx,
            state,
        } = serde_json::from_str(json).context(FromJsonSnafu)?;
        // NOTE(review): `flat_map` drops regions for which the tracker returns
        // no guard — presumably those already tracked elsewhere; confirm.
        let guards = persistent_ctx
            .region_ids
            .iter()
            .flat_map(|region_id| {
                tracker.insert_running_procedure(&RegionMigrationProcedureTask {
                    region_id: *region_id,
                    from_peer: persistent_ctx.from_peer.clone(),
                    to_peer: persistent_ctx.to_peer.clone(),
                    timeout: persistent_ctx.timeout,
                    trigger_reason: persistent_ctx.trigger_reason,
                })
            })
            .collect::<Vec<_>>();

        let context = context_factory.new_context(persistent_ctx);

        Ok(Self {
            state,
            context,
            _guards: guards,
        })
    }

    /// Rolls back the side effects of a failed migration: restores any
    /// downgraded leader region routes and re-balances the failure detectors
    /// (deregister candidate-side, re-register source-side).
    async fn rollback_inner(&mut self, procedure_ctx: &ProcedureContext) -> Result<()> {
        let _timer = METRIC_META_REGION_MIGRATION_EXECUTE
            .with_label_values(&["rollback"])
            .start_timer();
        let ctx = &self.context;
        let table_regions = ctx.persistent_ctx.table_regions();
        for (table_id, regions) in table_regions {
            // Take the table-level write lock so the route update below does
            // not race with other procedures on the same table.
            let table_lock = TableLock::Write(table_id).into();
            let _guard = procedure_ctx.provider.acquire_lock(&table_lock).await;
            let table_route = ctx.get_table_route_value(table_id).await?;
            let region_routes = table_route.region_routes().unwrap();
            // Only rewrite the route if at least one of our regions is still
            // marked as leader-downgrading.
            let downgraded = region_routes
                .iter()
                .filter(|route| regions.contains(&route.region.id))
                .any(|route| route.is_leader_downgrading());
            if downgraded {
                info!(
                    "Rollbacking downgraded region leader table route, table: {table_id}, regions: {regions:?}"
                );
                let table_metadata_manager = &ctx.table_metadata_manager;
                // `Some(None)` clears the downgrading status for our regions;
                // `None` leaves other regions' status untouched.
                table_metadata_manager
                    .update_leader_region_status(table_id, &table_route, |route| {
                        if regions.contains(&route.region.id) {
                            Some(None)
                        } else {
                            None
                        }
                    })
                    .await
                    .context(error::TableMetadataManagerSnafu)
                    .map_err(BoxedError::new)
                    .with_context(|_| error::RetryLaterWithSourceSnafu {
                        reason: format!("Failed to update the table route during the rollback downgraded leader region: {regions:?}"),
                    })?;
            }
        }
        self.context
            .deregister_failure_detectors_for_candidate_region()
            .await;
        self.context.register_failure_detectors().await;

        Ok(())
    }
}
844
#[async_trait::async_trait]
impl Procedure for RegionMigrationProcedure {
    fn type_name(&self) -> &str {
        Self::TYPE_NAME
    }

    /// Delegates to [`Self::rollback_inner`]; any failure is surfaced as an
    /// external procedure error.
    async fn rollback(&mut self, ctx: &ProcedureContext) -> ProcedureResult<()> {
        self.rollback_inner(ctx)
            .await
            .map_err(ProcedureError::external)
    }

    // Region migration opts into the procedure framework's rollback support.
    fn rollback_supported(&self) -> bool {
        true
    }

    /// Drives the state machine one step: `state.next` yields the follow-up
    /// state and a status. Retryable errors are re-scheduled; terminal errors
    /// release the opening-region guards, drop the candidate's failure
    /// detectors, and are reported as external errors.
    async fn execute(&mut self, ctx: &ProcedureContext) -> ProcedureResult<Status> {
        let state = &mut self.state;

        let name = state.name();
        // Each step is timed under the current state's name.
        let _timer = METRIC_META_REGION_MIGRATION_EXECUTE
            .with_label_values(&[name])
            .start_timer();
        match state.next(&mut self.context, ctx).await {
            Ok((next, status)) => {
                *state = next;
                Ok(status)
            }
            Err(e) => {
                if e.is_retryable() {
                    METRIC_META_REGION_MIGRATION_ERROR
                        .with_label_values(&[name, "retryable"])
                        .inc();
                    Err(ProcedureError::retry_later(e))
                } else {
                    // Consumes the opening region guard before deregistering the failure detectors.
                    self.context.volatile_ctx.opening_region_guards.clear();
                    self.context
                        .deregister_failure_detectors_for_candidate_region()
                        .await;
                    error!(
                        e;
                        "Region migration procedure failed, regions: {:?}, from_peer: {}, to_peer: {}, {}",
                        self.context.persistent_ctx.region_ids,
                        self.context.persistent_ctx.from_peer,
                        self.context.persistent_ctx.to_peer,
                        self.context.volatile_ctx.metrics,
                    );
                    METRIC_META_REGION_MIGRATION_ERROR
                        .with_label_values(&[name, "external"])
                        .inc();
                    Err(ProcedureError::external(e))
                }
            }
        }
    }

    /// Persists the current state plus the persistent context; the volatile
    /// context is intentionally not serialized.
    fn dump(&self) -> ProcedureResult<String> {
        let data = RegionMigrationData {
            state: self.state.as_ref(),
            persistent_ctx: &self.context.persistent_ctx,
        };
        serde_json::to_string(&data).context(ToJsonSnafu)
    }

    fn lock_key(&self) -> LockKey {
        LockKey::new(self.context.persistent_ctx.lock_key())
    }

    fn user_metadata(&self) -> Option<UserMetadata> {
        Some(UserMetadata::new(Arc::new(self.context.persistent_ctx())))
    }
}
918
919#[cfg(test)]
920mod tests {
921    use std::assert_matches::assert_matches;
922    use std::sync::Arc;
923
924    use common_meta::distributed_time_constants::REGION_LEASE_SECS;
925    use common_meta::instruction::Instruction;
926    use common_meta::key::test_utils::new_test_table_info;
927    use common_meta::rpc::router::{Region, RegionRoute};
928
929    use super::*;
930    use crate::handler::HeartbeatMailbox;
931    use crate::procedure::region_migration::open_candidate_region::OpenCandidateRegion;
932    use crate::procedure::region_migration::test_util::*;
933    use crate::procedure::test_util::{
934        new_downgrade_region_reply, new_flush_region_reply_for_region, new_open_region_reply,
935        new_upgrade_region_reply,
936    };
937    use crate::service::mailbox::Channel;
938
939    fn new_persistent_context() -> PersistentContext {
940        test_util::new_persistent_context(1, 2, RegionId::new(1024, 1))
941    }
942
943    #[test]
944    fn test_lock_key() {
945        let persistent_context = new_persistent_context();
946        let expected_keys = persistent_context.lock_key();
947
948        let env = TestingEnv::new();
949        let context = env.context_factory();
950
951        let procedure = RegionMigrationProcedure::new(persistent_context, context, vec![]);
952
953        let key = procedure.lock_key();
954        let keys = key.keys_to_lock().cloned().collect::<Vec<_>>();
955
956        for key in expected_keys {
957            assert!(keys.contains(&key));
958        }
959    }
960
961    #[test]
962    fn test_data_serialization() {
963        let persistent_context = new_persistent_context();
964
965        let env = TestingEnv::new();
966        let context = env.context_factory();
967
968        let procedure = RegionMigrationProcedure::new(persistent_context, context, vec![]);
969
970        let serialized = procedure.dump().unwrap();
971        let expected = r#"{"persistent_ctx":{"catalog_and_schema":[["greptime","public"]],"from_peer":{"id":1,"addr":""},"to_peer":{"id":2,"addr":""},"region_ids":[4398046511105],"timeout":"10s","trigger_reason":"Unknown"},"state":{"region_migration_state":"RegionMigrationStart"}}"#;
972        assert_eq!(expected, serialized);
973    }
974
975    #[test]
976    fn test_backward_compatibility() {
977        let persistent_ctx = PersistentContext {
978            #[allow(deprecated)]
979            catalog: Some("greptime".into()),
980            #[allow(deprecated)]
981            schema: Some("public".into()),
982            catalog_and_schema: vec![],
983            from_peer: Peer::empty(1),
984            to_peer: Peer::empty(2),
985            region_ids: vec![RegionId::new(1024, 1)],
986            timeout: Duration::from_secs(10),
987            trigger_reason: RegionMigrationTriggerReason::default(),
988        };
989        // NOTES: Changes it will break backward compatibility.
990        let serialized = r#"{"catalog":"greptime","schema":"public","from_peer":{"id":1,"addr":""},"to_peer":{"id":2,"addr":""},"region_id":4398046511105}"#;
991        let deserialized: PersistentContext = serde_json::from_str(serialized).unwrap();
992
993        assert_eq!(persistent_ctx, deserialized);
994    }
995
    /// A trivial [`State`] for testing procedure plumbing: every poll
    /// immediately reports completion.
    #[derive(Debug, Serialize, Deserialize, Default)]
    pub struct MockState;

    #[async_trait::async_trait]
    #[typetag::serde]
    impl State for MockState {
        /// Always transitions to a fresh `MockState` and reports `done`.
        async fn next(
            &mut self,
            _ctx: &mut Context,
            _procedure_ctx: &ProcedureContext,
        ) -> Result<(Box<dyn State>, Status)> {
            Ok((Box::new(MockState), Status::done()))
        }

        fn as_any(&self) -> &dyn Any {
            self
        }
    }
1014
1015    #[tokio::test]
1016    async fn test_execution_after_deserialized() {
1017        let env = TestingEnv::new();
1018
1019        fn new_mock_procedure(env: &TestingEnv) -> RegionMigrationProcedure {
1020            let persistent_context = new_persistent_context();
1021            let context_factory = env.context_factory();
1022            let state = Box::<MockState>::default();
1023            RegionMigrationProcedure::new_inner(state, persistent_context, context_factory, vec![])
1024        }
1025
1026        let ctx = TestingEnv::procedure_context();
1027        let mut procedure = new_mock_procedure(&env);
1028        let mut status = None;
1029        for _ in 0..3 {
1030            status = Some(procedure.execute(&ctx).await.unwrap());
1031        }
1032        assert!(status.unwrap().is_done());
1033
1034        let ctx = TestingEnv::procedure_context();
1035        let mut procedure = new_mock_procedure(&env);
1036
1037        status = Some(procedure.execute(&ctx).await.unwrap());
1038
1039        let serialized = procedure.dump().unwrap();
1040
1041        let context_factory = env.context_factory();
1042        let tracker = env.tracker();
1043        let mut procedure =
1044            RegionMigrationProcedure::from_json(&serialized, context_factory, tracker.clone())
1045                .unwrap();
1046        for region_id in &procedure.context.persistent_ctx.region_ids {
1047            assert!(tracker.contains(*region_id));
1048        }
1049
1050        for _ in 1..3 {
1051            status = Some(procedure.execute(&ctx).await.unwrap());
1052        }
1053        assert!(status.unwrap().is_done());
1054    }
1055
1056    #[tokio::test]
1057    async fn test_broadcast_invalidate_table_cache() {
1058        let mut env = TestingEnv::new();
1059        let persistent_context = test_util::new_persistent_context(1, 2, RegionId::new(1024, 1));
1060        let ctx = env.context_factory().new_context(persistent_context);
1061        let mailbox_ctx = env.mailbox_context();
1062
1063        // No receivers.
1064        ctx.invalidate_table_cache().await.unwrap();
1065
1066        let (tx, mut rx) = tokio::sync::mpsc::channel(1);
1067
1068        mailbox_ctx
1069            .insert_heartbeat_response_receiver(Channel::Frontend(1), tx)
1070            .await;
1071
1072        ctx.invalidate_table_cache().await.unwrap();
1073
1074        let resp = rx.recv().await.unwrap().unwrap();
1075        let msg = resp.mailbox_message.unwrap();
1076
1077        let instruction = HeartbeatMailbox::json_instruction(&msg).unwrap();
1078        assert_eq!(
1079            instruction,
1080            Instruction::InvalidateCaches(vec![CacheIdent::TableId(1024)])
1081        );
1082    }
1083
1084    fn procedure_flow_steps(from_peer_id: u64, to_peer_id: u64) -> Vec<Step> {
1085        vec![
1086            // MigrationStart
1087            Step::next(
1088                "Should be the open candidate region",
1089                None,
1090                Assertion::simple(assert_open_candidate_region, assert_need_persist),
1091            ),
1092            // OpenCandidateRegion
1093            Step::next(
1094                "Should be the flush leader region",
1095                Some(mock_datanode_reply(
1096                    to_peer_id,
1097                    Arc::new(|id| Ok(new_open_region_reply(id, true, None))),
1098                )),
1099                Assertion::simple(assert_flush_leader_region, assert_no_persist),
1100            ),
1101            // Flush Leader Region
1102            Step::next(
1103                "Should be the flush leader region",
1104                Some(mock_datanode_reply(
1105                    from_peer_id,
1106                    Arc::new(move |id| {
1107                        Ok(new_flush_region_reply_for_region(
1108                            id,
1109                            RegionId::new(1024, 1),
1110                            true,
1111                            None,
1112                        ))
1113                    }),
1114                )),
1115                Assertion::simple(assert_update_metadata_downgrade, assert_no_persist),
1116            ),
1117            // UpdateMetadata::Downgrade
1118            Step::next(
1119                "Should be the downgrade leader region",
1120                None,
1121                Assertion::simple(assert_downgrade_leader_region, assert_no_persist),
1122            ),
1123            // Downgrade Candidate
1124            Step::next(
1125                "Should be the upgrade candidate region",
1126                Some(mock_datanode_reply(
1127                    from_peer_id,
1128                    Arc::new(|id| Ok(new_downgrade_region_reply(id, None, true, None))),
1129                )),
1130                Assertion::simple(assert_upgrade_candidate_region, assert_no_persist),
1131            ),
1132            // Upgrade Candidate
1133            Step::next(
1134                "Should be the update metadata for upgrading",
1135                Some(mock_datanode_reply(
1136                    to_peer_id,
1137                    Arc::new(|id| Ok(new_upgrade_region_reply(id, true, true, None))),
1138                )),
1139                Assertion::simple(assert_update_metadata_upgrade, assert_no_persist),
1140            ),
1141            // UpdateMetadata::Upgrade
1142            Step::next(
1143                "Should be the close downgraded region",
1144                None,
1145                Assertion::simple(assert_close_downgraded_region, assert_no_persist),
1146            ),
1147            // CloseDowngradedRegion
1148            Step::next(
1149                "Should be the region migration end",
1150                None,
1151                Assertion::simple(assert_region_migration_end, assert_done),
1152            ),
1153            // RegionMigrationEnd
1154            Step::next(
1155                "Should be the region migration end again",
1156                None,
1157                Assertion::simple(assert_region_migration_end, assert_done),
1158            ),
1159        ]
1160    }
1161
1162    #[tokio::test]
1163    async fn test_procedure_flow() {
1164        common_telemetry::init_default_ut_logging();
1165
1166        let persistent_context = test_util::new_persistent_context(1, 2, RegionId::new(1024, 1));
1167        let state = Box::new(RegionMigrationStart);
1168
1169        // The table metadata.
1170        let from_peer_id = persistent_context.from_peer.id;
1171        let to_peer_id = persistent_context.to_peer.id;
1172        let from_peer = persistent_context.from_peer.clone();
1173        let to_peer = persistent_context.to_peer.clone();
1174        let region_id = persistent_context.region_ids[0];
1175        let table_info = new_test_table_info(1024, vec![1]).into();
1176        let region_routes = vec![RegionRoute {
1177            region: Region::new_test(region_id),
1178            leader_peer: Some(from_peer),
1179            follower_peers: vec![to_peer],
1180            ..Default::default()
1181        }];
1182
1183        let suite = ProcedureMigrationTestSuite::new(persistent_context, state);
1184        suite.init_table_metadata(table_info, region_routes).await;
1185
1186        let steps = procedure_flow_steps(from_peer_id, to_peer_id);
1187        let timer = Instant::now();
1188
1189        // Run the table tests.
1190        let runner = ProcedureMigrationSuiteRunner::new(suite)
1191            .steps(steps)
1192            .run_once()
1193            .await;
1194
1195        // Ensure it didn't run into the slow path.
1196        assert!(timer.elapsed().as_secs() < REGION_LEASE_SECS / 2);
1197
1198        runner.suite.verify_table_metadata().await;
1199    }
1200
1201    #[tokio::test]
1202    async fn test_procedure_flow_open_candidate_region_retryable_error() {
1203        common_telemetry::init_default_ut_logging();
1204
1205        let persistent_context = test_util::new_persistent_context(1, 2, RegionId::new(1024, 1));
1206        let state = Box::new(RegionMigrationStart);
1207
1208        // The table metadata.
1209        let to_peer_id = persistent_context.to_peer.id;
1210        let from_peer = persistent_context.from_peer.clone();
1211        let region_id = persistent_context.region_ids[0];
1212        let table_info = new_test_table_info(1024, vec![1]).into();
1213        let region_routes = vec![RegionRoute {
1214            region: Region::new_test(region_id),
1215            leader_peer: Some(from_peer),
1216            follower_peers: vec![],
1217            ..Default::default()
1218        }];
1219
1220        let suite = ProcedureMigrationTestSuite::new(persistent_context, state);
1221        suite.init_table_metadata(table_info, region_routes).await;
1222
1223        let steps = vec![
1224            // Migration Start
1225            Step::next(
1226                "Should be the open candidate region",
1227                None,
1228                Assertion::simple(assert_open_candidate_region, assert_need_persist),
1229            ),
1230            // OpenCandidateRegion
1231            Step::next(
1232                "Should be throwing a non-retry error",
1233                Some(mock_datanode_reply(
1234                    to_peer_id,
1235                    Arc::new(|id| error::MailboxTimeoutSnafu { id }.fail()),
1236                )),
1237                Assertion::error(|error| assert!(error.is_retryable())),
1238            ),
1239            // OpenCandidateRegion
1240            Step::next(
1241                "Should be throwing a non-retry error again",
1242                Some(mock_datanode_reply(
1243                    to_peer_id,
1244                    Arc::new(|id| error::MailboxTimeoutSnafu { id }.fail()),
1245                )),
1246                Assertion::error(|error| assert!(error.is_retryable())),
1247            ),
1248        ];
1249
1250        let setup_to_latest_persisted_state = Step::setup(
1251            "Sets state to UpdateMetadata::Downgrade",
1252            merge_before_test_fn(vec![
1253                setup_state(Arc::new(|| Box::new(OpenCandidateRegion))),
1254                Arc::new(reset_volatile_ctx),
1255            ]),
1256        );
1257
1258        let steps = [
1259            steps.clone(),
1260            // Mocks the volatile ctx lost(i.g., Meta leader restarts).
1261            vec![setup_to_latest_persisted_state.clone()],
1262            steps.clone()[1..].to_vec(),
1263            vec![setup_to_latest_persisted_state],
1264            steps.clone()[1..].to_vec(),
1265        ]
1266        .concat();
1267
1268        // Run the table tests.
1269        let runner = ProcedureMigrationSuiteRunner::new(suite)
1270            .steps(steps.clone())
1271            .run_once()
1272            .await;
1273
1274        let table_routes_version = runner
1275            .env()
1276            .table_metadata_manager()
1277            .table_route_manager()
1278            .table_route_storage()
1279            .get(region_id.table_id())
1280            .await
1281            .unwrap()
1282            .unwrap()
1283            .version();
1284        // Should be unchanged.
1285        assert_eq!(table_routes_version.unwrap(), 0);
1286    }
1287
1288    #[tokio::test]
1289    async fn test_procedure_flow_upgrade_candidate_with_retry_and_failed() {
1290        common_telemetry::init_default_ut_logging();
1291
1292        let persistent_context = test_util::new_persistent_context(1, 2, RegionId::new(1024, 1));
1293        let state = Box::new(RegionMigrationStart);
1294
1295        // The table metadata.
1296        let from_peer_id = persistent_context.from_peer.id;
1297        let to_peer_id = persistent_context.to_peer.id;
1298        let from_peer = persistent_context.from_peer.clone();
1299        let region_id = persistent_context.region_ids[0];
1300        let table_info = new_test_table_info(1024, vec![1]).into();
1301        let region_routes = vec![RegionRoute {
1302            region: Region::new_test(region_id),
1303            leader_peer: Some(from_peer),
1304            follower_peers: vec![],
1305            ..Default::default()
1306        }];
1307
1308        let suite = ProcedureMigrationTestSuite::new(persistent_context, state);
1309        suite.init_table_metadata(table_info, region_routes).await;
1310
1311        let steps = vec![
1312            // MigrationStart
1313            Step::next(
1314                "Should be the open candidate region",
1315                None,
1316                Assertion::simple(assert_open_candidate_region, assert_need_persist),
1317            ),
1318            // OpenCandidateRegion
1319            Step::next(
1320                "Should be the flush leader region",
1321                Some(mock_datanode_reply(
1322                    to_peer_id,
1323                    Arc::new(|id| Ok(new_open_region_reply(id, true, None))),
1324                )),
1325                Assertion::simple(assert_flush_leader_region, assert_no_persist),
1326            ),
1327            // Flush Leader Region
1328            Step::next(
1329                "Should be the flush leader region",
1330                Some(mock_datanode_reply(
1331                    from_peer_id,
1332                    Arc::new(move |id| {
1333                        Ok(new_flush_region_reply_for_region(
1334                            id,
1335                            RegionId::new(1024, 1),
1336                            true,
1337                            None,
1338                        ))
1339                    }),
1340                )),
1341                Assertion::simple(assert_update_metadata_downgrade, assert_no_persist),
1342            ),
1343            // UpdateMetadata::Downgrade
1344            Step::next(
1345                "Should be the downgrade leader region",
1346                None,
1347                Assertion::simple(assert_downgrade_leader_region, assert_no_persist),
1348            ),
1349            // Downgrade Candidate
1350            Step::next(
1351                "Should be the upgrade candidate region",
1352                Some(mock_datanode_reply(
1353                    from_peer_id,
1354                    Arc::new(|id| Ok(new_downgrade_region_reply(id, None, true, None))),
1355                )),
1356                Assertion::simple(assert_upgrade_candidate_region, assert_no_persist),
1357            ),
1358            // Upgrade Candidate
1359            Step::next(
1360                "Should be the rollback metadata",
1361                Some(mock_datanode_reply(
1362                    to_peer_id,
1363                    Arc::new(|id| error::MailboxTimeoutSnafu { id }.fail()),
1364                )),
1365                Assertion::simple(assert_update_metadata_rollback, assert_no_persist),
1366            ),
1367            // UpdateMetadata::Rollback
1368            Step::next(
1369                "Should be the region migration abort",
1370                None,
1371                Assertion::simple(assert_region_migration_abort, assert_no_persist),
1372            ),
1373            // RegionMigrationAbort
1374            Step::next(
1375                "Should throw an error",
1376                None,
1377                Assertion::error(|error| {
1378                    assert!(!error.is_retryable());
1379                    assert_matches!(error, error::Error::MigrationAbort { .. });
1380                }),
1381            ),
1382        ];
1383
1384        let setup_to_latest_persisted_state = Step::setup(
1385            "Sets state to OpenCandidateRegion",
1386            merge_before_test_fn(vec![
1387                setup_state(Arc::new(|| Box::new(OpenCandidateRegion))),
1388                Arc::new(reset_volatile_ctx),
1389            ]),
1390        );
1391
1392        let steps = [
1393            steps.clone(),
1394            vec![setup_to_latest_persisted_state.clone()],
1395            steps.clone()[1..].to_vec(),
1396            vec![setup_to_latest_persisted_state],
1397            steps.clone()[1..].to_vec(),
1398        ]
1399        .concat();
1400
1401        // Run the table tests.
1402        ProcedureMigrationSuiteRunner::new(suite)
1403            .steps(steps.clone())
1404            .run_once()
1405            .await;
1406    }
1407
1408    #[tokio::test]
1409    async fn test_procedure_flow_upgrade_candidate_with_retry() {
1410        common_telemetry::init_default_ut_logging();
1411
1412        let persistent_context = test_util::new_persistent_context(1, 2, RegionId::new(1024, 1));
1413        let state = Box::new(RegionMigrationStart);
1414
1415        // The table metadata.
1416        let to_peer_id = persistent_context.to_peer.id;
1417        let from_peer_id = persistent_context.from_peer.id;
1418        let from_peer = persistent_context.from_peer.clone();
1419        let region_id = persistent_context.region_ids[0];
1420        let table_info = new_test_table_info(1024, vec![1]).into();
1421        let region_routes = vec![RegionRoute {
1422            region: Region::new_test(region_id),
1423            leader_peer: Some(from_peer),
1424            follower_peers: vec![],
1425            ..Default::default()
1426        }];
1427
1428        let suite = ProcedureMigrationTestSuite::new(persistent_context, state);
1429        suite.init_table_metadata(table_info, region_routes).await;
1430
1431        let steps = vec![
1432            // Migration Start
1433            Step::next(
1434                "Should be the open candidate region",
1435                None,
1436                Assertion::simple(assert_open_candidate_region, assert_need_persist),
1437            ),
1438            // OpenCandidateRegion
1439            Step::next(
1440                "Should be throwing a retryable error",
1441                Some(mock_datanode_reply(
1442                    to_peer_id,
1443                    Arc::new(|id| Ok(new_open_region_reply(id, false, None))),
1444                )),
1445                Assertion::error(|error| assert!(error.is_retryable(), "err: {error:?}")),
1446            ),
1447            // OpenCandidateRegion
1448            Step::next(
1449                "Should be the update metadata for downgrading",
1450                Some(mock_datanode_reply(
1451                    to_peer_id,
1452                    Arc::new(|id| Ok(new_open_region_reply(id, true, None))),
1453                )),
1454                Assertion::simple(assert_flush_leader_region, assert_no_persist),
1455            ),
1456            // Flush Leader Region
1457            Step::next(
1458                "Should be the flush leader region",
1459                Some(mock_datanode_reply(
1460                    from_peer_id,
1461                    Arc::new(move |id| {
1462                        Ok(new_flush_region_reply_for_region(id, region_id, true, None))
1463                    }),
1464                )),
1465                Assertion::simple(assert_update_metadata_downgrade, assert_no_persist),
1466            ),
1467            // UpdateMetadata::Downgrade
1468            Step::next(
1469                "Should be the downgrade leader region",
1470                None,
1471                Assertion::simple(assert_downgrade_leader_region, assert_no_persist),
1472            ),
1473            // Downgrade Leader
1474            Step::next(
1475                "Should be the upgrade candidate region",
1476                Some(mock_datanode_reply(
1477                    from_peer_id,
1478                    merge_mailbox_messages(vec![
1479                        Arc::new(|id| error::MailboxTimeoutSnafu { id }.fail()),
1480                        Arc::new(|id| Ok(new_downgrade_region_reply(id, None, true, None))),
1481                    ]),
1482                )),
1483                Assertion::simple(assert_upgrade_candidate_region, assert_no_persist),
1484            ),
1485            // Upgrade Candidate
1486            Step::next(
1487                "Should be the update metadata for upgrading",
1488                Some(mock_datanode_reply(
1489                    to_peer_id,
1490                    merge_mailbox_messages(vec![
1491                        Arc::new(|id| error::MailboxTimeoutSnafu { id }.fail()),
1492                        Arc::new(|id| Ok(new_upgrade_region_reply(id, true, true, None))),
1493                    ]),
1494                )),
1495                Assertion::simple(assert_update_metadata_upgrade, assert_no_persist),
1496            ),
1497            // UpdateMetadata::Upgrade
1498            Step::next(
1499                "Should be the close downgraded region",
1500                None,
1501                Assertion::simple(assert_close_downgraded_region, assert_no_persist),
1502            ),
1503            // CloseDowngradedRegion
1504            Step::next(
1505                "Should be the region migration end",
1506                None,
1507                Assertion::simple(assert_region_migration_end, assert_done),
1508            ),
1509            // RegionMigrationEnd
1510            Step::next(
1511                "Should be the region migration end again",
1512                None,
1513                Assertion::simple(assert_region_migration_end, assert_done),
1514            ),
1515            // RegionMigrationStart
1516            Step::setup(
1517                "Sets state to RegionMigrationStart",
1518                merge_before_test_fn(vec![
1519                    setup_state(Arc::new(|| Box::new(RegionMigrationStart))),
1520                    Arc::new(reset_volatile_ctx),
1521                ]),
1522            ),
1523            // RegionMigrationEnd
1524            // Note: We can't run this test multiple times;
1525            // the `peer_id`'s `DatanodeTable` will be removed after first-time migration success.
1526            Step::next(
1527                "Should be the region migration end(has been migrated)",
1528                None,
1529                Assertion::simple(assert_region_migration_end, assert_done),
1530            ),
1531        ];
1532
1533        let steps = [steps.clone()].concat();
1534        let timer = Instant::now();
1535
1536        // Run the table tests.
1537        let runner = ProcedureMigrationSuiteRunner::new(suite)
1538            .steps(steps.clone())
1539            .run_once()
1540            .await;
1541
1542        // Ensure it didn't run into the slow path.
1543        assert!(timer.elapsed().as_secs() < REGION_LEASE_SECS);
1544        runner.suite.verify_table_metadata().await;
1545    }
1546}