Skip to main content

meta_srv/procedure/
region_migration.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub(crate) mod close_downgraded_region;
16pub(crate) mod downgrade_leader_region;
17pub(crate) mod flush_leader_region;
18pub(crate) mod manager;
19pub(crate) mod migration_abort;
20pub(crate) mod migration_end;
21pub(crate) mod migration_start;
22pub(crate) mod open_candidate_region;
23#[cfg(test)]
24pub mod test_util;
25pub(crate) mod update_metadata;
26pub(crate) mod upgrade_candidate_region;
27pub(crate) mod utils;
28
29use std::any::Any;
30use std::collections::{HashMap, HashSet};
31use std::fmt::{Debug, Display};
32use std::sync::Arc;
33use std::time::Duration;
34
35use common_error::ext::BoxedError;
36use common_event_recorder::{Event, Eventable};
37use common_meta::cache_invalidator::CacheInvalidatorRef;
38use common_meta::ddl::RegionFailureDetectorControllerRef;
39use common_meta::instruction::CacheIdent;
40use common_meta::key::datanode_table::{DatanodeTableKey, DatanodeTableValue};
41use common_meta::key::table_route::TableRouteValue;
42use common_meta::key::topic_name::TopicNameKey;
43use common_meta::key::topic_region::{ReplayCheckpoint, TopicRegionKey};
44use common_meta::key::{DeserializedValueWithBytes, TableMetadataManagerRef};
45use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef};
46use common_meta::lock_key::{CatalogLock, RegionLock, SchemaLock, TableLock};
47use common_meta::peer::Peer;
48use common_meta::region_keeper::{MemoryRegionKeeperRef, OperatingRegionGuard};
49use common_procedure::error::{
50    Error as ProcedureError, FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu,
51};
52use common_procedure::{
53    Context as ProcedureContext, LockKey, Procedure, Status, StringKey, UserMetadata,
54};
55use common_telemetry::{debug, error, info};
56use manager::RegionMigrationProcedureGuard;
57pub use manager::{
58    RegionMigrationManagerRef, RegionMigrationProcedureTask, RegionMigrationProcedureTracker,
59    RegionMigrationTriggerReason,
60};
61use serde::{Deserialize, Deserializer, Serialize};
62use snafu::{OptionExt, ResultExt};
63use store_api::storage::{RegionId, TableId};
64use tokio::time::Instant;
65
66use self::migration_start::RegionMigrationStart;
67use crate::error::{self, Result};
68use crate::events::region_migration_event::RegionMigrationEvent;
69use crate::metrics::{
70    METRIC_META_REGION_MIGRATION_ERROR, METRIC_META_REGION_MIGRATION_EXECUTE,
71    METRIC_META_REGION_MIGRATION_STAGE_ELAPSED,
72};
73use crate::service::mailbox::MailboxRef;
74
/// The default timeout for region migration (120 seconds).
///
/// Note: this is not the value used when a persisted [PersistentContext] lacks a `timeout`
/// field; that fallback comes from `default_timeout` (10 seconds).
pub const DEFAULT_REGION_MIGRATION_TIMEOUT: Duration = Duration::from_secs(120);
77
/// Deserialization helper that accepts either a single value or a list of values.
///
/// Because the enum is `#[serde(untagged)]`, serde tries the variants in declaration order:
/// a bare `T` first, then a sequence of `T`.
#[derive(Debug, Deserialize)]
#[serde(untagged)]
enum SingleOrMultiple<T> {
    /// A lone value.
    Single(T),
    /// A list of values.
    Multiple(Vec<T>),
}
84
85fn single_or_multiple_from<'de, D, T>(deserializer: D) -> std::result::Result<Vec<T>, D::Error>
86where
87    D: Deserializer<'de>,
88    T: Deserialize<'de>,
89{
90    let helper = SingleOrMultiple::<T>::deserialize(deserializer)?;
91    Ok(match helper {
92        SingleOrMultiple::Single(x) => vec![x],
93        SingleOrMultiple::Multiple(xs) => xs,
94    })
95}
96
/// The persisted part of the region migration state.
///
/// It's shared in each step and available even after recovering.
///
/// It will only be updated/stored after the Red node has succeeded.
/// (NOTE(review): "Red node" presumably refers to a node in the procedure's state diagram —
/// confirm against the design doc.)
///
/// **Notes: Stores with too large data in the context might incur replication overhead.**
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct PersistentContext {
    /// The table catalog.
    ///
    /// Defaults to `None` when absent from the input and is omitted from the output when `None`.
    #[deprecated(note = "use `catalog_and_schema` instead")]
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub(crate) catalog: Option<String>,
    /// The table schema.
    ///
    /// Defaults to `None` when absent from the input and is omitted from the output when `None`.
    #[deprecated(note = "use `catalog_and_schema` instead")]
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub(crate) schema: Option<String>,
    /// The catalog and schema of the regions, as `(catalog, schema)` pairs.
    ///
    /// Defaults to empty when absent from the input and is omitted from the output when empty.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub(crate) catalog_and_schema: Vec<(String, String)>,
    /// The [Peer] of migration source.
    pub(crate) from_peer: Peer,
    /// The [Peer] of migration destination.
    pub(crate) to_peer: Peer,
    /// The [RegionId]s of the regions to migrate.
    ///
    /// On deserialization the field may also be spelled `region_id`, and its value may be
    /// either a single region id or a list of region ids.
    #[serde(deserialize_with = "single_or_multiple_from", alias = "region_id")]
    pub(crate) region_ids: Vec<RegionId>,
    /// The timeout for downgrading leader region and upgrading candidate region operations.
    ///
    /// (De)serialized in `humantime` format; falls back to `default_timeout()` when absent.
    #[serde(with = "humantime_serde", default = "default_timeout")]
    pub(crate) timeout: Duration,
    /// The trigger reason of region migration.
    ///
    /// Falls back to the `Default` value of [RegionMigrationTriggerReason] when absent.
    #[serde(default)]
    pub(crate) trigger_reason: RegionMigrationTriggerReason,
}
129
impl PersistentContext {
    /// Creates a [PersistentContext].
    ///
    /// The deprecated `catalog` and `schema` fields are always set to `None`; the
    /// catalog/schema pairs are passed through `catalog_and_schema` instead.
    pub fn new(
        catalog_and_schema: Vec<(String, String)>,
        from_peer: Peer,
        to_peer: Peer,
        region_ids: Vec<RegionId>,
        timeout: Duration,
        trigger_reason: RegionMigrationTriggerReason,
    ) -> Self {
        // The deprecated fields still have to be initialized, hence the lint exemption.
        #[allow(deprecated)]
        Self {
            catalog: None,
            schema: None,
            catalog_and_schema,
            from_peer,
            to_peer,
            region_ids,
            timeout,
            trigger_reason,
        }
    }
}
152
/// Fallback value for `PersistentContext::timeout` when the field is missing from the
/// serialized data (10 seconds).
fn default_timeout() -> Duration {
    const FALLBACK_TIMEOUT_SECS: u64 = 10;
    Duration::from_secs(FALLBACK_TIMEOUT_SECS)
}
156
157impl PersistentContext {
158    pub fn lock_key(&self) -> Vec<StringKey> {
159        let mut lock_keys =
160            Vec::with_capacity(self.region_ids.len() + 2 + self.catalog_and_schema.len() * 2);
161        #[allow(deprecated)]
162        if let (Some(catalog), Some(schema)) = (&self.catalog, &self.schema) {
163            lock_keys.push(CatalogLock::Read(catalog).into());
164            lock_keys.push(SchemaLock::read(catalog, schema).into());
165        }
166        for (catalog, schema) in self.catalog_and_schema.iter() {
167            lock_keys.push(CatalogLock::Read(catalog).into());
168            lock_keys.push(SchemaLock::read(catalog, schema).into());
169        }
170
171        // Sort the region ids to ensure the same order of region ids.
172        let mut region_ids = self.region_ids.clone();
173        region_ids.sort_unstable();
174        for region_id in region_ids {
175            lock_keys.push(RegionLock::Write(region_id).into());
176        }
177        lock_keys
178    }
179
180    /// Returns the table ids of the regions.
181    ///
182    /// The return value is a set of table ids.
183    pub fn region_table_ids(&self) -> Vec<TableId> {
184        self.region_ids
185            .iter()
186            .map(|region_id| region_id.table_id())
187            .collect::<HashSet<_>>()
188            .into_iter()
189            .collect()
190    }
191
192    /// Returns the table regions map.
193    ///
194    /// The key is the table id, the value is the region ids of the table.
195    pub fn table_regions(&self) -> HashMap<TableId, Vec<RegionId>> {
196        let mut table_regions = HashMap::new();
197        for region_id in &self.region_ids {
198            table_regions
199                .entry(region_id.table_id())
200                .or_insert_with(Vec::new)
201                .push(*region_id);
202        }
203        table_regions
204    }
205}
206
207impl Eventable for PersistentContext {
208    fn to_event(&self) -> Option<Box<dyn Event>> {
209        Some(Box::new(RegionMigrationEvent::from_persistent_ctx(self)))
210    }
211}
212
/// Metrics of region migration.
///
/// Every field is an accumulated duration; all of them start at zero (`Default`).
#[derive(Debug, Clone, Default)]
pub struct Metrics {
    /// Elapsed time of downgrading region and upgrading region.
    ///
    /// This is the value subtracted from the configured timeout in
    /// `Context::next_operation_timeout`.
    operations_elapsed: Duration,
    /// Elapsed time of flushing leader region.
    flush_leader_region_elapsed: Duration,
    /// Elapsed time of downgrading leader region.
    downgrade_leader_region_elapsed: Duration,
    /// Elapsed time of open candidate region.
    open_candidate_region_elapsed: Duration,
    /// Elapsed time of upgrade candidate region.
    upgrade_candidate_region_elapsed: Duration,
}
227
228impl Display for Metrics {
229    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
230        let total = self.flush_leader_region_elapsed
231            + self.downgrade_leader_region_elapsed
232            + self.open_candidate_region_elapsed
233            + self.upgrade_candidate_region_elapsed;
234        write!(
235            f,
236            "total: {:?}, flush_leader_region_elapsed: {:?}, downgrade_leader_region_elapsed: {:?}, open_candidate_region_elapsed: {:?}, upgrade_candidate_region_elapsed: {:?}",
237            total,
238            self.flush_leader_region_elapsed,
239            self.downgrade_leader_region_elapsed,
240            self.open_candidate_region_elapsed,
241            self.upgrade_candidate_region_elapsed
242        )
243    }
244}
245
impl Metrics {
    /// Adds `elapsed` to the accumulated time of downgrading region and upgrading region.
    pub fn update_operations_elapsed(&mut self, elapsed: Duration) {
        self.operations_elapsed += elapsed;
    }

    /// Adds `elapsed` to the accumulated time of flushing leader region.
    pub fn update_flush_leader_region_elapsed(&mut self, elapsed: Duration) {
        self.flush_leader_region_elapsed += elapsed;
    }

    /// Adds `elapsed` to the accumulated time of downgrading leader region.
    pub fn update_downgrade_leader_region_elapsed(&mut self, elapsed: Duration) {
        self.downgrade_leader_region_elapsed += elapsed;
    }

    /// Adds `elapsed` to the accumulated time of open candidate region.
    pub fn update_open_candidate_region_elapsed(&mut self, elapsed: Duration) {
        self.open_candidate_region_elapsed += elapsed;
    }

    /// Adds `elapsed` to the accumulated time of upgrade candidate region.
    pub fn update_upgrade_candidate_region_elapsed(&mut self, elapsed: Duration) {
        self.upgrade_candidate_region_elapsed += elapsed;
    }
}
272
273impl Drop for Metrics {
274    fn drop(&mut self) {
275        let total = self.flush_leader_region_elapsed
276            + self.downgrade_leader_region_elapsed
277            + self.open_candidate_region_elapsed
278            + self.upgrade_candidate_region_elapsed;
279        METRIC_META_REGION_MIGRATION_STAGE_ELAPSED
280            .with_label_values(&["total"])
281            .observe(total.as_secs_f64());
282
283        if !self.flush_leader_region_elapsed.is_zero() {
284            METRIC_META_REGION_MIGRATION_STAGE_ELAPSED
285                .with_label_values(&["flush_leader_region"])
286                .observe(self.flush_leader_region_elapsed.as_secs_f64());
287        }
288
289        if !self.downgrade_leader_region_elapsed.is_zero() {
290            METRIC_META_REGION_MIGRATION_STAGE_ELAPSED
291                .with_label_values(&["downgrade_leader_region"])
292                .observe(self.downgrade_leader_region_elapsed.as_secs_f64());
293        }
294
295        if !self.open_candidate_region_elapsed.is_zero() {
296            METRIC_META_REGION_MIGRATION_STAGE_ELAPSED
297                .with_label_values(&["open_candidate_region"])
298                .observe(self.open_candidate_region_elapsed.as_secs_f64());
299        }
300
301        if !self.upgrade_candidate_region_elapsed.is_zero() {
302            METRIC_META_REGION_MIGRATION_STAGE_ELAPSED
303                .with_label_values(&["upgrade_candidate_region"])
304                .observe(self.upgrade_candidate_region_elapsed.as_secs_f64());
305        }
306    }
307}
308
/// The in-memory part of the region migration state.
///
/// It's shared in each step and available in executing (including retrying).
///
/// It will be dropped if the procedure runner crashes.
///
/// The additional remote fetches are only required in the worst cases.
#[derive(Debug, Clone, Default)]
pub struct VolatileContext {
    /// `opening_region_guards` will be set after the
    /// [OpenCandidateRegion](crate::procedure::region_migration::open_candidate_region::OpenCandidateRegion) step.
    ///
    /// `opening_region_guards` should be consumed after
    /// the corresponding [RegionRoute](common_meta::rpc::router::RegionRoute) of the opening region
    /// was written into [TableRouteValue](common_meta::key::table_route::TableRouteValue).
    opening_region_guards: Vec<OperatingRegionGuard>,
    /// The deadline of leader region lease.
    ///
    /// `None` until `set_leader_region_lease_deadline` is called, and again after
    /// `reset_leader_region_lease_deadline`.
    leader_region_lease_deadline: Option<Instant>,
    /// The datanode table values of the source peer, keyed by table id.
    ///
    /// `None` until `Context::get_from_peer_datanode_table_values` has fetched them.
    from_peer_datanode_table_values: Option<HashMap<TableId, DatanodeTableValue>>,
    /// The last_entry_ids of leader regions.
    leader_region_last_entry_ids: HashMap<RegionId, u64>,
    /// The last_entry_ids of leader metadata regions (Only used for metric engine).
    leader_region_metadata_last_entry_ids: HashMap<RegionId, u64>,
    /// Metrics of region migration.
    metrics: Metrics,
}
334
335impl VolatileContext {
336    /// Sets the `leader_region_lease_deadline` if it does not exist.
337    pub fn set_leader_region_lease_deadline(&mut self, lease_timeout: Duration) {
338        if self.leader_region_lease_deadline.is_none() {
339            self.leader_region_lease_deadline = Some(Instant::now() + lease_timeout);
340        }
341    }
342
343    /// Resets the `leader_region_lease_deadline`.
344    pub fn reset_leader_region_lease_deadline(&mut self) {
345        self.leader_region_lease_deadline = None;
346    }
347
348    /// Sets the `leader_region_last_entry_id`.
349    pub fn set_last_entry_id(&mut self, region_id: RegionId, last_entry_id: u64) {
350        self.leader_region_last_entry_ids
351            .insert(region_id, last_entry_id);
352    }
353
354    /// Sets the `leader_region_metadata_last_entry_id`.
355    pub fn set_metadata_last_entry_id(&mut self, region_id: RegionId, last_entry_id: u64) {
356        self.leader_region_metadata_last_entry_ids
357            .insert(region_id, last_entry_id);
358    }
359}
360
/// Used to generate new [Context].
pub trait ContextFactory {
    /// Consumes the factory and builds a [Context] around `persistent_ctx`.
    fn new_context(self, persistent_ctx: PersistentContext) -> Context;
}
365
/// Default implementation of [ContextFactory].
///
/// It holds the handles that are moved into the [Context] it creates, plus the
/// [VolatileContext] the new [Context] starts with.
#[derive(Clone)]
pub struct DefaultContextFactory {
    volatile_ctx: VolatileContext,
    in_memory_key: ResettableKvBackendRef,
    table_metadata_manager: TableMetadataManagerRef,
    opening_region_keeper: MemoryRegionKeeperRef,
    region_failure_detector_controller: RegionFailureDetectorControllerRef,
    mailbox: MailboxRef,
    server_addr: String,
    cache_invalidator: CacheInvalidatorRef,
}
378
impl DefaultContextFactory {
    /// Returns a [`DefaultContextFactory`].
    ///
    /// The factory starts with a default (empty) [VolatileContext]; every other field is
    /// taken from the arguments as is.
    pub fn new(
        in_memory_key: ResettableKvBackendRef,
        table_metadata_manager: TableMetadataManagerRef,
        opening_region_keeper: MemoryRegionKeeperRef,
        region_failure_detector_controller: RegionFailureDetectorControllerRef,
        mailbox: MailboxRef,
        server_addr: String,
        cache_invalidator: CacheInvalidatorRef,
    ) -> Self {
        Self {
            volatile_ctx: VolatileContext::default(),
            in_memory_key,
            table_metadata_manager,
            opening_region_keeper,
            region_failure_detector_controller,
            mailbox,
            server_addr,
            cache_invalidator,
        }
    }
}
402
403impl ContextFactory for DefaultContextFactory {
404    fn new_context(self, persistent_ctx: PersistentContext) -> Context {
405        Context {
406            persistent_ctx,
407            volatile_ctx: self.volatile_ctx,
408            in_memory: self.in_memory_key,
409            table_metadata_manager: self.table_metadata_manager,
410            opening_region_keeper: self.opening_region_keeper,
411            region_failure_detector_controller: self.region_failure_detector_controller,
412            mailbox: self.mailbox,
413            server_addr: self.server_addr,
414            cache_invalidator: self.cache_invalidator,
415        }
416    }
417}
418
/// The context of procedure execution.
///
/// Combines the persisted state ([PersistentContext]), the in-memory state
/// ([VolatileContext]) and the handles the steps use to reach the metadata store,
/// the mailbox, the failure detector controller and the cache invalidator.
pub struct Context {
    persistent_ctx: PersistentContext,
    volatile_ctx: VolatileContext,
    in_memory: KvBackendRef,
    table_metadata_manager: TableMetadataManagerRef,
    opening_region_keeper: MemoryRegionKeeperRef,
    region_failure_detector_controller: RegionFailureDetectorControllerRef,
    mailbox: MailboxRef,
    /// Address of the meta server, exposed through `server_addr()`.
    server_addr: String,
    cache_invalidator: CacheInvalidatorRef,
}
431
432impl Context {
433    /// Returns the next operation's timeout.
434    pub fn next_operation_timeout(&self) -> Option<Duration> {
435        self.persistent_ctx
436            .timeout
437            .checked_sub(self.volatile_ctx.metrics.operations_elapsed)
438    }
439
440    /// Updates operations elapsed.
441    pub fn update_operations_elapsed(&mut self, instant: Instant) {
442        self.volatile_ctx
443            .metrics
444            .update_operations_elapsed(instant.elapsed());
445    }
446
447    /// Updates the elapsed time of flushing leader region.
448    pub fn update_flush_leader_region_elapsed(&mut self, instant: Instant) {
449        self.volatile_ctx
450            .metrics
451            .update_flush_leader_region_elapsed(instant.elapsed());
452    }
453
454    /// Updates the elapsed time of downgrading leader region.
455    pub fn update_downgrade_leader_region_elapsed(&mut self, instant: Instant) {
456        self.volatile_ctx
457            .metrics
458            .update_downgrade_leader_region_elapsed(instant.elapsed());
459    }
460
461    /// Updates the elapsed time of open candidate region.
462    pub fn update_open_candidate_region_elapsed(&mut self, instant: Instant) {
463        self.volatile_ctx
464            .metrics
465            .update_open_candidate_region_elapsed(instant.elapsed());
466    }
467
468    /// Updates the elapsed time of upgrade candidate region.
469    pub fn update_upgrade_candidate_region_elapsed(&mut self, instant: Instant) {
470        self.volatile_ctx
471            .metrics
472            .update_upgrade_candidate_region_elapsed(instant.elapsed());
473    }
474
475    /// Returns address of meta server.
476    pub fn server_addr(&self) -> &str {
477        &self.server_addr
478    }
479
480    /// Returns the table ids of the regions.
481    pub fn region_table_ids(&self) -> Vec<TableId> {
482        self.persistent_ctx
483            .region_ids
484            .iter()
485            .map(|region_id| region_id.table_id())
486            .collect::<HashSet<_>>()
487            .into_iter()
488            .collect()
489    }
490
491    /// Returns the `table_routes` of [VolatileContext] if any.
492    /// Otherwise, returns the value retrieved from remote.
493    ///
494    /// Retry:
495    /// - Failed to retrieve the metadata of table.
496    pub async fn get_table_route_values(
497        &self,
498    ) -> Result<HashMap<TableId, DeserializedValueWithBytes<TableRouteValue>>> {
499        let table_ids = self.persistent_ctx.region_table_ids();
500        let table_routes = self
501            .table_metadata_manager
502            .table_route_manager()
503            .table_route_storage()
504            .batch_get_with_raw_bytes(&table_ids)
505            .await
506            .context(error::TableMetadataManagerSnafu)
507            .map_err(BoxedError::new)
508            .with_context(|_| error::RetryLaterWithSourceSnafu {
509                reason: format!("Failed to get table routes: {table_ids:?}"),
510            })?;
511        let table_routes = table_ids
512            .into_iter()
513            .zip(table_routes)
514            .filter_map(|(table_id, table_route)| {
515                table_route.map(|table_route| (table_id, table_route))
516            })
517            .collect::<HashMap<_, _>>();
518        Ok(table_routes)
519    }
520
521    /// Returns the `table_route` of [VolatileContext] if any.
522    /// Otherwise, returns the value retrieved from remote.
523    ///
524    /// Retry:
525    /// - Failed to retrieve the metadata of table.
526    pub async fn get_table_route_value(
527        &self,
528        table_id: TableId,
529    ) -> Result<DeserializedValueWithBytes<TableRouteValue>> {
530        let table_route_value = self
531            .table_metadata_manager
532            .table_route_manager()
533            .table_route_storage()
534            .get_with_raw_bytes(table_id)
535            .await
536            .context(error::TableMetadataManagerSnafu)
537            .map_err(BoxedError::new)
538            .with_context(|_| error::RetryLaterWithSourceSnafu {
539                reason: format!("Failed to get table routes: {table_id:}"),
540            })?
541            .context(error::TableRouteNotFoundSnafu { table_id })?;
542        Ok(table_route_value)
543    }
544
545    /// Returns the `from_peer_datanode_table_values` of [VolatileContext] if any.
546    /// Otherwise, returns the value retrieved from remote.
547    ///
548    /// Retry:
549    /// - Failed to retrieve the metadata of datanode table.
550    pub async fn get_from_peer_datanode_table_values(
551        &mut self,
552    ) -> Result<&HashMap<TableId, DatanodeTableValue>> {
553        let from_peer_datanode_table_values =
554            &mut self.volatile_ctx.from_peer_datanode_table_values;
555        if from_peer_datanode_table_values.is_none() {
556            let table_ids = self.persistent_ctx.region_table_ids();
557            let datanode_table_keys = table_ids
558                .iter()
559                .map(|table_id| DatanodeTableKey {
560                    datanode_id: self.persistent_ctx.from_peer.id,
561                    table_id: *table_id,
562                })
563                .collect::<Vec<_>>();
564            let datanode_table_values = self
565                .table_metadata_manager
566                .datanode_table_manager()
567                .batch_get(&datanode_table_keys)
568                .await
569                .context(error::TableMetadataManagerSnafu)
570                .map_err(BoxedError::new)
571                .with_context(|_| error::RetryLaterWithSourceSnafu {
572                    reason: format!("Failed to get DatanodeTable: {table_ids:?}"),
573                })?
574                .into_iter()
575                .map(|(k, v)| (k.table_id, v))
576                .collect();
577            *from_peer_datanode_table_values = Some(datanode_table_values);
578        }
579        Ok(from_peer_datanode_table_values.as_ref().unwrap())
580    }
581
582    /// Returns the `from_peer_datanode_table_value` of [VolatileContext] if any.
583    /// Otherwise, returns the value retrieved from remote.
584    ///
585    /// Retry:
586    /// - Failed to retrieve the metadata of datanode table.
587    pub async fn get_from_peer_datanode_table_value(
588        &self,
589        table_id: TableId,
590    ) -> Result<DatanodeTableValue> {
591        let datanode_table_value = self
592            .table_metadata_manager
593            .datanode_table_manager()
594            .get(&DatanodeTableKey {
595                datanode_id: self.persistent_ctx.from_peer.id,
596                table_id,
597            })
598            .await
599            .context(error::TableMetadataManagerSnafu)
600            .map_err(BoxedError::new)
601            .with_context(|_| error::RetryLaterWithSourceSnafu {
602                reason: format!("Failed to get DatanodeTable: {table_id}"),
603            })?
604            .context(error::DatanodeTableNotFoundSnafu {
605                table_id,
606                datanode_id: self.persistent_ctx.from_peer.id,
607            })?;
608        Ok(datanode_table_value)
609    }
610
611    /// Notifies the RegionSupervisor to register failure detectors of failed region.
612    ///
613    /// The original failure detector was removed once the procedure was triggered.
614    /// Now, we need to register the failure detector for the failed region again.
615    pub async fn register_failure_detectors(&self) {
616        let datanode_id = self.persistent_ctx.from_peer.id;
617        let region_ids = &self.persistent_ctx.region_ids;
618        let detecting_regions = region_ids
619            .iter()
620            .map(|region_id| (datanode_id, *region_id))
621            .collect::<Vec<_>>();
622        self.region_failure_detector_controller
623            .register_failure_detectors(detecting_regions)
624            .await;
625        info!(
626            "Registered failure detectors after migration failures for datanode {}, regions {:?}",
627            datanode_id, region_ids
628        );
629    }
630
631    /// Notifies the RegionSupervisor to reset failure detectors of candidate regions.
632    pub async fn reset_failure_detectors_for_candidate_regions(&self) {
633        let datanode_id = self.persistent_ctx.to_peer.id;
634        let region_ids = &self.persistent_ctx.region_ids;
635        let detecting_regions = region_ids
636            .iter()
637            .map(|region_id| (datanode_id, *region_id))
638            .collect::<Vec<_>>();
639        self.region_failure_detector_controller
640            .reset_failure_detectors(detecting_regions)
641            .await;
642        info!(
643            "Reset failure detectors after migration success for datanode {}, regions {:?}",
644            datanode_id, region_ids
645        );
646    }
647
648    /// Notifies the RegionSupervisor to deregister failure detectors.
649    ///
650    /// The original failure detectors won't be removed once the procedure was triggered.
651    /// We need to deregister the failure detectors for the original region if the procedure is finished.
652    pub async fn deregister_failure_detectors(&self) {
653        let datanode_id = self.persistent_ctx.from_peer.id;
654        let region_ids = &self.persistent_ctx.region_ids;
655        let detecting_regions = region_ids
656            .iter()
657            .map(|region_id| (datanode_id, *region_id))
658            .collect::<Vec<_>>();
659
660        self.region_failure_detector_controller
661            .deregister_failure_detectors(detecting_regions)
662            .await;
663    }
664
665    /// Notifies the RegionSupervisor to deregister failure detectors for the candidate regions on the destination peer.
666    ///
667    /// The candidate regions may be created on the destination peer,
668    /// so we need to deregister the failure detectors for the candidate regions if the procedure is aborted.
669    pub async fn deregister_failure_detectors_for_candidate_regions(&self) {
670        let to_peer_id = self.persistent_ctx.to_peer.id;
671        let region_ids = &self.persistent_ctx.region_ids;
672        let detecting_regions = region_ids
673            .iter()
674            .map(|region_id| (to_peer_id, *region_id))
675            .collect::<Vec<_>>();
676
677        self.region_failure_detector_controller
678            .deregister_failure_detectors(detecting_regions)
679            .await;
680    }
681
    /// Fetches replay checkpoints and merges them with topic pruned entry ids.
    ///
    /// Each element of `region_topics` is `(region id, topic name, is_metric_engine)`.
    /// Regions for which the merge yields nothing are left out of the returned map.
    ///
    /// Unlike the other metadata getters of this type, a failed fetch is returned as a plain
    /// `TableMetadataManager` error and is not wrapped into a retry-later error.
    pub async fn get_replay_checkpoints_with_topic_pruned_entry_ids(
        &self,
        region_topics: &[(RegionId, String, bool)],
    ) -> Result<HashMap<RegionId, ReplayCheckpoint>> {
        // One topic-region key per (region, topic) pair.
        let topic_region_keys = region_topics
            .iter()
            .map(|(region_id, topic, _)| TopicRegionKey::new(*region_id, topic))
            .collect::<Vec<_>>();
        let topic_region_values = self
            .table_metadata_manager
            .topic_region_manager()
            .batch_get(topic_region_keys)
            .await
            .context(error::TableMetadataManagerSnafu)?;

        // Several regions may share a topic; deduplicate the topic names before fetching.
        let topic_name_keys = region_topics
            .iter()
            .map(|(_, topic, _)| topic.as_str())
            .collect::<HashSet<_>>()
            .into_iter()
            .map(TopicNameKey::new)
            .collect::<Vec<_>>();
        let topic_name_values = self
            .table_metadata_manager
            .topic_name_manager()
            .batch_get(topic_name_keys)
            .await
            .context(error::TableMetadataManagerSnafu)?;
        debug!(
            "Fetched topic region values: {:?}, topic name values: {:?}",
            topic_region_values, topic_name_values
        );

        let replay_checkpoints = region_topics
            .iter()
            .filter_map(|(region_id, topic, is_metric_engine)| {
                // Either lookup may miss; both inputs to the merge are optional.
                let checkpoint = topic_region_values
                    .get(region_id)
                    .and_then(|value| value.checkpoint);
                let pruned_entry_id = topic_name_values
                    .get(topic)
                    .map(|value| value.pruned_entry_id);

                ReplayCheckpoint::merge_with_topic_pruned_entry_id(
                    checkpoint,
                    pruned_entry_id,
                    *is_metric_engine,
                )
                .map(|checkpoint| (*region_id, checkpoint))
            })
            .collect::<HashMap<_, _>>();

        Ok(replay_checkpoints)
    }
737
738    /// Broadcasts the invalidate table cache message.
739    pub async fn invalidate_table_cache(&self) -> Result<()> {
740        let table_ids = self.region_table_ids();
741        let mut cache_idents = Vec::with_capacity(table_ids.len());
742        for table_id in &table_ids {
743            cache_idents.push(CacheIdent::TableId(*table_id));
744        }
745        // ignore the result
746        let ctx = common_meta::cache_invalidator::Context::default();
747        let _ = self.cache_invalidator.invalidate(&ctx, &cache_idents).await;
748        Ok(())
749    }
750
    /// Returns a clone of the [PersistentContext] of the procedure.
    ///
    /// The returned value is an independent copy; changing it does not affect this [Context].
    pub fn persistent_ctx(&self) -> PersistentContext {
        self.persistent_ctx.clone()
    }
755}
756
/// A step of the region migration procedure.
///
/// Implementations are (de)serialized as trait objects through `typetag`, with the concrete
/// type recorded in the `region_migration_state` tag.
#[async_trait::async_trait]
#[typetag::serde(tag = "region_migration_state")]
pub(crate) trait State: Sync + Send + Debug {
    /// Returns the short name of the state: the last `::`-separated segment of
    /// [std::any::type_name] of the implementing type.
    fn name(&self) -> &'static str {
        let type_name = std::any::type_name::<Self>();
        // short name
        type_name.split("::").last().unwrap_or(type_name)
    }

    /// Yields the next [State] and [Status].
    async fn next(
        &mut self,
        ctx: &mut Context,
        procedure_ctx: &ProcedureContext,
    ) -> Result<(Box<dyn State>, Status)>;

    /// Returns as [Any](std::any::Any).
    fn as_any(&self) -> &dyn Any;
}
776
/// Persistent data of [RegionMigrationProcedure], in owned form.
///
/// This is the shape the procedure is deserialized into when it is restored from JSON.
#[derive(Debug, Serialize, Deserialize)]
pub struct RegionMigrationDataOwned {
    /// The persisted context of the migration.
    persistent_ctx: PersistentContext,
    /// The state the procedure was in when it was persisted.
    state: Box<dyn State>,
}
783
/// Persistent data of [RegionMigrationProcedure], in borrowed form.
///
/// It only derives `Serialize`, so it can be written out without cloning the context or the
/// state (presumably used when the procedure is dumped — confirm against the caller).
#[derive(Debug, Serialize)]
pub struct RegionMigrationData<'a> {
    /// The persisted context of the migration.
    persistent_ctx: &'a PersistentContext,
    /// The current state of the procedure.
    state: &'a dyn State,
}
790
/// The region migration procedure: the current [State] plus the [Context] it runs against.
pub(crate) struct RegionMigrationProcedure {
    /// The current state of the procedure.
    state: Box<dyn State>,
    /// The execution context shared by all states.
    context: Context,
    /// Guards kept alive for as long as the procedure exists; they are never read
    /// (presumably they release the tracked running procedures on drop — confirm in `manager`).
    _guards: Vec<RegionMigrationProcedureGuard>,
}
796
797impl RegionMigrationProcedure {
798    const TYPE_NAME: &'static str = "metasrv-procedure::RegionMigration";
799
800    pub fn new(
801        persistent_context: PersistentContext,
802        context_factory: impl ContextFactory,
803        guards: Vec<RegionMigrationProcedureGuard>,
804    ) -> Self {
805        let state = Box::new(RegionMigrationStart {});
806        Self::new_inner(state, persistent_context, context_factory, guards)
807    }
808
809    fn new_inner(
810        state: Box<dyn State>,
811        persistent_context: PersistentContext,
812        context_factory: impl ContextFactory,
813        guards: Vec<RegionMigrationProcedureGuard>,
814    ) -> Self {
815        Self {
816            state,
817            context: context_factory.new_context(persistent_context),
818            _guards: guards,
819        }
820    }
821
822    fn from_json(
823        json: &str,
824        context_factory: impl ContextFactory,
825        tracker: RegionMigrationProcedureTracker,
826    ) -> ProcedureResult<Self> {
827        let RegionMigrationDataOwned {
828            persistent_ctx,
829            state,
830        } = serde_json::from_str(json).context(FromJsonSnafu)?;
831        let guards = persistent_ctx
832            .region_ids
833            .iter()
834            .flat_map(|region_id| {
835                tracker.insert_running_procedure(&RegionMigrationProcedureTask {
836                    region_id: *region_id,
837                    from_peer: persistent_ctx.from_peer.clone(),
838                    to_peer: persistent_ctx.to_peer.clone(),
839                    timeout: persistent_ctx.timeout,
840                    trigger_reason: persistent_ctx.trigger_reason,
841                })
842            })
843            .collect::<Vec<_>>();
844
845        let context = context_factory.new_context(persistent_ctx);
846
847        Ok(Self {
848            state,
849            context,
850            _guards: guards,
851        })
852    }
853
854    async fn rollback_inner(&mut self, procedure_ctx: &ProcedureContext) -> Result<()> {
855        let _timer = METRIC_META_REGION_MIGRATION_EXECUTE
856            .with_label_values(&["rollback"])
857            .start_timer();
858        let ctx = &self.context;
859        let table_regions = ctx.persistent_ctx.table_regions();
860        for (table_id, regions) in table_regions {
861            let table_lock = TableLock::Write(table_id).into();
862            let _guard = procedure_ctx.provider.acquire_lock(&table_lock).await;
863            let table_route = ctx.get_table_route_value(table_id).await?;
864            let region_routes = table_route.region_routes().unwrap();
865            let downgraded = region_routes
866                .iter()
867                .filter(|route| regions.contains(&route.region.id))
868                .any(|route| route.is_leader_downgrading());
869            if downgraded {
870                info!(
871                    "Rollbacking downgraded region leader table route, table: {table_id}, regions: {regions:?}"
872                );
873                let table_metadata_manager = &ctx.table_metadata_manager;
874                table_metadata_manager
875                    .update_leader_region_status(table_id, &table_route, |route| {
876                        if regions.contains(&route.region.id) {
877                            Some(None)
878                        } else {
879                            None
880                        }
881                    })
882                    .await
883                    .context(error::TableMetadataManagerSnafu)
884                    .map_err(BoxedError::new)
885                    .with_context(|_| error::RetryLaterWithSourceSnafu {
886                        reason: format!("Failed to update the table route during the rollback downgraded leader region: {regions:?}"),
887                    })?;
888            }
889        }
890        self.context
891            .deregister_failure_detectors_for_candidate_regions()
892            .await;
893        self.context.register_failure_detectors().await;
894
895        Ok(())
896    }
897}
898
899#[async_trait::async_trait]
900impl Procedure for RegionMigrationProcedure {
901    fn type_name(&self) -> &str {
902        Self::TYPE_NAME
903    }
904
905    async fn rollback(&mut self, ctx: &ProcedureContext) -> ProcedureResult<()> {
906        self.rollback_inner(ctx)
907            .await
908            .map_err(ProcedureError::external)
909    }
910
911    fn rollback_supported(&self) -> bool {
912        true
913    }
914
915    #[tracing::instrument(skip_all, fields(
916        state = %self.state.name(),
917        region_count = self.context.persistent_ctx.region_ids.len(),
918        from_peer = self.context.persistent_ctx.from_peer.id,
919        to_peer = self.context.persistent_ctx.to_peer.id,
920    ))]
921    async fn execute(&mut self, ctx: &ProcedureContext) -> ProcedureResult<Status> {
922        let state = &mut self.state;
923
924        let name = state.name();
925        let _timer = METRIC_META_REGION_MIGRATION_EXECUTE
926            .with_label_values(&[name])
927            .start_timer();
928        match state.next(&mut self.context, ctx).await {
929            Ok((next, status)) => {
930                *state = next;
931                Ok(status)
932            }
933            Err(e) => {
934                if e.is_retryable() {
935                    METRIC_META_REGION_MIGRATION_ERROR
936                        .with_label_values(&[name, "retryable"])
937                        .inc();
938                    Err(ProcedureError::retry_later(e))
939                } else {
940                    // Consumes the opening region guard before deregistering the failure detectors.
941                    self.context.volatile_ctx.opening_region_guards.clear();
942                    self.context
943                        .deregister_failure_detectors_for_candidate_regions()
944                        .await;
945                    error!(
946                        e;
947                        "Region migration procedure failed, regions: {:?}, from_peer: {}, to_peer: {}, {}",
948                        self.context.persistent_ctx.region_ids,
949                        self.context.persistent_ctx.from_peer,
950                        self.context.persistent_ctx.to_peer,
951                        self.context.volatile_ctx.metrics,
952                    );
953                    METRIC_META_REGION_MIGRATION_ERROR
954                        .with_label_values(&[name, "external"])
955                        .inc();
956                    Err(ProcedureError::external(e))
957                }
958            }
959        }
960    }
961
962    fn dump(&self) -> ProcedureResult<String> {
963        let data = RegionMigrationData {
964            state: self.state.as_ref(),
965            persistent_ctx: &self.context.persistent_ctx,
966        };
967        serde_json::to_string(&data).context(ToJsonSnafu)
968    }
969
970    fn lock_key(&self) -> LockKey {
971        LockKey::new(self.context.persistent_ctx.lock_key())
972    }
973
974    fn user_metadata(&self) -> Option<UserMetadata> {
975        Some(UserMetadata::new(Arc::new(self.context.persistent_ctx())))
976    }
977}
978
979#[cfg(test)]
980mod tests {
981    use std::assert_matches;
982    use std::sync::Arc;
983
984    use common_meta::distributed_time_constants::default_distributed_time_constants;
985    use common_meta::instruction::Instruction;
986    use common_meta::key::test_utils::new_test_table_info;
987    use common_meta::rpc::router::{Region, RegionRoute};
988
989    use super::*;
990    use crate::handler::HeartbeatMailbox;
991    use crate::procedure::region_migration::open_candidate_region::OpenCandidateRegion;
992    use crate::procedure::region_migration::test_util::*;
993    use crate::procedure::test_util::{
994        new_downgrade_region_reply, new_flush_region_reply_for_region, new_open_region_reply,
995        new_upgrade_region_reply,
996    };
997    use crate::service::mailbox::Channel;
998
999    fn new_persistent_context() -> PersistentContext {
1000        test_util::new_persistent_context(1, 2, RegionId::new(1024, 1))
1001    }
1002
1003    #[test]
1004    fn test_lock_key() {
1005        let persistent_context = new_persistent_context();
1006        let expected_keys = persistent_context.lock_key();
1007
1008        let env = TestingEnv::new();
1009        let context = env.context_factory();
1010
1011        let procedure = RegionMigrationProcedure::new(persistent_context, context, vec![]);
1012
1013        let key = procedure.lock_key();
1014        let keys = key.keys_to_lock().cloned().collect::<Vec<_>>();
1015
1016        for key in expected_keys {
1017            assert!(keys.contains(&key));
1018        }
1019    }
1020
1021    #[test]
1022    fn test_data_serialization() {
1023        let persistent_context = new_persistent_context();
1024
1025        let env = TestingEnv::new();
1026        let context = env.context_factory();
1027
1028        let procedure = RegionMigrationProcedure::new(persistent_context, context, vec![]);
1029
1030        let serialized = procedure.dump().unwrap();
1031        let expected = r#"{"persistent_ctx":{"catalog_and_schema":[["greptime","public"]],"from_peer":{"id":1,"addr":""},"to_peer":{"id":2,"addr":""},"region_ids":[4398046511105],"timeout":"10s","trigger_reason":"Unknown"},"state":{"region_migration_state":"RegionMigrationStart"}}"#;
1032        assert_eq!(expected, serialized);
1033    }
1034
1035    #[test]
1036    fn test_backward_compatibility() {
1037        let persistent_ctx = PersistentContext {
1038            #[allow(deprecated)]
1039            catalog: Some("greptime".into()),
1040            #[allow(deprecated)]
1041            schema: Some("public".into()),
1042            catalog_and_schema: vec![],
1043            from_peer: Peer::empty(1),
1044            to_peer: Peer::empty(2),
1045            region_ids: vec![RegionId::new(1024, 1)],
1046            timeout: Duration::from_secs(10),
1047            trigger_reason: RegionMigrationTriggerReason::default(),
1048        };
1049        // NOTES: Changes it will break backward compatibility.
1050        let serialized = r#"{"catalog":"greptime","schema":"public","from_peer":{"id":1,"addr":""},"to_peer":{"id":2,"addr":""},"region_id":4398046511105}"#;
1051        let deserialized: PersistentContext = serde_json::from_str(serialized).unwrap();
1052
1053        assert_eq!(persistent_ctx, deserialized);
1054    }
1055
    /// A test-only [State] whose `next` always yields another [MockState] and a done
    /// [Status].
    #[derive(Debug, Serialize, Deserialize, Default)]
    pub struct MockState;
1058
1059    #[async_trait::async_trait]
1060    #[typetag::serde]
1061    impl State for MockState {
1062        async fn next(
1063            &mut self,
1064            _ctx: &mut Context,
1065            _procedure_ctx: &ProcedureContext,
1066        ) -> Result<(Box<dyn State>, Status)> {
1067            Ok((Box::new(MockState), Status::done()))
1068        }
1069
1070        fn as_any(&self) -> &dyn Any {
1071            self
1072        }
1073    }
1074
1075    #[tokio::test]
1076    async fn test_execution_after_deserialized() {
1077        let env = TestingEnv::new();
1078
1079        fn new_mock_procedure(env: &TestingEnv) -> RegionMigrationProcedure {
1080            let persistent_context = new_persistent_context();
1081            let context_factory = env.context_factory();
1082            let state = Box::<MockState>::default();
1083            RegionMigrationProcedure::new_inner(state, persistent_context, context_factory, vec![])
1084        }
1085
1086        let ctx = TestingEnv::procedure_context();
1087        let mut procedure = new_mock_procedure(&env);
1088        let mut status = None;
1089        for _ in 0..3 {
1090            status = Some(procedure.execute(&ctx).await.unwrap());
1091        }
1092        assert!(status.unwrap().is_done());
1093
1094        let ctx = TestingEnv::procedure_context();
1095        let mut procedure = new_mock_procedure(&env);
1096
1097        status = Some(procedure.execute(&ctx).await.unwrap());
1098
1099        let serialized = procedure.dump().unwrap();
1100
1101        let context_factory = env.context_factory();
1102        let tracker = env.tracker();
1103        let mut procedure =
1104            RegionMigrationProcedure::from_json(&serialized, context_factory, tracker.clone())
1105                .unwrap();
1106        for region_id in &procedure.context.persistent_ctx.region_ids {
1107            assert!(tracker.contains(*region_id));
1108        }
1109
1110        for _ in 1..3 {
1111            status = Some(procedure.execute(&ctx).await.unwrap());
1112        }
1113        assert!(status.unwrap().is_done());
1114    }
1115
1116    #[tokio::test]
1117    async fn test_broadcast_invalidate_table_cache() {
1118        let mut env = TestingEnv::new();
1119        let persistent_context = test_util::new_persistent_context(1, 2, RegionId::new(1024, 1));
1120        let ctx = env.context_factory().new_context(persistent_context);
1121        let mailbox_ctx = env.mailbox_context();
1122
1123        // No receivers.
1124        ctx.invalidate_table_cache().await.unwrap();
1125
1126        let (tx, mut rx) = tokio::sync::mpsc::channel(1);
1127
1128        mailbox_ctx
1129            .insert_heartbeat_response_receiver(Channel::Frontend(1), tx)
1130            .await;
1131
1132        ctx.invalidate_table_cache().await.unwrap();
1133
1134        let resp = rx.recv().await.unwrap().unwrap();
1135        let msg = resp.mailbox_message.unwrap();
1136
1137        let instruction = HeartbeatMailbox::json_instruction(&msg).unwrap();
1138        assert_eq!(
1139            instruction,
1140            Instruction::InvalidateCaches(vec![CacheIdent::TableId(1024)])
1141        );
1142    }
1143
1144    fn procedure_flow_steps(from_peer_id: u64, to_peer_id: u64) -> Vec<Step> {
1145        vec![
1146            // MigrationStart
1147            Step::next(
1148                "Should be the open candidate region",
1149                None,
1150                Assertion::simple(assert_open_candidate_region, assert_need_persist),
1151            ),
1152            // OpenCandidateRegion
1153            Step::next(
1154                "Should be the flush leader region",
1155                Some(mock_datanode_reply(
1156                    to_peer_id,
1157                    Arc::new(|id| Ok(new_open_region_reply(id, true, None))),
1158                )),
1159                Assertion::simple(assert_flush_leader_region, assert_no_persist),
1160            ),
1161            // Flush Leader Region
1162            Step::next(
1163                "Should be the flush leader region",
1164                Some(mock_datanode_reply(
1165                    from_peer_id,
1166                    Arc::new(move |id| {
1167                        Ok(new_flush_region_reply_for_region(
1168                            id,
1169                            RegionId::new(1024, 1),
1170                            true,
1171                            None,
1172                        ))
1173                    }),
1174                )),
1175                Assertion::simple(assert_update_metadata_downgrade, assert_no_persist),
1176            ),
1177            // UpdateMetadata::Downgrade
1178            Step::next(
1179                "Should be the downgrade leader region",
1180                None,
1181                Assertion::simple(assert_downgrade_leader_region, assert_no_persist),
1182            ),
1183            // Downgrade Candidate
1184            Step::next(
1185                "Should be the upgrade candidate region",
1186                Some(mock_datanode_reply(
1187                    from_peer_id,
1188                    Arc::new(|id| Ok(new_downgrade_region_reply(id, None, true, None))),
1189                )),
1190                Assertion::simple(assert_upgrade_candidate_region, assert_no_persist),
1191            ),
1192            // Upgrade Candidate
1193            Step::next(
1194                "Should be the update metadata for upgrading",
1195                Some(mock_datanode_reply(
1196                    to_peer_id,
1197                    Arc::new(|id| Ok(new_upgrade_region_reply(id, true, true, None))),
1198                )),
1199                Assertion::simple(assert_update_metadata_upgrade, assert_no_persist),
1200            ),
1201            // UpdateMetadata::Upgrade
1202            Step::next(
1203                "Should be the close downgraded region",
1204                None,
1205                Assertion::simple(assert_close_downgraded_region, assert_no_persist),
1206            ),
1207            // CloseDowngradedRegion
1208            Step::next(
1209                "Should be the region migration end",
1210                None,
1211                Assertion::simple(assert_region_migration_end, assert_done),
1212            ),
1213            // RegionMigrationEnd
1214            Step::next(
1215                "Should be the region migration end again",
1216                None,
1217                Assertion::simple(assert_region_migration_end, assert_done),
1218            ),
1219        ]
1220    }
1221
1222    #[tokio::test]
1223    async fn test_procedure_flow() {
1224        common_telemetry::init_default_ut_logging();
1225
1226        let persistent_context = test_util::new_persistent_context(1, 2, RegionId::new(1024, 1));
1227        let state = Box::new(RegionMigrationStart);
1228
1229        // The table metadata.
1230        let from_peer_id = persistent_context.from_peer.id;
1231        let to_peer_id = persistent_context.to_peer.id;
1232        let from_peer = persistent_context.from_peer.clone();
1233        let to_peer = persistent_context.to_peer.clone();
1234        let region_id = persistent_context.region_ids[0];
1235        let table_info = new_test_table_info(1024);
1236        let region_routes = vec![RegionRoute {
1237            region: Region::new_test(region_id),
1238            leader_peer: Some(from_peer),
1239            follower_peers: vec![to_peer],
1240            ..Default::default()
1241        }];
1242
1243        let suite = ProcedureMigrationTestSuite::new(persistent_context, state);
1244        suite.init_table_metadata(table_info, region_routes).await;
1245
1246        let steps = procedure_flow_steps(from_peer_id, to_peer_id);
1247        let timer = Instant::now();
1248
1249        // Run the table tests.
1250        let runner = ProcedureMigrationSuiteRunner::new(suite)
1251            .steps(steps)
1252            .run_once()
1253            .await;
1254
1255        let region_lease = default_distributed_time_constants().region_lease.as_secs();
1256
1257        // Ensure it didn't run into the slow path.
1258        assert!(timer.elapsed().as_secs() < region_lease / 2);
1259
1260        runner.suite.verify_table_metadata().await;
1261    }
1262
1263    #[tokio::test]
1264    async fn test_procedure_flow_open_candidate_region_retryable_error() {
1265        common_telemetry::init_default_ut_logging();
1266
1267        let persistent_context = test_util::new_persistent_context(1, 2, RegionId::new(1024, 1));
1268        let state = Box::new(RegionMigrationStart);
1269
1270        // The table metadata.
1271        let to_peer_id = persistent_context.to_peer.id;
1272        let from_peer = persistent_context.from_peer.clone();
1273        let region_id = persistent_context.region_ids[0];
1274        let table_info = new_test_table_info(1024);
1275        let region_routes = vec![RegionRoute {
1276            region: Region::new_test(region_id),
1277            leader_peer: Some(from_peer),
1278            follower_peers: vec![],
1279            ..Default::default()
1280        }];
1281
1282        let suite = ProcedureMigrationTestSuite::new(persistent_context, state);
1283        suite.init_table_metadata(table_info, region_routes).await;
1284
1285        let steps = vec![
1286            // Migration Start
1287            Step::next(
1288                "Should be the open candidate region",
1289                None,
1290                Assertion::simple(assert_open_candidate_region, assert_need_persist),
1291            ),
1292            // OpenCandidateRegion
1293            Step::next(
1294                "Should be throwing a non-retry error",
1295                Some(mock_datanode_reply(
1296                    to_peer_id,
1297                    Arc::new(|id| error::MailboxTimeoutSnafu { id }.fail()),
1298                )),
1299                Assertion::error(|error| assert!(error.is_retryable())),
1300            ),
1301            // OpenCandidateRegion
1302            Step::next(
1303                "Should be throwing a non-retry error again",
1304                Some(mock_datanode_reply(
1305                    to_peer_id,
1306                    Arc::new(|id| error::MailboxTimeoutSnafu { id }.fail()),
1307                )),
1308                Assertion::error(|error| assert!(error.is_retryable())),
1309            ),
1310        ];
1311
1312        let setup_to_latest_persisted_state = Step::setup(
1313            "Sets state to UpdateMetadata::Downgrade",
1314            merge_before_test_fn(vec![
1315                setup_state(Arc::new(|| Box::new(OpenCandidateRegion))),
1316                Arc::new(reset_volatile_ctx),
1317            ]),
1318        );
1319
1320        let steps = [
1321            steps.clone(),
1322            // Mocks the volatile ctx lost(i.g., Meta leader restarts).
1323            vec![setup_to_latest_persisted_state.clone()],
1324            steps.clone()[1..].to_vec(),
1325            vec![setup_to_latest_persisted_state],
1326            steps.clone()[1..].to_vec(),
1327        ]
1328        .concat();
1329
1330        // Run the table tests.
1331        let runner = ProcedureMigrationSuiteRunner::new(suite)
1332            .steps(steps.clone())
1333            .run_once()
1334            .await;
1335
1336        let table_routes_version = runner
1337            .env()
1338            .table_metadata_manager()
1339            .table_route_manager()
1340            .table_route_storage()
1341            .get(region_id.table_id())
1342            .await
1343            .unwrap()
1344            .unwrap()
1345            .version();
1346        // Should be unchanged.
1347        assert_eq!(table_routes_version.unwrap(), 0);
1348    }
1349
1350    #[tokio::test]
1351    async fn test_procedure_flow_upgrade_candidate_with_retry_and_failed() {
1352        common_telemetry::init_default_ut_logging();
1353
1354        let persistent_context = test_util::new_persistent_context(1, 2, RegionId::new(1024, 1));
1355        let state = Box::new(RegionMigrationStart);
1356
1357        // The table metadata.
1358        let from_peer_id = persistent_context.from_peer.id;
1359        let to_peer_id = persistent_context.to_peer.id;
1360        let from_peer = persistent_context.from_peer.clone();
1361        let region_id = persistent_context.region_ids[0];
1362        let table_info = new_test_table_info(1024);
1363        let region_routes = vec![RegionRoute {
1364            region: Region::new_test(region_id),
1365            leader_peer: Some(from_peer),
1366            follower_peers: vec![],
1367            ..Default::default()
1368        }];
1369
1370        let suite = ProcedureMigrationTestSuite::new(persistent_context, state);
1371        suite.init_table_metadata(table_info, region_routes).await;
1372
1373        let steps = vec![
1374            // MigrationStart
1375            Step::next(
1376                "Should be the open candidate region",
1377                None,
1378                Assertion::simple(assert_open_candidate_region, assert_need_persist),
1379            ),
1380            // OpenCandidateRegion
1381            Step::next(
1382                "Should be the flush leader region",
1383                Some(mock_datanode_reply(
1384                    to_peer_id,
1385                    Arc::new(|id| Ok(new_open_region_reply(id, true, None))),
1386                )),
1387                Assertion::simple(assert_flush_leader_region, assert_no_persist),
1388            ),
1389            // Flush Leader Region
1390            Step::next(
1391                "Should be the flush leader region",
1392                Some(mock_datanode_reply(
1393                    from_peer_id,
1394                    Arc::new(move |id| {
1395                        Ok(new_flush_region_reply_for_region(
1396                            id,
1397                            RegionId::new(1024, 1),
1398                            true,
1399                            None,
1400                        ))
1401                    }),
1402                )),
1403                Assertion::simple(assert_update_metadata_downgrade, assert_no_persist),
1404            ),
1405            // UpdateMetadata::Downgrade
1406            Step::next(
1407                "Should be the downgrade leader region",
1408                None,
1409                Assertion::simple(assert_downgrade_leader_region, assert_no_persist),
1410            ),
1411            // Downgrade Candidate
1412            Step::next(
1413                "Should be the upgrade candidate region",
1414                Some(mock_datanode_reply(
1415                    from_peer_id,
1416                    Arc::new(|id| Ok(new_downgrade_region_reply(id, None, true, None))),
1417                )),
1418                Assertion::simple(assert_upgrade_candidate_region, assert_no_persist),
1419            ),
1420            // Upgrade Candidate
1421            Step::next(
1422                "Should be the rollback metadata",
1423                Some(mock_datanode_reply(
1424                    to_peer_id,
1425                    Arc::new(|id| error::MailboxTimeoutSnafu { id }.fail()),
1426                )),
1427                Assertion::simple(assert_update_metadata_rollback, assert_no_persist),
1428            ),
1429            // UpdateMetadata::Rollback
1430            Step::next(
1431                "Should be the region migration abort",
1432                None,
1433                Assertion::simple(assert_region_migration_abort, assert_no_persist),
1434            ),
1435            // RegionMigrationAbort
1436            Step::next(
1437                "Should throw an error",
1438                None,
1439                Assertion::error(|error| {
1440                    assert!(!error.is_retryable());
1441                    assert_matches!(error, error::Error::MigrationAbort { .. });
1442                }),
1443            ),
1444        ];
1445
1446        let setup_to_latest_persisted_state = Step::setup(
1447            "Sets state to OpenCandidateRegion",
1448            merge_before_test_fn(vec![
1449                setup_state(Arc::new(|| Box::new(OpenCandidateRegion))),
1450                Arc::new(reset_volatile_ctx),
1451            ]),
1452        );
1453
1454        let steps = [
1455            steps.clone(),
1456            vec![setup_to_latest_persisted_state.clone()],
1457            steps.clone()[1..].to_vec(),
1458            vec![setup_to_latest_persisted_state],
1459            steps.clone()[1..].to_vec(),
1460        ]
1461        .concat();
1462
1463        // Run the table tests.
1464        ProcedureMigrationSuiteRunner::new(suite)
1465            .steps(steps.clone())
1466            .run_once()
1467            .await;
1468    }
1469
1470    #[tokio::test]
1471    async fn test_procedure_flow_upgrade_candidate_with_retry() {
1472        common_telemetry::init_default_ut_logging();
1473
1474        let persistent_context = test_util::new_persistent_context(1, 2, RegionId::new(1024, 1));
1475        let state = Box::new(RegionMigrationStart);
1476
1477        // The table metadata.
1478        let to_peer_id = persistent_context.to_peer.id;
1479        let from_peer_id = persistent_context.from_peer.id;
1480        let from_peer = persistent_context.from_peer.clone();
1481        let region_id = persistent_context.region_ids[0];
1482        let table_info = new_test_table_info(1024);
1483        let region_routes = vec![RegionRoute {
1484            region: Region::new_test(region_id),
1485            leader_peer: Some(from_peer),
1486            follower_peers: vec![],
1487            ..Default::default()
1488        }];
1489
1490        let suite = ProcedureMigrationTestSuite::new(persistent_context, state);
1491        suite.init_table_metadata(table_info, region_routes).await;
1492
1493        let steps = vec![
1494            // Migration Start
1495            Step::next(
1496                "Should be the open candidate region",
1497                None,
1498                Assertion::simple(assert_open_candidate_region, assert_need_persist),
1499            ),
1500            // OpenCandidateRegion
1501            Step::next(
1502                "Should be throwing a retryable error",
1503                Some(mock_datanode_reply(
1504                    to_peer_id,
1505                    Arc::new(|id| {
1506                        Ok(new_open_region_reply(
1507                            id,
1508                            false,
1509                            Some("mock retryable open region error".to_string()),
1510                        ))
1511                    }),
1512                )),
1513                Assertion::error(|error| assert!(error.is_retryable(), "err: {error:?}")),
1514            ),
1515            // OpenCandidateRegion
1516            Step::next(
1517                "Should be the update metadata for downgrading",
1518                Some(mock_datanode_reply(
1519                    to_peer_id,
1520                    Arc::new(|id| Ok(new_open_region_reply(id, true, None))),
1521                )),
1522                Assertion::simple(assert_flush_leader_region, assert_no_persist),
1523            ),
1524            // Flush Leader Region
1525            Step::next(
1526                "Should be the flush leader region",
1527                Some(mock_datanode_reply(
1528                    from_peer_id,
1529                    Arc::new(move |id| {
1530                        Ok(new_flush_region_reply_for_region(id, region_id, true, None))
1531                    }),
1532                )),
1533                Assertion::simple(assert_update_metadata_downgrade, assert_no_persist),
1534            ),
1535            // UpdateMetadata::Downgrade
1536            Step::next(
1537                "Should be the downgrade leader region",
1538                None,
1539                Assertion::simple(assert_downgrade_leader_region, assert_no_persist),
1540            ),
1541            // Downgrade Leader
1542            Step::next(
1543                "Should be the upgrade candidate region",
1544                Some(mock_datanode_reply(
1545                    from_peer_id,
1546                    merge_mailbox_messages(vec![
1547                        Arc::new(|id| error::MailboxTimeoutSnafu { id }.fail()),
1548                        Arc::new(|id| Ok(new_downgrade_region_reply(id, None, true, None))),
1549                    ]),
1550                )),
1551                Assertion::simple(assert_upgrade_candidate_region, assert_no_persist),
1552            ),
1553            // Upgrade Candidate
1554            Step::next(
1555                "Should be the update metadata for upgrading",
1556                Some(mock_datanode_reply(
1557                    to_peer_id,
1558                    merge_mailbox_messages(vec![
1559                        Arc::new(|id| error::MailboxTimeoutSnafu { id }.fail()),
1560                        Arc::new(|id| Ok(new_upgrade_region_reply(id, true, true, None))),
1561                    ]),
1562                )),
1563                Assertion::simple(assert_update_metadata_upgrade, assert_no_persist),
1564            ),
1565            // UpdateMetadata::Upgrade
1566            Step::next(
1567                "Should be the close downgraded region",
1568                None,
1569                Assertion::simple(assert_close_downgraded_region, assert_no_persist),
1570            ),
1571            // CloseDowngradedRegion
1572            Step::next(
1573                "Should be the region migration end",
1574                None,
1575                Assertion::simple(assert_region_migration_end, assert_done),
1576            ),
1577            // RegionMigrationEnd
1578            Step::next(
1579                "Should be the region migration end again",
1580                None,
1581                Assertion::simple(assert_region_migration_end, assert_done),
1582            ),
1583            // RegionMigrationStart
1584            Step::setup(
1585                "Sets state to RegionMigrationStart",
1586                merge_before_test_fn(vec![
1587                    setup_state(Arc::new(|| Box::new(RegionMigrationStart))),
1588                    Arc::new(reset_volatile_ctx),
1589                ]),
1590            ),
1591            // RegionMigrationEnd
1592            // Note: We can't run this test multiple times;
1593            // the `peer_id`'s `DatanodeTable` will be removed after first-time migration success.
1594            Step::next(
1595                "Should be the region migration end(has been migrated)",
1596                None,
1597                Assertion::simple(assert_region_migration_end, assert_done),
1598            ),
1599        ];
1600
1601        let steps = [steps.clone()].concat();
1602        let timer = Instant::now();
1603
1604        // Run the table tests.
1605        let runner = ProcedureMigrationSuiteRunner::new(suite)
1606            .steps(steps.clone())
1607            .run_once()
1608            .await;
1609
1610        let region_lease = default_distributed_time_constants().region_lease.as_secs();
1611        // Ensure it didn't run into the slow path.
1612        assert!(timer.elapsed().as_secs() < region_lease);
1613        runner.suite.verify_table_metadata().await;
1614    }
1615}