meta_srv/procedure/region_migration.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

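//! Region migration procedure: migrates a region's leadership from a source
//! datanode (`from_peer`) to a destination datanode (`to_peer`) through a
//! sequence of states: open candidate region, flush leader region, downgrade
//! leader region, upgrade candidate region, and close the downgraded region.
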
pub(crate) mod close_downgraded_region;
pub(crate) mod downgrade_leader_region;
pub(crate) mod flush_leader_region;
pub(crate) mod manager;
pub(crate) mod migration_abort;
pub(crate) mod migration_end;
pub(crate) mod migration_start;
pub(crate) mod open_candidate_region;
#[cfg(test)]
pub mod test_util;
pub(crate) mod update_metadata;
pub(crate) mod upgrade_candidate_region;

use std::any::Any;
use std::collections::{HashMap, HashSet};
use std::fmt::{Debug, Display};
use std::sync::Arc;
use std::time::Duration;

use common_error::ext::BoxedError;
use common_event_recorder::{Event, Eventable};
use common_meta::cache_invalidator::CacheInvalidatorRef;
use common_meta::ddl::RegionFailureDetectorControllerRef;
use common_meta::instruction::CacheIdent;
use common_meta::key::datanode_table::{DatanodeTableKey, DatanodeTableValue};
use common_meta::key::table_route::TableRouteValue;
use common_meta::key::topic_region::{ReplayCheckpoint, TopicRegionKey};
use common_meta::key::{DeserializedValueWithBytes, TableMetadataManagerRef};
use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef};
use common_meta::lock_key::{CatalogLock, RegionLock, SchemaLock, TableLock};
use common_meta::peer::Peer;
use common_meta::region_keeper::{MemoryRegionKeeperRef, OperatingRegionGuard};
use common_procedure::error::{
    Error as ProcedureError, FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu,
};
use common_procedure::{
    Context as ProcedureContext, LockKey, Procedure, Status, StringKey, UserMetadata,
};
use common_telemetry::{error, info};
use manager::RegionMigrationProcedureGuard;
pub use manager::{
    RegionMigrationManagerRef, RegionMigrationProcedureTask, RegionMigrationProcedureTracker,
    RegionMigrationTriggerReason,
};
use serde::{Deserialize, Deserializer, Serialize};
use snafu::{OptionExt, ResultExt};
use store_api::storage::{RegionId, TableId};
use tokio::time::Instant;

use self::migration_start::RegionMigrationStart;
use crate::error::{self, Result};
use crate::events::region_migration_event::RegionMigrationEvent;
use crate::metrics::{
    METRIC_META_REGION_MIGRATION_ERROR, METRIC_META_REGION_MIGRATION_EXECUTE,
    METRIC_META_REGION_MIGRATION_STAGE_ELAPSED,
};
use crate::service::mailbox::MailboxRef;

/// The default timeout for region migration.
pub const DEFAULT_REGION_MIGRATION_TIMEOUT: Duration = Duration::from_secs(120);

#[derive(Debug, Deserialize)]
#[serde(untagged)]
enum SingleOrMultiple<T> {
    Single(T),
    Multiple(Vec<T>),
}

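// Deserializes either a single value or a list of values into a `Vec`.
// Both the legacy `{"region_id": 4398046511105}` form and the current
// `{"region_ids": [4398046511105]}` form are accepted (see
// `test_backward_compatibility` below).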
fn single_or_multiple_from<'de, D, T>(deserializer: D) -> std::result::Result<Vec<T>, D::Error>
where
    D: Deserializer<'de>,
    T: Deserialize<'de>,
{
    let helper = SingleOrMultiple::<T>::deserialize(deserializer)?;
    Ok(match helper {
        SingleOrMultiple::Single(x) => vec![x],
        SingleOrMultiple::Multiple(xs) => xs,
    })
}

/// It's shared across steps and remains available even after recovering.
///
/// It will only be updated/stored after the Red node has succeeded.
///
/// **Note: Storing too much data in the context may incur replication overhead.**
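///
/// A serialized example (from `test_data_serialization`):
/// `{"catalog":"greptime","schema":"public","from_peer":{"id":1,"addr":""},"to_peer":{"id":2,"addr":""},"region_ids":[4398046511105],"timeout":"10s","trigger_reason":"Unknown"}`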
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct PersistentContext {
    /// The table catalog.
    pub(crate) catalog: String,
    /// The table schema.
    pub(crate) schema: String,
    /// The [Peer] of the migration source.
    pub(crate) from_peer: Peer,
    /// The [Peer] of the migration destination.
    pub(crate) to_peer: Peer,
    /// The [RegionId]s of the migrating regions.
    #[serde(deserialize_with = "single_or_multiple_from", alias = "region_id")]
    pub(crate) region_ids: Vec<RegionId>,
    /// The timeout for downgrading leader region and upgrading candidate region operations.
    #[serde(with = "humantime_serde", default = "default_timeout")]
    pub(crate) timeout: Duration,
    /// The trigger reason of region migration.
    #[serde(default)]
    pub(crate) trigger_reason: RegionMigrationTriggerReason,
}

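// Serde default for `PersistentContext::timeout`: procedures persisted before
// the field existed fall back to 10s on deserialization (exercised by
// `test_backward_compatibility`).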
fn default_timeout() -> Duration {
    Duration::from_secs(10)
}

impl PersistentContext {
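    /// Builds the procedure's lock keys: catalog and schema read locks first,
    /// followed by one write lock per region, sorted so concurrent procedures
    /// always acquire region locks in the same order.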
    pub fn lock_key(&self) -> Vec<StringKey> {
        let mut lock_keys = Vec::with_capacity(self.region_ids.len() + 2);
        lock_keys.push(CatalogLock::Read(&self.catalog).into());
        lock_keys.push(SchemaLock::read(&self.catalog, &self.schema).into());
        // Sort the region ids so the locks are always acquired in the same order.
        let mut region_ids = self.region_ids.clone();
        region_ids.sort_unstable();
        for region_id in region_ids {
            lock_keys.push(RegionLock::Write(region_id).into());
        }
        lock_keys
    }

    /// Returns the table ids of the regions.
    ///
    /// The returned table ids are deduplicated.
    pub fn region_table_ids(&self) -> Vec<TableId> {
        self.region_ids
            .iter()
            .map(|region_id| region_id.table_id())
            .collect::<HashSet<_>>()
            .into_iter()
            .collect()
    }

    /// Returns the table regions map.
    ///
    /// The key is the table id, the value is the region ids of the table.
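    ///
    /// For example, regions `[1024/1, 1024/2, 2048/1]` yield
    /// `{1024: [1024/1, 1024/2], 2048: [2048/1]}`.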
    pub fn table_regions(&self) -> HashMap<TableId, Vec<RegionId>> {
        let mut table_regions = HashMap::new();
        for region_id in &self.region_ids {
            table_regions
                .entry(region_id.table_id())
                .or_insert_with(Vec::new)
                .push(*region_id);
        }
        table_regions
    }
}

impl Eventable for PersistentContext {
    fn to_event(&self) -> Option<Box<dyn Event>> {
        Some(Box::new(RegionMigrationEvent::from_persistent_ctx(self)))
    }
}

/// Metrics of region migration.
#[derive(Debug, Clone, Default)]
pub struct Metrics {
    /// Elapsed time of the downgrade and upgrade operations.
    operations_elapsed: Duration,
    /// Elapsed time of flushing the leader region.
    flush_leader_region_elapsed: Duration,
    /// Elapsed time of downgrading the leader region.
    downgrade_leader_region_elapsed: Duration,
    /// Elapsed time of opening the candidate region.
    open_candidate_region_elapsed: Duration,
    /// Elapsed time of upgrading the candidate region.
    upgrade_candidate_region_elapsed: Duration,
}

impl Display for Metrics {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let total = self.flush_leader_region_elapsed
            + self.downgrade_leader_region_elapsed
            + self.open_candidate_region_elapsed
            + self.upgrade_candidate_region_elapsed;
        write!(
            f,
            "total: {:?}, flush_leader_region_elapsed: {:?}, downgrade_leader_region_elapsed: {:?}, open_candidate_region_elapsed: {:?}, upgrade_candidate_region_elapsed: {:?}",
            total,
            self.flush_leader_region_elapsed,
            self.downgrade_leader_region_elapsed,
            self.open_candidate_region_elapsed,
            self.upgrade_candidate_region_elapsed
        )
    }
}

impl Metrics {
    /// Accumulates the elapsed time of the downgrade and upgrade operations.
    pub fn update_operations_elapsed(&mut self, elapsed: Duration) {
        self.operations_elapsed += elapsed;
    }

    /// Accumulates the elapsed time of flushing the leader region.
    pub fn update_flush_leader_region_elapsed(&mut self, elapsed: Duration) {
        self.flush_leader_region_elapsed += elapsed;
    }

    /// Accumulates the elapsed time of downgrading the leader region.
    pub fn update_downgrade_leader_region_elapsed(&mut self, elapsed: Duration) {
        self.downgrade_leader_region_elapsed += elapsed;
    }

    /// Accumulates the elapsed time of opening the candidate region.
    pub fn update_open_candidate_region_elapsed(&mut self, elapsed: Duration) {
        self.open_candidate_region_elapsed += elapsed;
    }

    /// Accumulates the elapsed time of upgrading the candidate region.
    pub fn update_upgrade_candidate_region_elapsed(&mut self, elapsed: Duration) {
        self.upgrade_candidate_region_elapsed += elapsed;
    }
}

impl Drop for Metrics {
    fn drop(&mut self) {
        let total = self.flush_leader_region_elapsed
            + self.downgrade_leader_region_elapsed
            + self.open_candidate_region_elapsed
            + self.upgrade_candidate_region_elapsed;
        METRIC_META_REGION_MIGRATION_STAGE_ELAPSED
            .with_label_values(&["total"])
            .observe(total.as_secs_f64());

        if !self.flush_leader_region_elapsed.is_zero() {
            METRIC_META_REGION_MIGRATION_STAGE_ELAPSED
                .with_label_values(&["flush_leader_region"])
                .observe(self.flush_leader_region_elapsed.as_secs_f64());
        }

        if !self.downgrade_leader_region_elapsed.is_zero() {
            METRIC_META_REGION_MIGRATION_STAGE_ELAPSED
                .with_label_values(&["downgrade_leader_region"])
                .observe(self.downgrade_leader_region_elapsed.as_secs_f64());
        }

        if !self.open_candidate_region_elapsed.is_zero() {
            METRIC_META_REGION_MIGRATION_STAGE_ELAPSED
                .with_label_values(&["open_candidate_region"])
                .observe(self.open_candidate_region_elapsed.as_secs_f64());
        }

        if !self.upgrade_candidate_region_elapsed.is_zero() {
            METRIC_META_REGION_MIGRATION_STAGE_ELAPSED
                .with_label_values(&["upgrade_candidate_region"])
                .observe(self.upgrade_candidate_region_elapsed.as_secs_f64());
        }
    }
}

/// It's shared across steps and available while executing (including retries).
///
/// It will be dropped if the procedure runner crashes.
///
/// Additional remote fetches are only required in the worst case.
#[derive(Debug, Clone, Default)]
pub struct VolatileContext {
    /// `opening_region_guards` will be set after the
    /// [OpenCandidateRegion](crate::procedure::region_migration::open_candidate_region::OpenCandidateRegion) step.
    ///
    /// `opening_region_guards` should be consumed after
    /// the corresponding [RegionRoute](common_meta::rpc::router::RegionRoute) of the opening region
    /// was written into [TableRouteValue](common_meta::key::table_route::TableRouteValue).
    opening_region_guards: Vec<OperatingRegionGuard>,
    /// The deadline of the leader region lease.
    leader_region_lease_deadline: Option<Instant>,
    /// The datanode table values.
    from_peer_datanode_table_values: Option<HashMap<TableId, DatanodeTableValue>>,
    /// The last_entry_ids of leader regions.
    leader_region_last_entry_ids: HashMap<RegionId, u64>,
    /// The last_entry_ids of leader metadata regions (only used for the metric engine).
    leader_region_metadata_last_entry_ids: HashMap<RegionId, u64>,
    /// Metrics of region migration.
    metrics: Metrics,
}

impl VolatileContext {
    /// Sets the `leader_region_lease_deadline` if it is not already set.
    pub fn set_leader_region_lease_deadline(&mut self, lease_timeout: Duration) {
        if self.leader_region_lease_deadline.is_none() {
            self.leader_region_lease_deadline = Some(Instant::now() + lease_timeout);
        }
    }

    /// Resets the `leader_region_lease_deadline`.
    pub fn reset_leader_region_lease_deadline(&mut self) {
        self.leader_region_lease_deadline = None;
    }

    /// Records the `last_entry_id` of the given leader region.
    pub fn set_last_entry_id(&mut self, region_id: RegionId, last_entry_id: u64) {
        self.leader_region_last_entry_ids
            .insert(region_id, last_entry_id);
    }

    /// Records the `last_entry_id` of the given leader metadata region.
    pub fn set_metadata_last_entry_id(&mut self, region_id: RegionId, last_entry_id: u64) {
        self.leader_region_metadata_last_entry_ids
            .insert(region_id, last_entry_id);
    }
}

/// Used to generate new [Context].
pub trait ContextFactory {
    fn new_context(self, persistent_ctx: PersistentContext) -> Context;
}

/// Default implementation.
#[derive(Clone)]
pub struct DefaultContextFactory {
    volatile_ctx: VolatileContext,
    in_memory_key: ResettableKvBackendRef,
    table_metadata_manager: TableMetadataManagerRef,
    opening_region_keeper: MemoryRegionKeeperRef,
    region_failure_detector_controller: RegionFailureDetectorControllerRef,
    mailbox: MailboxRef,
    server_addr: String,
    cache_invalidator: CacheInvalidatorRef,
}

impl DefaultContextFactory {
    /// Returns a new [`DefaultContextFactory`].
    pub fn new(
        in_memory_key: ResettableKvBackendRef,
        table_metadata_manager: TableMetadataManagerRef,
        opening_region_keeper: MemoryRegionKeeperRef,
        region_failure_detector_controller: RegionFailureDetectorControllerRef,
        mailbox: MailboxRef,
        server_addr: String,
        cache_invalidator: CacheInvalidatorRef,
    ) -> Self {
        Self {
            volatile_ctx: VolatileContext::default(),
            in_memory_key,
            table_metadata_manager,
            opening_region_keeper,
            region_failure_detector_controller,
            mailbox,
            server_addr,
            cache_invalidator,
        }
    }
}

impl ContextFactory for DefaultContextFactory {
    fn new_context(self, persistent_ctx: PersistentContext) -> Context {
        Context {
            persistent_ctx,
            volatile_ctx: self.volatile_ctx,
            in_memory: self.in_memory_key,
            table_metadata_manager: self.table_metadata_manager,
            opening_region_keeper: self.opening_region_keeper,
            region_failure_detector_controller: self.region_failure_detector_controller,
            mailbox: self.mailbox,
            server_addr: self.server_addr,
            cache_invalidator: self.cache_invalidator,
        }
    }
}

/// The context of procedure execution.
pub struct Context {
    persistent_ctx: PersistentContext,
    volatile_ctx: VolatileContext,
    in_memory: KvBackendRef,
    table_metadata_manager: TableMetadataManagerRef,
    opening_region_keeper: MemoryRegionKeeperRef,
    region_failure_detector_controller: RegionFailureDetectorControllerRef,
    mailbox: MailboxRef,
    server_addr: String,
    cache_invalidator: CacheInvalidatorRef,
}

impl Context {
    /// Returns the next operation's timeout.
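    ///
    /// Returns `None` once the already-elapsed operation time has consumed the
    /// whole `timeout` budget (e.g. a 10s `timeout` with 12s elapsed).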
    pub fn next_operation_timeout(&self) -> Option<Duration> {
        self.persistent_ctx
            .timeout
            .checked_sub(self.volatile_ctx.metrics.operations_elapsed)
    }

    /// Updates the elapsed time of the downgrade and upgrade operations.
    pub fn update_operations_elapsed(&mut self, instant: Instant) {
        self.volatile_ctx
            .metrics
            .update_operations_elapsed(instant.elapsed());
    }

    /// Updates the elapsed time of flushing the leader region.
    pub fn update_flush_leader_region_elapsed(&mut self, instant: Instant) {
        self.volatile_ctx
            .metrics
            .update_flush_leader_region_elapsed(instant.elapsed());
    }

    /// Updates the elapsed time of downgrading the leader region.
    pub fn update_downgrade_leader_region_elapsed(&mut self, instant: Instant) {
        self.volatile_ctx
            .metrics
            .update_downgrade_leader_region_elapsed(instant.elapsed());
    }

    /// Updates the elapsed time of opening the candidate region.
    pub fn update_open_candidate_region_elapsed(&mut self, instant: Instant) {
        self.volatile_ctx
            .metrics
            .update_open_candidate_region_elapsed(instant.elapsed());
    }

    /// Updates the elapsed time of upgrading the candidate region.
    pub fn update_upgrade_candidate_region_elapsed(&mut self, instant: Instant) {
        self.volatile_ctx
            .metrics
            .update_upgrade_candidate_region_elapsed(instant.elapsed());
    }

    /// Returns the address of the meta server.
    pub fn server_addr(&self) -> &str {
        &self.server_addr
    }

    /// Returns the deduplicated table ids of the regions.
    pub fn region_table_ids(&self) -> Vec<TableId> {
        self.persistent_ctx
            .region_ids
            .iter()
            .map(|region_id| region_id.table_id())
            .collect::<HashSet<_>>()
            .into_iter()
            .collect()
    }

    /// Retrieves the table route values of all tables touched by the migrating regions.
    ///
    /// Retry:
    /// - Failed to retrieve the metadata of a table.
    pub async fn get_table_route_values(
        &self,
    ) -> Result<HashMap<TableId, DeserializedValueWithBytes<TableRouteValue>>> {
        let table_ids = self.persistent_ctx.region_table_ids();
        let table_routes = self
            .table_metadata_manager
            .table_route_manager()
            .table_route_storage()
            .batch_get_with_raw_bytes(&table_ids)
            .await
            .context(error::TableMetadataManagerSnafu)
            .map_err(BoxedError::new)
            .with_context(|_| error::RetryLaterWithSourceSnafu {
                reason: format!("Failed to get table routes: {table_ids:?}"),
            })?;
        let table_routes = table_ids
            .into_iter()
            .zip(table_routes)
            .filter_map(|(table_id, table_route)| {
                table_route.map(|table_route| (table_id, table_route))
            })
            .collect::<HashMap<_, _>>();
        Ok(table_routes)
    }

    /// Retrieves the table route value of the given table.
    ///
    /// Retry:
    /// - Failed to retrieve the metadata of the table.
    pub async fn get_table_route_value(
        &self,
        table_id: TableId,
    ) -> Result<DeserializedValueWithBytes<TableRouteValue>> {
        let table_route_value = self
            .table_metadata_manager
            .table_route_manager()
            .table_route_storage()
            .get_with_raw_bytes(table_id)
            .await
            .context(error::TableMetadataManagerSnafu)
            .map_err(BoxedError::new)
            .with_context(|_| error::RetryLaterWithSourceSnafu {
                reason: format!("Failed to get table route: {table_id}"),
            })?
            .context(error::TableRouteNotFoundSnafu { table_id })?;
        Ok(table_route_value)
    }

    /// Returns the `from_peer_datanode_table_values` of [VolatileContext] if any.
    /// Otherwise, returns the value retrieved from remote.
    ///
    /// Retry:
    /// - Failed to retrieve the metadata of the datanode table.
    pub async fn get_from_peer_datanode_table_values(
        &mut self,
    ) -> Result<&HashMap<TableId, DatanodeTableValue>> {
        let from_peer_datanode_table_values =
            &mut self.volatile_ctx.from_peer_datanode_table_values;
        if from_peer_datanode_table_values.is_none() {
            let table_ids = self.persistent_ctx.region_table_ids();
            let datanode_table_keys = table_ids
                .iter()
                .map(|table_id| DatanodeTableKey {
                    datanode_id: self.persistent_ctx.from_peer.id,
                    table_id: *table_id,
                })
                .collect::<Vec<_>>();
            let datanode_table_values = self
                .table_metadata_manager
                .datanode_table_manager()
                .batch_get(&datanode_table_keys)
                .await
                .context(error::TableMetadataManagerSnafu)
                .map_err(BoxedError::new)
                .with_context(|_| error::RetryLaterWithSourceSnafu {
                    reason: format!("Failed to get DatanodeTable: {table_ids:?}"),
                })?
                .into_iter()
                .map(|(k, v)| (k.table_id, v))
                .collect();
            *from_peer_datanode_table_values = Some(datanode_table_values);
        }
        Ok(from_peer_datanode_table_values.as_ref().unwrap())
    }

    /// Retrieves the `DatanodeTableValue` of the given table on the `from_peer`.
    ///
    /// Retry:
    /// - Failed to retrieve the metadata of the datanode table.
    pub async fn get_from_peer_datanode_table_value(
        &self,
        table_id: TableId,
    ) -> Result<DatanodeTableValue> {
        let datanode_table_value = self
            .table_metadata_manager
            .datanode_table_manager()
            .get(&DatanodeTableKey {
                datanode_id: self.persistent_ctx.from_peer.id,
                table_id,
            })
            .await
            .context(error::TableMetadataManagerSnafu)
            .map_err(BoxedError::new)
            .with_context(|_| error::RetryLaterWithSourceSnafu {
                reason: format!("Failed to get DatanodeTable: {table_id}"),
            })?
            .context(error::DatanodeTableNotFoundSnafu {
                table_id,
                datanode_id: self.persistent_ctx.from_peer.id,
            })?;
        Ok(datanode_table_value)
    }

    /// Notifies the RegionSupervisor to register failure detectors for the failed regions.
    ///
    /// The original failure detectors were removed when the procedure was triggered,
    /// so we need to register failure detectors for the failed regions again.
    pub async fn register_failure_detectors(&self) {
        let datanode_id = self.persistent_ctx.from_peer.id;
        let region_ids = &self.persistent_ctx.region_ids;
        let detecting_regions = region_ids
            .iter()
            .map(|region_id| (datanode_id, *region_id))
            .collect::<Vec<_>>();
        self.region_failure_detector_controller
            .register_failure_detectors(detecting_regions)
            .await;
        info!(
            "Registered failure detectors after migration failures for datanode {}, regions {:?}",
            datanode_id, region_ids
        );
    }

    /// Notifies the RegionSupervisor to deregister failure detectors.
    ///
    /// The original failure detectors won't be removed when the procedure is triggered,
    /// so we need to deregister them for the original regions once the procedure finishes.
    pub async fn deregister_failure_detectors(&self) {
        let datanode_id = self.persistent_ctx.from_peer.id;
        let region_ids = &self.persistent_ctx.region_ids;
        let detecting_regions = region_ids
            .iter()
            .map(|region_id| (datanode_id, *region_id))
            .collect::<Vec<_>>();

        self.region_failure_detector_controller
            .deregister_failure_detectors(detecting_regions)
            .await;
    }

    /// Notifies the RegionSupervisor to deregister failure detectors for the candidate regions on the destination peer.
    ///
    /// The candidate regions may already have been created on the destination peer,
    /// so we need to deregister their failure detectors when the procedure is aborted.
    pub async fn deregister_failure_detectors_for_candidate_region(&self) {
        let to_peer_id = self.persistent_ctx.to_peer.id;
        let region_ids = &self.persistent_ctx.region_ids;
        let detecting_regions = region_ids
            .iter()
            .map(|region_id| (to_peer_id, *region_id))
            .collect::<Vec<_>>();

        self.region_failure_detector_controller
            .deregister_failure_detectors(detecting_regions)
            .await;
    }

    /// Fetches the replay checkpoints for the given topic region keys.
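    ///
    /// Regions without a recorded checkpoint are omitted from the returned map.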
    pub async fn get_replay_checkpoints(
        &self,
        topic_region_keys: Vec<TopicRegionKey<'_>>,
    ) -> Result<HashMap<RegionId, ReplayCheckpoint>> {
        let topic_region_values = self
            .table_metadata_manager
            .topic_region_manager()
            .batch_get(topic_region_keys)
            .await
            .context(error::TableMetadataManagerSnafu)?;

        let replay_checkpoints = topic_region_values
            .into_iter()
            .flat_map(|(key, value)| value.checkpoint.map(|value| (key, value)))
            .collect::<HashMap<_, _>>();

        Ok(replay_checkpoints)
    }

    /// Broadcasts the invalidate table cache message.
    pub async fn invalidate_table_cache(&self) -> Result<()> {
        let table_ids = self.region_table_ids();
        let mut cache_idents = Vec::with_capacity(table_ids.len());
        for table_id in &table_ids {
            cache_idents.push(CacheIdent::TableId(*table_id));
        }
        // Best-effort broadcast: the result is intentionally ignored.
        let ctx = common_meta::cache_invalidator::Context::default();
        let _ = self.cache_invalidator.invalidate(&ctx, &cache_idents).await;
        Ok(())
    }

    /// Returns the [PersistentContext] of the procedure.
    pub fn persistent_ctx(&self) -> PersistentContext {
        self.persistent_ctx.clone()
    }
}

#[async_trait::async_trait]
#[typetag::serde(tag = "region_migration_state")]
pub(crate) trait State: Sync + Send + Debug {
    fn name(&self) -> &'static str {
        let type_name = std::any::type_name::<Self>();
        // Strip the module path, e.g. "…::migration_start::RegionMigrationStart" -> "RegionMigrationStart".
        type_name.split("::").last().unwrap_or(type_name)
    }

    /// Yields the next [State] and [Status].
    async fn next(
        &mut self,
        ctx: &mut Context,
        procedure_ctx: &ProcedureContext,
    ) -> Result<(Box<dyn State>, Status)>;

    /// Returns as [Any](std::any::Any).
    fn as_any(&self) -> &dyn Any;
}

/// Persistent data of [RegionMigrationProcedure].
#[derive(Debug, Serialize, Deserialize)]
pub struct RegionMigrationDataOwned {
    persistent_ctx: PersistentContext,
    state: Box<dyn State>,
}

/// Persistent data of [RegionMigrationProcedure].
#[derive(Debug, Serialize)]
pub struct RegionMigrationData<'a> {
    persistent_ctx: &'a PersistentContext,
    state: &'a dyn State,
}

pub(crate) struct RegionMigrationProcedure {
    state: Box<dyn State>,
    context: Context,
    _guards: Vec<RegionMigrationProcedureGuard>,
}

impl RegionMigrationProcedure {
    const TYPE_NAME: &'static str = "metasrv-procedure::RegionMigration";

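    /// Creates a procedure that starts from the [RegionMigrationStart] state.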
    pub fn new(
        persistent_context: PersistentContext,
        context_factory: impl ContextFactory,
        guards: Vec<RegionMigrationProcedureGuard>,
    ) -> Self {
        let state = Box::new(RegionMigrationStart {});
        Self::new_inner(state, persistent_context, context_factory, guards)
    }

    fn new_inner(
        state: Box<dyn State>,
        persistent_context: PersistentContext,
        context_factory: impl ContextFactory,
        guards: Vec<RegionMigrationProcedureGuard>,
    ) -> Self {
        Self {
            state,
            context: context_factory.new_context(persistent_context),
            _guards: guards,
        }
    }

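    /// Restores a procedure from its persisted JSON, re-registering every
    /// migrating region with the running-procedure tracker.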
    fn from_json(
        json: &str,
        context_factory: impl ContextFactory,
        tracker: RegionMigrationProcedureTracker,
    ) -> ProcedureResult<Self> {
        let RegionMigrationDataOwned {
            persistent_ctx,
            state,
        } = serde_json::from_str(json).context(FromJsonSnafu)?;
        let guards = persistent_ctx
            .region_ids
            .iter()
            .flat_map(|region_id| {
                tracker.insert_running_procedure(&RegionMigrationProcedureTask {
                    region_id: *region_id,
                    from_peer: persistent_ctx.from_peer.clone(),
                    to_peer: persistent_ctx.to_peer.clone(),
                    timeout: persistent_ctx.timeout,
                    trigger_reason: persistent_ctx.trigger_reason,
                })
            })
            .collect::<Vec<_>>();

        let context = context_factory.new_context(persistent_ctx);

        Ok(Self {
            state,
            context,
            _guards: guards,
        })
    }

    async fn rollback_inner(&mut self, procedure_ctx: &ProcedureContext) -> Result<()> {
        let _timer = METRIC_META_REGION_MIGRATION_EXECUTE
            .with_label_values(&["rollback"])
            .start_timer();
        let ctx = &self.context;
        let table_regions = ctx.persistent_ctx.table_regions();
        for (table_id, regions) in table_regions {
            let table_lock = TableLock::Write(table_id).into();
            let _guard = procedure_ctx.provider.acquire_lock(&table_lock).await;
            let table_route = ctx.get_table_route_value(table_id).await?;
            let region_routes = table_route.region_routes().unwrap();
            let downgraded = region_routes
                .iter()
                .filter(|route| regions.contains(&route.region.id))
                .any(|route| route.is_leader_downgrading());
            if downgraded {
                info!(
                    "Rolling back the downgraded leader regions in the table route, table: {table_id}, regions: {regions:?}"
                );
                let table_metadata_manager = &ctx.table_metadata_manager;
                table_metadata_manager
                    .update_leader_region_status(table_id, &table_route, |route| {
                        if regions.contains(&route.region.id) {
                            Some(None)
                        } else {
                            None
                        }
                    })
                    .await
                    .context(error::TableMetadataManagerSnafu)
                    .map_err(BoxedError::new)
                    .with_context(|_| error::RetryLaterWithSourceSnafu {
                        reason: format!("Failed to update the table route while rolling back the downgraded leader regions: {regions:?}"),
                    })?;
            }
        }
        self.context
            .deregister_failure_detectors_for_candidate_region()
            .await;
        self.context.register_failure_detectors().await;

        Ok(())
    }
}

#[async_trait::async_trait]
impl Procedure for RegionMigrationProcedure {
    fn type_name(&self) -> &str {
        Self::TYPE_NAME
    }

    async fn rollback(&mut self, ctx: &ProcedureContext) -> ProcedureResult<()> {
        self.rollback_inner(ctx)
            .await
            .map_err(ProcedureError::external)
    }

    fn rollback_supported(&self) -> bool {
        true
    }

    async fn execute(&mut self, ctx: &ProcedureContext) -> ProcedureResult<Status> {
        let state = &mut self.state;

        let name = state.name();
        let _timer = METRIC_META_REGION_MIGRATION_EXECUTE
            .with_label_values(&[name])
            .start_timer();
        match state.next(&mut self.context, ctx).await {
            Ok((next, status)) => {
                *state = next;
                Ok(status)
            }
            Err(e) => {
                if e.is_retryable() {
                    METRIC_META_REGION_MIGRATION_ERROR
                        .with_label_values(&[name, "retryable"])
                        .inc();
                    Err(ProcedureError::retry_later(e))
                } else {
                    // Drops the opening region guards before deregistering the failure detectors.
                    self.context.volatile_ctx.opening_region_guards.clear();
                    self.context
                        .deregister_failure_detectors_for_candidate_region()
                        .await;
                    error!(
                        e;
                        "Region migration procedure failed, regions: {:?}, from_peer: {}, to_peer: {}, {}",
                        self.context.persistent_ctx.region_ids,
                        self.context.persistent_ctx.from_peer,
                        self.context.persistent_ctx.to_peer,
                        self.context.volatile_ctx.metrics,
                    );
                    METRIC_META_REGION_MIGRATION_ERROR
                        .with_label_values(&[name, "external"])
                        .inc();
                    Err(ProcedureError::external(e))
                }
            }
        }
    }

    fn dump(&self) -> ProcedureResult<String> {
        let data = RegionMigrationData {
            state: self.state.as_ref(),
            persistent_ctx: &self.context.persistent_ctx,
        };
        serde_json::to_string(&data).context(ToJsonSnafu)
    }

    fn lock_key(&self) -> LockKey {
        LockKey::new(self.context.persistent_ctx.lock_key())
    }

    fn user_metadata(&self) -> Option<UserMetadata> {
        Some(UserMetadata::new(Arc::new(self.context.persistent_ctx())))
    }
}

#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;
    use std::sync::Arc;

    use common_meta::distributed_time_constants::REGION_LEASE_SECS;
    use common_meta::instruction::Instruction;
    use common_meta::key::test_utils::new_test_table_info;
    use common_meta::rpc::router::{Region, RegionRoute};

    use super::*;
    use crate::handler::HeartbeatMailbox;
    use crate::procedure::region_migration::open_candidate_region::OpenCandidateRegion;
    use crate::procedure::region_migration::test_util::*;
    use crate::procedure::test_util::{
        new_downgrade_region_reply, new_flush_region_reply_for_region, new_open_region_reply,
        new_upgrade_region_reply,
    };
    use crate::service::mailbox::Channel;

    fn new_persistent_context() -> PersistentContext {
        test_util::new_persistent_context(1, 2, RegionId::new(1024, 1))
    }

    #[test]
    fn test_lock_key() {
        let persistent_context = new_persistent_context();
        let expected_keys = persistent_context.lock_key();

        let env = TestingEnv::new();
        let context = env.context_factory();

        let procedure = RegionMigrationProcedure::new(persistent_context, context, vec![]);

        let key = procedure.lock_key();
        let keys = key.keys_to_lock().cloned().collect::<Vec<_>>();

        for key in expected_keys {
            assert!(keys.contains(&key));
        }
    }

    #[test]
    fn test_data_serialization() {
        let persistent_context = new_persistent_context();

        let env = TestingEnv::new();
        let context = env.context_factory();

        let procedure = RegionMigrationProcedure::new(persistent_context, context, vec![]);

        let serialized = procedure.dump().unwrap();
        let expected = r#"{"persistent_ctx":{"catalog":"greptime","schema":"public","from_peer":{"id":1,"addr":""},"to_peer":{"id":2,"addr":""},"region_ids":[4398046511105],"timeout":"10s","trigger_reason":"Unknown"},"state":{"region_migration_state":"RegionMigrationStart"}}"#;
        assert_eq!(expected, serialized);
    }

    #[test]
    fn test_backward_compatibility() {
        let persistent_ctx = test_util::new_persistent_context(1, 2, RegionId::new(1024, 1));
        // NOTE: Changing this will break backward compatibility.
        let serialized = r#"{"catalog":"greptime","schema":"public","from_peer":{"id":1,"addr":""},"to_peer":{"id":2,"addr":""},"region_id":4398046511105}"#;
        let deserialized: PersistentContext = serde_json::from_str(serialized).unwrap();

        assert_eq!(persistent_ctx, deserialized);
    }

    #[derive(Debug, Serialize, Deserialize, Default)]
    pub struct MockState;

    #[async_trait::async_trait]
    #[typetag::serde]
    impl State for MockState {
        async fn next(
            &mut self,
            _ctx: &mut Context,
            _procedure_ctx: &ProcedureContext,
        ) -> Result<(Box<dyn State>, Status)> {
            Ok((Box::new(MockState), Status::done()))
        }

        fn as_any(&self) -> &dyn Any {
            self
        }
    }

    #[tokio::test]
    async fn test_execution_after_deserialized() {
        let env = TestingEnv::new();

        fn new_mock_procedure(env: &TestingEnv) -> RegionMigrationProcedure {
            let persistent_context = new_persistent_context();
            let context_factory = env.context_factory();
            let state = Box::<MockState>::default();
            RegionMigrationProcedure::new_inner(state, persistent_context, context_factory, vec![])
        }

        let ctx = TestingEnv::procedure_context();
        let mut procedure = new_mock_procedure(&env);
        let mut status = None;
        for _ in 0..3 {
            status = Some(procedure.execute(&ctx).await.unwrap());
        }
        assert!(status.unwrap().is_done());

        let ctx = TestingEnv::procedure_context();
        let mut procedure = new_mock_procedure(&env);

        status = Some(procedure.execute(&ctx).await.unwrap());

        let serialized = procedure.dump().unwrap();

        let context_factory = env.context_factory();
        let tracker = env.tracker();
        let mut procedure =
            RegionMigrationProcedure::from_json(&serialized, context_factory, tracker.clone())
                .unwrap();
        for region_id in &procedure.context.persistent_ctx.region_ids {
            assert!(tracker.contains(*region_id));
        }

        for _ in 1..3 {
            status = Some(procedure.execute(&ctx).await.unwrap());
        }
        assert!(status.unwrap().is_done());
    }

    #[tokio::test]
    async fn test_broadcast_invalidate_table_cache() {
        let mut env = TestingEnv::new();
        let persistent_context = test_util::new_persistent_context(1, 2, RegionId::new(1024, 1));
        let ctx = env.context_factory().new_context(persistent_context);
        let mailbox_ctx = env.mailbox_context();

        // No receivers.
        ctx.invalidate_table_cache().await.unwrap();

        let (tx, mut rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Frontend(1), tx)
            .await;

        ctx.invalidate_table_cache().await.unwrap();

        let resp = rx.recv().await.unwrap().unwrap();
        let msg = resp.mailbox_message.unwrap();

        let instruction = HeartbeatMailbox::json_instruction(&msg).unwrap();
        assert_eq!(
            instruction,
            Instruction::InvalidateCaches(vec![CacheIdent::TableId(1024)])
        );
    }

    fn procedure_flow_steps(from_peer_id: u64, to_peer_id: u64) -> Vec<Step> {
        vec![
            // MigrationStart
            Step::next(
                "Should be the open candidate region",
                None,
                Assertion::simple(assert_open_candidate_region, assert_need_persist),
            ),
            // OpenCandidateRegion
            Step::next(
                "Should be the flush leader region",
                Some(mock_datanode_reply(
                    to_peer_id,
                    Arc::new(|id| Ok(new_open_region_reply(id, true, None))),
                )),
                Assertion::simple(assert_flush_leader_region, assert_no_persist),
            ),
            // Flush Leader Region
            Step::next(
                "Should be the update metadata for downgrading",
                Some(mock_datanode_reply(
                    from_peer_id,
                    Arc::new(move |id| {
                        Ok(new_flush_region_reply_for_region(
                            id,
                            RegionId::new(1024, 1),
                            true,
                            None,
                        ))
                    }),
                )),
                Assertion::simple(assert_update_metadata_downgrade, assert_no_persist),
            ),
            // UpdateMetadata::Downgrade
            Step::next(
                "Should be the downgrade leader region",
                None,
                Assertion::simple(assert_downgrade_leader_region, assert_no_persist),
            ),
            // Downgrade Leader
            Step::next(
                "Should be the upgrade candidate region",
                Some(mock_datanode_reply(
                    from_peer_id,
                    Arc::new(|id| Ok(new_downgrade_region_reply(id, None, true, None))),
                )),
                Assertion::simple(assert_upgrade_candidate_region, assert_no_persist),
            ),
            // Upgrade Candidate
            Step::next(
                "Should be the update metadata for upgrading",
                Some(mock_datanode_reply(
                    to_peer_id,
                    Arc::new(|id| Ok(new_upgrade_region_reply(id, true, true, None))),
                )),
                Assertion::simple(assert_update_metadata_upgrade, assert_no_persist),
            ),
            // UpdateMetadata::Upgrade
            Step::next(
                "Should be the close downgraded region",
                None,
                Assertion::simple(assert_close_downgraded_region, assert_no_persist),
            ),
            // CloseDowngradedRegion
            Step::next(
                "Should be the region migration end",
                None,
                Assertion::simple(assert_region_migration_end, assert_done),
            ),
            // RegionMigrationEnd
            Step::next(
                "Should be the region migration end again",
                None,
                Assertion::simple(assert_region_migration_end, assert_done),
            ),
        ]
    }

    #[tokio::test]
    async fn test_procedure_flow() {
        common_telemetry::init_default_ut_logging();

        let persistent_context = test_util::new_persistent_context(1, 2, RegionId::new(1024, 1));
        let state = Box::new(RegionMigrationStart);

        // The table metadata.
        let from_peer_id = persistent_context.from_peer.id;
        let to_peer_id = persistent_context.to_peer.id;
        let from_peer = persistent_context.from_peer.clone();
        let to_peer = persistent_context.to_peer.clone();
        let region_id = persistent_context.region_ids[0];
        let table_info = new_test_table_info(1024, vec![1]).into();
        let region_routes = vec![RegionRoute {
            region: Region::new_test(region_id),
            leader_peer: Some(from_peer),
            follower_peers: vec![to_peer],
            ..Default::default()
        }];

        let suite = ProcedureMigrationTestSuite::new(persistent_context, state);
        suite.init_table_metadata(table_info, region_routes).await;

        let steps = procedure_flow_steps(from_peer_id, to_peer_id);
        let timer = Instant::now();

        // Run the table tests.
        let runner = ProcedureMigrationSuiteRunner::new(suite)
            .steps(steps)
            .run_once()
            .await;

        // Ensure it didn't run into the slow path.
        assert!(timer.elapsed().as_secs() < REGION_LEASE_SECS / 2);

        runner.suite.verify_table_metadata().await;
    }

    #[tokio::test]
    async fn test_procedure_flow_open_candidate_region_retryable_error() {
        common_telemetry::init_default_ut_logging();

        let persistent_context = test_util::new_persistent_context(1, 2, RegionId::new(1024, 1));
        let state = Box::new(RegionMigrationStart);

        // The table metadata.
        let to_peer_id = persistent_context.to_peer.id;
        let from_peer = persistent_context.from_peer.clone();
        let region_id = persistent_context.region_ids[0];
        let table_info = new_test_table_info(1024, vec![1]).into();
        let region_routes = vec![RegionRoute {
            region: Region::new_test(region_id),
            leader_peer: Some(from_peer),
            follower_peers: vec![],
            ..Default::default()
        }];

        let suite = ProcedureMigrationTestSuite::new(persistent_context, state);
        suite.init_table_metadata(table_info, region_routes).await;

        let steps = vec![
            // MigrationStart
            Step::next(
                "Should be the open candidate region",
                None,
                Assertion::simple(assert_open_candidate_region, assert_need_persist),
            ),
            // OpenCandidateRegion
            Step::next(
                "Should be throwing a retryable error",
                Some(mock_datanode_reply(
                    to_peer_id,
                    Arc::new(|id| error::MailboxTimeoutSnafu { id }.fail()),
                )),
                Assertion::error(|error| assert!(error.is_retryable())),
            ),
            // OpenCandidateRegion
            Step::next(
                "Should be throwing a retryable error again",
                Some(mock_datanode_reply(
                    to_peer_id,
                    Arc::new(|id| error::MailboxTimeoutSnafu { id }.fail()),
                )),
                Assertion::error(|error| assert!(error.is_retryable())),
            ),
        ];

        let setup_to_latest_persisted_state = Step::setup(
            "Sets state to OpenCandidateRegion",
            merge_before_test_fn(vec![
                setup_state(Arc::new(|| Box::new(OpenCandidateRegion))),
                Arc::new(reset_volatile_ctx),
            ]),
        );

        let steps = [
            steps.clone(),
            // Mocks losing the volatile ctx (e.g., when the meta leader restarts).
            vec![setup_to_latest_persisted_state.clone()],
            steps.clone()[1..].to_vec(),
            vec![setup_to_latest_persisted_state],
            steps.clone()[1..].to_vec(),
        ]
        .concat();

        // Run the table tests.
        let runner = ProcedureMigrationSuiteRunner::new(suite)
            .steps(steps.clone())
            .run_once()
            .await;

        let table_routes_version = runner
            .env()
            .table_metadata_manager()
            .table_route_manager()
            .table_route_storage()
            .get(region_id.table_id())
            .await
            .unwrap()
            .unwrap()
            .version();
        // Should be unchanged.
        assert_eq!(table_routes_version.unwrap(), 0);
    }

    #[tokio::test]
    async fn test_procedure_flow_upgrade_candidate_with_retry_and_failed() {
        common_telemetry::init_default_ut_logging();

        let persistent_context = test_util::new_persistent_context(1, 2, RegionId::new(1024, 1));
        let state = Box::new(RegionMigrationStart);

        // The table metadata.
        let from_peer_id = persistent_context.from_peer.id;
        let to_peer_id = persistent_context.to_peer.id;
        let from_peer = persistent_context.from_peer.clone();
        let region_id = persistent_context.region_ids[0];
        let table_info = new_test_table_info(1024, vec![1]).into();
        let region_routes = vec![RegionRoute {
            region: Region::new_test(region_id),
            leader_peer: Some(from_peer),
            follower_peers: vec![],
            ..Default::default()
        }];

        let suite = ProcedureMigrationTestSuite::new(persistent_context, state);
        suite.init_table_metadata(table_info, region_routes).await;

        let steps = vec![
            // MigrationStart
            Step::next(
                "Should be the open candidate region",
                None,
                Assertion::simple(assert_open_candidate_region, assert_need_persist),
            ),
            // OpenCandidateRegion
            Step::next(
                "Should be the flush leader region",
                Some(mock_datanode_reply(
                    to_peer_id,
                    Arc::new(|id| Ok(new_open_region_reply(id, true, None))),
                )),
                Assertion::simple(assert_flush_leader_region, assert_no_persist),
            ),
            // Flush Leader Region
            Step::next(
                "Should be the update metadata for downgrading",
                Some(mock_datanode_reply(
                    from_peer_id,
                    Arc::new(move |id| {
                        Ok(new_flush_region_reply_for_region(
                            id,
                            RegionId::new(1024, 1),
                            true,
                            None,
                        ))
                    }),
                )),
                Assertion::simple(assert_update_metadata_downgrade, assert_no_persist),
            ),
            // UpdateMetadata::Downgrade
            Step::next(
                "Should be the downgrade leader region",
                None,
                Assertion::simple(assert_downgrade_leader_region, assert_no_persist),
            ),
            // Downgrade Leader
            Step::next(
                "Should be the upgrade candidate region",
                Some(mock_datanode_reply(
                    from_peer_id,
                    Arc::new(|id| Ok(new_downgrade_region_reply(id, None, true, None))),
                )),
                Assertion::simple(assert_upgrade_candidate_region, assert_no_persist),
            ),
            // Upgrade Candidate
            Step::next(
                "Should be the rollback metadata",
                Some(mock_datanode_reply(
                    to_peer_id,
                    Arc::new(|id| error::MailboxTimeoutSnafu { id }.fail()),
                )),
                Assertion::simple(assert_update_metadata_rollback, assert_no_persist),
            ),
            // UpdateMetadata::Rollback
            Step::next(
                "Should be the region migration abort",
                None,
                Assertion::simple(assert_region_migration_abort, assert_no_persist),
            ),
            // RegionMigrationAbort
            Step::next(
                "Should throw an error",
                None,
                Assertion::error(|error| {
                    assert!(!error.is_retryable());
                    assert_matches!(error, error::Error::MigrationAbort { .. });
                }),
            ),
        ];

        let setup_to_latest_persisted_state = Step::setup(
            "Sets state to OpenCandidateRegion",
            merge_before_test_fn(vec![
                setup_state(Arc::new(|| Box::new(OpenCandidateRegion))),
                Arc::new(reset_volatile_ctx),
            ]),
        );

        let steps = [
            steps.clone(),
            vec![setup_to_latest_persisted_state.clone()],
            steps.clone()[1..].to_vec(),
            vec![setup_to_latest_persisted_state],
            steps.clone()[1..].to_vec(),
        ]
        .concat();

        // Run the table tests.
        ProcedureMigrationSuiteRunner::new(suite)
            .steps(steps.clone())
            .run_once()
            .await;
    }

    #[tokio::test]
    async fn test_procedure_flow_upgrade_candidate_with_retry() {
        common_telemetry::init_default_ut_logging();

        let persistent_context = test_util::new_persistent_context(1, 2, RegionId::new(1024, 1));
        let state = Box::new(RegionMigrationStart);

        // The table metadata.
        let to_peer_id = persistent_context.to_peer.id;
        let from_peer_id = persistent_context.from_peer.id;
        let from_peer = persistent_context.from_peer.clone();
        let region_id = persistent_context.region_ids[0];
        let table_info = new_test_table_info(1024, vec![1]).into();
        let region_routes = vec![RegionRoute {
            region: Region::new_test(region_id),
            leader_peer: Some(from_peer),
            follower_peers: vec![],
            ..Default::default()
        }];

        let suite = ProcedureMigrationTestSuite::new(persistent_context, state);
        suite.init_table_metadata(table_info, region_routes).await;

        let steps = vec![
            // MigrationStart
            Step::next(
                "Should be the open candidate region",
                None,
                Assertion::simple(assert_open_candidate_region, assert_need_persist),
            ),
            // OpenCandidateRegion
            Step::next(
                "Should be throwing a retryable error",
                Some(mock_datanode_reply(
                    to_peer_id,
                    Arc::new(|id| Ok(new_open_region_reply(id, false, None))),
                )),
                Assertion::error(|error| assert!(error.is_retryable(), "err: {error:?}")),
            ),
            // OpenCandidateRegion
            Step::next(
                "Should be the flush leader region",
                Some(mock_datanode_reply(
                    to_peer_id,
                    Arc::new(|id| Ok(new_open_region_reply(id, true, None))),
                )),
                Assertion::simple(assert_flush_leader_region, assert_no_persist),
            ),
            // Flush Leader Region
            Step::next(
                "Should be the update metadata for downgrading",
                Some(mock_datanode_reply(
                    from_peer_id,
                    Arc::new(move |id| {
                        Ok(new_flush_region_reply_for_region(id, region_id, true, None))
                    }),
                )),
                Assertion::simple(assert_update_metadata_downgrade, assert_no_persist),
            ),
            // UpdateMetadata::Downgrade
            Step::next(
                "Should be the downgrade leader region",
                None,
                Assertion::simple(assert_downgrade_leader_region, assert_no_persist),
            ),
            // Downgrade Leader
            Step::next(
                "Should be the upgrade candidate region",
                Some(mock_datanode_reply(
                    from_peer_id,
                    merge_mailbox_messages(vec![
                        Arc::new(|id| error::MailboxTimeoutSnafu { id }.fail()),
                        Arc::new(|id| Ok(new_downgrade_region_reply(id, None, true, None))),
                    ]),
                )),
                Assertion::simple(assert_upgrade_candidate_region, assert_no_persist),
            ),
            // Upgrade Candidate
            Step::next(
                "Should be the update metadata for upgrading",
                Some(mock_datanode_reply(
                    to_peer_id,
                    merge_mailbox_messages(vec![
                        Arc::new(|id| error::MailboxTimeoutSnafu { id }.fail()),
                        Arc::new(|id| Ok(new_upgrade_region_reply(id, true, true, None))),
                    ]),
                )),
                Assertion::simple(assert_update_metadata_upgrade, assert_no_persist),
            ),
            // UpdateMetadata::Upgrade
            Step::next(
                "Should be the close downgraded region",
                None,
                Assertion::simple(assert_close_downgraded_region, assert_no_persist),
            ),
            // CloseDowngradedRegion
            Step::next(
                "Should be the region migration end",
                None,
                Assertion::simple(assert_region_migration_end, assert_done),
            ),
            // RegionMigrationEnd
            Step::next(
                "Should be the region migration end again",
                None,
                Assertion::simple(assert_region_migration_end, assert_done),
            ),
            // RegionMigrationStart
            Step::setup(
                "Sets state to RegionMigrationStart",
                merge_before_test_fn(vec![
                    setup_state(Arc::new(|| Box::new(RegionMigrationStart))),
                    Arc::new(reset_volatile_ctx),
                ]),
            ),
            // RegionMigrationEnd
            // Note: this test can't run multiple times; the from peer's
            // `DatanodeTable` is removed after the first successful migration.
            Step::next(
                "Should be the region migration end (has been migrated)",
                None,
                Assertion::simple(assert_region_migration_end, assert_done),
            ),
        ];

        let steps = [steps.clone()].concat();
        let timer = Instant::now();

        // Run the table tests.
        let runner = ProcedureMigrationSuiteRunner::new(suite)
            .steps(steps.clone())
            .run_once()
            .await;

        // Ensure it didn't run into the slow path.
        assert!(timer.elapsed().as_secs() < REGION_LEASE_SECS);
        runner.suite.verify_table_metadata().await;
    }
}
1495}