meta_srv/procedure/region_migration/manager.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::hash_map::Entry;
16use std::collections::{HashMap, HashSet};
17use std::fmt::Display;
18use std::sync::{Arc, RwLock};
19use std::time::Duration;
20
21use common_meta::key::table_info::TableInfoValue;
22use common_meta::key::table_route::TableRouteValue;
23use common_meta::peer::Peer;
24use common_meta::rpc::router::RegionRoute;
25use common_procedure::{ProcedureId, ProcedureManagerRef, ProcedureWithId, watcher};
26use common_telemetry::{error, info, warn};
27use serde::{Deserialize, Serialize};
28use snafu::{OptionExt, ResultExt, ensure};
29use store_api::storage::RegionId;
30use table::table_name::TableName;
31
32use crate::error::{self, Result};
33use crate::metrics::{METRIC_META_REGION_MIGRATION_DATANODES, METRIC_META_REGION_MIGRATION_FAIL};
34use crate::procedure::region_migration::utils::{
35    RegionMigrationAnalysis, RegionMigrationTaskBatch, analyze_region_migration_task,
36};
37use crate::procedure::region_migration::{
38    DefaultContextFactory, PersistentContext, RegionMigrationProcedure,
39};
40
41pub type RegionMigrationManagerRef = Arc<RegionMigrationManager>;
42
43/// Manager of region migration procedure.
44pub struct RegionMigrationManager {
45    procedure_manager: ProcedureManagerRef,
46    context_factory: DefaultContextFactory,
47    tracker: RegionMigrationProcedureTracker,
48}
49
50#[derive(Default, Clone)]
51pub struct RegionMigrationProcedureTracker {
52    running_procedures: Arc<RwLock<HashMap<RegionId, RegionMigrationProcedureTask>>>,
53}
54
55impl RegionMigrationProcedureTracker {
56    /// Returns the [RegionMigrationProcedureGuard] if current region isn't migrating.
57    pub(crate) fn insert_running_procedure(
58        &self,
59        task: &RegionMigrationProcedureTask,
60    ) -> Option<RegionMigrationProcedureGuard> {
61        let mut procedures = self.running_procedures.write().unwrap();
62        match procedures.entry(task.region_id) {
63            Entry::Occupied(_) => None,
64            Entry::Vacant(v) => {
65                v.insert(task.clone());
66                Some(RegionMigrationProcedureGuard {
67                    region_id: task.region_id,
68                    running_procedures: self.running_procedures.clone(),
69                })
70            }
71        }
72    }
73
74    /// Returns true if it contains the specific region(`region_id`).
75    pub(crate) fn contains(&self, region_id: RegionId) -> bool {
76        self.running_procedures
77            .read()
78            .unwrap()
79            .contains_key(&region_id)
80    }
81}
82
83/// The guard of running [RegionMigrationProcedureTask].
84pub(crate) struct RegionMigrationProcedureGuard {
85    region_id: RegionId,
86    running_procedures: Arc<RwLock<HashMap<RegionId, RegionMigrationProcedureTask>>>,
87}
88
89impl Drop for RegionMigrationProcedureGuard {
90    fn drop(&mut self) {
91        let exists = self
92            .running_procedures
93            .read()
94            .unwrap()
95            .contains_key(&self.region_id);
96        if exists {
97            self.running_procedures
98                .write()
99                .unwrap()
100                .remove(&self.region_id);
101        }
102    }
103}
104
105/// A task of region migration procedure.
106#[derive(Debug, Clone)]
107pub struct RegionMigrationProcedureTask {
108    pub(crate) region_id: RegionId,
109    pub(crate) from_peer: Peer,
110    pub(crate) to_peer: Peer,
111    pub(crate) timeout: Duration,
112    pub(crate) trigger_reason: RegionMigrationTriggerReason,
113}
114
115/// The reason why the region migration procedure is triggered.
116#[derive(Default, Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, strum::Display)]
117#[strum(serialize_all = "PascalCase")]
118pub enum RegionMigrationTriggerReason {
119    #[default]
120    /// The region migration procedure is triggered by unknown reason.
121    Unknown,
122    /// The region migration procedure is triggered by administrator.
123    Manual,
124    /// The region migration procedure is triggered by auto rebalance.
125    AutoRebalance,
126    /// The region migration procedure is triggered by failover.
127    Failover,
128}
129
130impl RegionMigrationProcedureTask {
131    pub fn new(
132        region_id: RegionId,
133        from_peer: Peer,
134        to_peer: Peer,
135        timeout: Duration,
136        trigger_reason: RegionMigrationTriggerReason,
137    ) -> Self {
138        Self {
139            region_id,
140            from_peer,
141            to_peer,
142            timeout,
143            trigger_reason,
144        }
145    }
146}
147
148impl Display for RegionMigrationProcedureTask {
149    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
150        write!(
151            f,
152            "region: {}, from_peer: {}, to_peer: {}, trigger_reason: {}",
153            self.region_id, self.from_peer, self.to_peer, self.trigger_reason
154        )
155    }
156}
157
158/// The result of submitting a region migration task.
159#[derive(Debug, Default, PartialEq, Eq)]
160pub struct SubmitRegionMigrationTaskResult {
161    /// Regions already migrated to the `to_peer`.
162    pub migrated: Vec<RegionId>,
163    /// Regions where the leader peer has changed.
164    pub leader_changed: Vec<RegionId>,
165    /// Regions where `to_peer` is already a follower (conflict).
166    pub peer_conflict: Vec<RegionId>,
167    /// Regions whose table is not found.
168    pub table_not_found: Vec<RegionId>,
169    /// Regions still pending migration.
170    pub migrating: Vec<RegionId>,
171    /// Regions that have been submitted for migration.
172    pub submitted: Vec<RegionId>,
173    /// The procedure id of the region migration procedure.
174    pub procedure_id: Option<ProcedureId>,
175}
176
177impl RegionMigrationManager {
178    /// Returns new [`RegionMigrationManager`]
179    pub(crate) fn new(
180        procedure_manager: ProcedureManagerRef,
181        context_factory: DefaultContextFactory,
182    ) -> Self {
183        Self {
184            procedure_manager,
185            context_factory,
186            tracker: RegionMigrationProcedureTracker::default(),
187        }
188    }
189
190    /// Returns the [`RegionMigrationProcedureTracker`].
191    pub fn tracker(&self) -> &RegionMigrationProcedureTracker {
192        &self.tracker
193    }
194
195    /// Registers the loader of [RegionMigrationProcedure] to the `ProcedureManager`.
196    pub(crate) fn try_start(&self) -> Result<()> {
197        let context_factory = self.context_factory.clone();
198        let tracker = self.tracker.clone();
199        self.procedure_manager
200            .register_loader(
201                RegionMigrationProcedure::TYPE_NAME,
202                Box::new(move |json| {
203                    let context_factory = context_factory.clone();
204                    let tracker = tracker.clone();
205                    RegionMigrationProcedure::from_json(json, context_factory, tracker)
206                        .map(|p| Box::new(p) as _)
207                }),
208            )
209            .context(error::RegisterProcedureLoaderSnafu {
210                type_name: RegionMigrationProcedure::TYPE_NAME,
211            })
212    }
213
214    fn insert_running_procedure(
215        &self,
216        task: &RegionMigrationProcedureTask,
217    ) -> Option<RegionMigrationProcedureGuard> {
218        self.tracker.insert_running_procedure(task)
219    }
220
221    fn verify_task(&self, task: &RegionMigrationProcedureTask) -> Result<()> {
222        if task.to_peer.id == task.from_peer.id {
223            return error::InvalidArgumentsSnafu {
224                err_msg: "The `from_peer_id` can't equal `to_peer_id`",
225            }
226            .fail();
227        }
228
229        Ok(())
230    }
231
232    async fn retrieve_table_route(&self, region_id: RegionId) -> Result<TableRouteValue> {
233        let table_route = self
234            .context_factory
235            .table_metadata_manager
236            .table_route_manager()
237            .table_route_storage()
238            .get(region_id.table_id())
239            .await
240            .context(error::TableMetadataManagerSnafu)?
241            .context(error::TableRouteNotFoundSnafu {
242                table_id: region_id.table_id(),
243            })?;
244
245        Ok(table_route)
246    }
247
248    async fn retrieve_table_info(&self, region_id: RegionId) -> Result<TableInfoValue> {
249        let table_route = self
250            .context_factory
251            .table_metadata_manager
252            .table_info_manager()
253            .get(region_id.table_id())
254            .await
255            .context(error::TableMetadataManagerSnafu)?
256            .context(error::TableInfoNotFoundSnafu {
257                table_id: region_id.table_id(),
258            })?
259            .into_inner();
260
261        Ok(table_route)
262    }
263
264    /// Verifies the type of region migration table route.
265    fn verify_table_route(
266        &self,
267        table_route: &TableRouteValue,
268        task: &RegionMigrationProcedureTask,
269    ) -> Result<()> {
270        if !table_route.is_physical() {
271            return error::UnexpectedSnafu {
272                violated: format!(
273                    "Trying to execute region migration on the logical table, task {task}"
274                ),
275            }
276            .fail();
277        }
278
279        Ok(())
280    }
281
282    /// Returns true if the region has been migrated.
283    fn has_migrated(
284        &self,
285        region_route: &RegionRoute,
286        task: &RegionMigrationProcedureTask,
287    ) -> Result<bool> {
288        if region_route.is_leader_downgrading() {
289            return Ok(false);
290        }
291
292        let leader_peer = region_route
293            .leader_peer
294            .as_ref()
295            .context(error::UnexpectedSnafu {
296                violated: "Region route leader peer is not found",
297            })?;
298
299        Ok(leader_peer.id == task.to_peer.id)
300    }
301
302    /// Throws an error if `leader_peer` is not the `from_peer`.
303    ///
304    /// If `from_peer` is unknown, use the leader peer as the `from_peer`.
305    fn verify_region_leader_peer(
306        &self,
307        region_route: &RegionRoute,
308        task: &mut RegionMigrationProcedureTask,
309    ) -> Result<()> {
310        let leader_peer = region_route
311            .leader_peer
312            .as_ref()
313            .context(error::UnexpectedSnafu {
314                violated: "Region route leader peer is not found",
315            })?;
316
317        ensure!(
318            leader_peer.id == task.from_peer.id,
319            error::LeaderPeerChangedSnafu {
320                msg: format!(
321                    "Region's leader peer({}) is not the `from_peer`({}), region: {}",
322                    leader_peer.id, task.from_peer.id, task.region_id
323                ),
324            }
325        );
326
327        if task.from_peer.addr.is_empty() {
328            warn!(
329                "The `from_peer` is unknown, use the leader peer({}) as the `from_peer`, region: {}",
330                leader_peer, task.region_id
331            );
332            // The peer id is the same as the leader peer id.
333            task.from_peer = leader_peer.clone();
334        }
335
336        Ok(())
337    }
338
339    /// Throws an error if `to_peer` is already has a region follower.
340    fn verify_region_follower_peers(
341        &self,
342        region_route: &RegionRoute,
343        task: &RegionMigrationProcedureTask,
344    ) -> Result<()> {
345        ensure!(
346            !region_route.follower_peers.contains(&task.to_peer),
347            error::InvalidArgumentsSnafu {
348                err_msg: format!(
349                    "The `to_peer`({}) is already has a region follower, region: {}",
350                    task.to_peer.id, task.region_id
351                ),
352            },
353        );
354
355        Ok(())
356    }
357
358    /// Extracts regions from the migration task that are already running migration procedures.
359    ///
360    /// Returns a tuple containing those region ids that are already running and the newly created procedure guards.
361    /// The regions that are already running will be removed from the [`RegionMigrationTask`].
362    fn extract_running_regions(
363        &self,
364        task: &mut RegionMigrationTaskBatch,
365    ) -> (Vec<RegionId>, Vec<RegionMigrationProcedureGuard>) {
366        let mut migrating_region_ids = Vec::new();
367        let mut procedure_guards = Vec::with_capacity(task.region_ids.len());
368
369        for region_id in &task.region_ids {
370            let Some(guard) = self.insert_running_procedure(&RegionMigrationProcedureTask::new(
371                *region_id,
372                task.from_peer.clone(),
373                task.to_peer.clone(),
374                task.timeout,
375                task.trigger_reason,
376            )) else {
377                migrating_region_ids.push(*region_id);
378                continue;
379            };
380            procedure_guards.push(guard);
381        }
382
383        let migrating_set = migrating_region_ids.iter().cloned().collect::<HashSet<_>>();
384        task.region_ids.retain(|id| !migrating_set.contains(id));
385
386        (migrating_region_ids, procedure_guards)
387    }
388
389    pub async fn submit_region_migration_task(
390        &self,
391        mut task: RegionMigrationTaskBatch,
392    ) -> Result<SubmitRegionMigrationTaskResult> {
393        let (migrating_region_ids, procedure_guards) = self.extract_running_regions(&mut task);
394        let RegionMigrationAnalysis {
395            migrated,
396            leader_changed,
397            peer_conflict,
398            mut table_not_found,
399            pending,
400        } = analyze_region_migration_task(&task, &self.context_factory.table_metadata_manager)
401            .await?;
402        if pending.is_empty() {
403            return Ok(SubmitRegionMigrationTaskResult {
404                migrated,
405                leader_changed,
406                peer_conflict,
407                table_not_found,
408                migrating: migrating_region_ids,
409                submitted: vec![],
410                procedure_id: None,
411            });
412        }
413
414        // Updates the region ids to the pending region ids.
415        task.region_ids = pending;
416        let table_regions = task.table_regions();
417        let table_ids = table_regions.keys().cloned().collect::<Vec<_>>();
418        let table_info_values = self
419            .context_factory
420            .table_metadata_manager
421            .table_info_manager()
422            .batch_get(&table_ids)
423            .await
424            .context(error::TableMetadataManagerSnafu)?;
425        let mut catalog_and_schema = Vec::with_capacity(table_info_values.len());
426        for (table_id, regions) in table_regions {
427            match table_info_values.get(&table_id) {
428                Some(table_info) => {
429                    let TableName {
430                        catalog_name,
431                        schema_name,
432                        ..
433                    } = table_info.table_name();
434                    catalog_and_schema.push((catalog_name, schema_name));
435                }
436                None => {
437                    task.region_ids.retain(|id| id.table_id() != table_id);
438                    table_not_found.extend(regions);
439                }
440            }
441        }
442        if task.region_ids.is_empty() {
443            return Ok(SubmitRegionMigrationTaskResult {
444                migrated,
445                leader_changed,
446                peer_conflict,
447                table_not_found,
448                migrating: migrating_region_ids,
449                submitted: vec![],
450                procedure_id: None,
451            });
452        }
453
454        let submitting_region_ids = task.region_ids.clone();
455        let procedure_id = self
456            .submit_procedure_inner(task, procedure_guards, catalog_and_schema)
457            .await?;
458        Ok(SubmitRegionMigrationTaskResult {
459            migrated,
460            leader_changed,
461            peer_conflict,
462            table_not_found,
463            migrating: migrating_region_ids,
464            submitted: submitting_region_ids,
465            procedure_id: Some(procedure_id),
466        })
467    }
468
469    async fn submit_procedure_inner(
470        &self,
471        task: RegionMigrationTaskBatch,
472        procedure_guards: Vec<RegionMigrationProcedureGuard>,
473        catalog_and_schema: Vec<(String, String)>,
474    ) -> Result<ProcedureId> {
475        let procedure = RegionMigrationProcedure::new(
476            PersistentContext::new(
477                catalog_and_schema,
478                task.from_peer.clone(),
479                task.to_peer.clone(),
480                task.region_ids.clone(),
481                task.timeout,
482                task.trigger_reason,
483            ),
484            self.context_factory.clone(),
485            procedure_guards,
486        );
487        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
488        let procedure_id = procedure_with_id.id;
489        info!("Starting region migration procedure {procedure_id} for {task}");
490        let procedure_manager = self.procedure_manager.clone();
491        let num_region = task.region_ids.len();
492
493        common_runtime::spawn_global(async move {
494            let watcher = &mut match procedure_manager.submit(procedure_with_id).await {
495                Ok(watcher) => watcher,
496                Err(e) => {
497                    error!(e; "Failed to submit region migration procedure {procedure_id} for {task}");
498                    return;
499                }
500            };
501            METRIC_META_REGION_MIGRATION_DATANODES
502                .with_label_values(&["src", &task.from_peer.id.to_string()])
503                .inc_by(num_region as u64);
504            METRIC_META_REGION_MIGRATION_DATANODES
505                .with_label_values(&["desc", &task.to_peer.id.to_string()])
506                .inc_by(num_region as u64);
507
508            if let Err(e) = watcher::wait(watcher).await {
509                error!(e; "Failed to wait region migration procedure {procedure_id} for {task}");
510                METRIC_META_REGION_MIGRATION_FAIL.inc();
511                return;
512            }
513
514            info!("Region migration procedure {procedure_id} for {task} is finished successfully!");
515        });
516
517        Ok(procedure_id)
518    }
519
520    /// Submits a new region migration procedure.
521    pub async fn submit_procedure(
522        &self,
523        mut task: RegionMigrationProcedureTask,
524    ) -> Result<Option<ProcedureId>> {
525        let Some(guard) = self.insert_running_procedure(&task) else {
526            return error::MigrationRunningSnafu {
527                region_id: task.region_id,
528            }
529            .fail();
530        };
531
532        self.verify_task(&task)?;
533
534        let region_id = task.region_id;
535
536        let table_route = self.retrieve_table_route(region_id).await?;
537        self.verify_table_route(&table_route, &task)?;
538
539        // Safety: checked before.
540        let region_route = table_route
541            .region_route(region_id)
542            .context(error::UnexpectedLogicalRouteTableSnafu {
543                err_msg: format!("{table_route:?} is a non-physical TableRouteValue."),
544            })?
545            .context(error::RegionRouteNotFoundSnafu { region_id })?;
546
547        if self.has_migrated(&region_route, &task)? {
548            info!("Skipping region migration task: {task}");
549            return error::RegionMigratedSnafu {
550                region_id,
551                target_peer_id: task.to_peer.id,
552            }
553            .fail();
554        }
555
556        self.verify_region_leader_peer(&region_route, &mut task)?;
557        self.verify_region_follower_peers(&region_route, &task)?;
558        let table_info = self.retrieve_table_info(region_id).await?;
559        let TableName {
560            catalog_name,
561            schema_name,
562            ..
563        } = table_info.table_name();
564        let RegionMigrationProcedureTask {
565            region_id,
566            from_peer,
567            to_peer,
568            timeout,
569            trigger_reason,
570        } = task.clone();
571        let procedure = RegionMigrationProcedure::new(
572            PersistentContext::new(
573                vec![(catalog_name, schema_name)],
574                from_peer,
575                to_peer,
576                vec![region_id],
577                timeout,
578                trigger_reason,
579            ),
580            self.context_factory.clone(),
581            vec![guard],
582        );
583        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
584        let procedure_id = procedure_with_id.id;
585        info!("Starting region migration procedure {procedure_id} for {task}");
586        let procedure_manager = self.procedure_manager.clone();
587        common_runtime::spawn_global(async move {
588            let watcher = &mut match procedure_manager.submit(procedure_with_id).await {
589                Ok(watcher) => watcher,
590                Err(e) => {
591                    error!(e; "Failed to submit region migration procedure {procedure_id} for {task}");
592                    return;
593                }
594            };
595            METRIC_META_REGION_MIGRATION_DATANODES
596                .with_label_values(&["src", &task.from_peer.id.to_string()])
597                .inc();
598            METRIC_META_REGION_MIGRATION_DATANODES
599                .with_label_values(&["desc", &task.to_peer.id.to_string()])
600                .inc();
601
602            if let Err(e) = watcher::wait(watcher).await {
603                error!(e; "Failed to wait region migration procedure {procedure_id} for {task}");
604                METRIC_META_REGION_MIGRATION_FAIL.inc();
605                return;
606            }
607
608            info!("Region migration procedure {procedure_id} for {task} is finished successfully!");
609        });
610
611        Ok(Some(procedure_id))
612    }
613}
614
615#[cfg(test)]
616mod test {
617    use std::assert_matches::assert_matches;
618
619    use common_meta::key::table_route::LogicalTableRouteValue;
620    use common_meta::key::test_utils::new_test_table_info;
621    use common_meta::rpc::router::Region;
622
623    use super::*;
624    use crate::procedure::region_migration::test_util::TestingEnv;
625
626    #[tokio::test]
627    async fn test_insert_running_procedure() {
628        let env = TestingEnv::new();
629        let context_factory = env.context_factory();
630        let manager = RegionMigrationManager::new(env.procedure_manager().clone(), context_factory);
631        let region_id = RegionId::new(1024, 1);
632        let task = RegionMigrationProcedureTask {
633            region_id,
634            from_peer: Peer::empty(2),
635            to_peer: Peer::empty(1),
636            timeout: Duration::from_millis(1000),
637            trigger_reason: RegionMigrationTriggerReason::Manual,
638        };
639        // Inserts one
640        manager
641            .tracker
642            .running_procedures
643            .write()
644            .unwrap()
645            .insert(region_id, task.clone());
646
647        let err = manager.submit_procedure(task).await.unwrap_err();
648        assert_matches!(err, error::Error::MigrationRunning { .. });
649    }
650
651    #[tokio::test]
652    async fn test_submit_procedure_invalid_task() {
653        let env = TestingEnv::new();
654        let context_factory = env.context_factory();
655        let manager = RegionMigrationManager::new(env.procedure_manager().clone(), context_factory);
656        let region_id = RegionId::new(1024, 1);
657        let task = RegionMigrationProcedureTask {
658            region_id,
659            from_peer: Peer::empty(1),
660            to_peer: Peer::empty(1),
661            timeout: Duration::from_millis(1000),
662            trigger_reason: RegionMigrationTriggerReason::Manual,
663        };
664
665        let err = manager.submit_procedure(task).await.unwrap_err();
666        assert_matches!(err, error::Error::InvalidArguments { .. });
667    }
668
669    #[tokio::test]
670    async fn test_submit_procedure_table_not_found() {
671        let env = TestingEnv::new();
672        let context_factory = env.context_factory();
673        let manager = RegionMigrationManager::new(env.procedure_manager().clone(), context_factory);
674        let region_id = RegionId::new(1024, 1);
675        let task = RegionMigrationProcedureTask {
676            region_id,
677            from_peer: Peer::empty(1),
678            to_peer: Peer::empty(2),
679            timeout: Duration::from_millis(1000),
680            trigger_reason: RegionMigrationTriggerReason::Manual,
681        };
682
683        let err = manager.submit_procedure(task).await.unwrap_err();
684        assert_matches!(err, error::Error::TableRouteNotFound { .. });
685    }
686
687    #[tokio::test]
688    async fn test_submit_procedure_region_route_not_found() {
689        let env = TestingEnv::new();
690        let context_factory = env.context_factory();
691        let manager = RegionMigrationManager::new(env.procedure_manager().clone(), context_factory);
692        let region_id = RegionId::new(1024, 1);
693        let task = RegionMigrationProcedureTask {
694            region_id,
695            from_peer: Peer::empty(1),
696            to_peer: Peer::empty(2),
697            timeout: Duration::from_millis(1000),
698            trigger_reason: RegionMigrationTriggerReason::Manual,
699        };
700
701        let table_info = new_test_table_info(1024, vec![1]).into();
702        let region_routes = vec![RegionRoute {
703            region: Region::new_test(RegionId::new(1024, 2)),
704            leader_peer: Some(Peer::empty(3)),
705            ..Default::default()
706        }];
707
708        env.create_physical_table_metadata(table_info, region_routes)
709            .await;
710
711        let err = manager.submit_procedure(task).await.unwrap_err();
712        assert_matches!(err, error::Error::RegionRouteNotFound { .. });
713    }
714
715    #[tokio::test]
716    async fn test_submit_procedure_incorrect_from_peer() {
717        let env = TestingEnv::new();
718        let context_factory = env.context_factory();
719        let manager = RegionMigrationManager::new(env.procedure_manager().clone(), context_factory);
720        let region_id = RegionId::new(1024, 1);
721        let task = RegionMigrationProcedureTask {
722            region_id,
723            from_peer: Peer::empty(1),
724            to_peer: Peer::empty(2),
725            timeout: Duration::from_millis(1000),
726            trigger_reason: RegionMigrationTriggerReason::Manual,
727        };
728
729        let table_info = new_test_table_info(1024, vec![1]).into();
730        let region_routes = vec![RegionRoute {
731            region: Region::new_test(RegionId::new(1024, 1)),
732            leader_peer: Some(Peer::empty(3)),
733            ..Default::default()
734        }];
735
736        env.create_physical_table_metadata(table_info, region_routes)
737            .await;
738
739        let err = manager.submit_procedure(task).await.unwrap_err();
740        assert_matches!(err, error::Error::LeaderPeerChanged { .. });
741        assert_eq!(
742            err.to_string(),
743            "Region's leader peer changed: Region's leader peer(3) is not the `from_peer`(1), region: 4398046511105(1024, 1)"
744        );
745    }
746
747    #[tokio::test]
748    async fn test_submit_procedure_region_follower_on_to_peer() {
749        let env = TestingEnv::new();
750        let context_factory = env.context_factory();
751        let manager = RegionMigrationManager::new(env.procedure_manager().clone(), context_factory);
752        let region_id = RegionId::new(1024, 1);
753        let task = RegionMigrationProcedureTask {
754            region_id,
755            from_peer: Peer::empty(3),
756            to_peer: Peer::empty(2),
757            timeout: Duration::from_millis(1000),
758            trigger_reason: RegionMigrationTriggerReason::Manual,
759        };
760
761        let table_info = new_test_table_info(1024, vec![1]).into();
762        let region_routes = vec![RegionRoute {
763            region: Region::new_test(region_id),
764            leader_peer: Some(Peer::empty(3)),
765            follower_peers: vec![Peer::empty(2)],
766            ..Default::default()
767        }];
768
769        env.create_physical_table_metadata(table_info, region_routes)
770            .await;
771
772        let err = manager.submit_procedure(task).await.unwrap_err();
773        assert_matches!(err, error::Error::InvalidArguments { .. });
774        assert_eq!(
775            err.to_string(),
776            "Invalid arguments: The `to_peer`(2) is already has a region follower, region: 4398046511105(1024, 1)"
777        );
778    }
779
780    #[tokio::test]
781    async fn test_submit_procedure_has_migrated() {
782        common_telemetry::init_default_ut_logging();
783        let env = TestingEnv::new();
784        let context_factory = env.context_factory();
785        let manager = RegionMigrationManager::new(env.procedure_manager().clone(), context_factory);
786        let region_id = RegionId::new(1024, 1);
787        let task = RegionMigrationProcedureTask {
788            region_id,
789            from_peer: Peer::empty(1),
790            to_peer: Peer::empty(2),
791            timeout: Duration::from_millis(1000),
792            trigger_reason: RegionMigrationTriggerReason::Manual,
793        };
794
795        let table_info = new_test_table_info(1024, vec![1]).into();
796        let region_routes = vec![RegionRoute {
797            region: Region::new_test(RegionId::new(1024, 1)),
798            leader_peer: Some(Peer::empty(2)),
799            ..Default::default()
800        }];
801
802        env.create_physical_table_metadata(table_info, region_routes)
803            .await;
804
805        let err = manager.submit_procedure(task).await.unwrap_err();
806        assert_matches!(err, error::Error::RegionMigrated { .. });
807    }
808
809    #[tokio::test]
810    async fn test_verify_table_route_error() {
811        let env = TestingEnv::new();
812        let context_factory = env.context_factory();
813        let manager = RegionMigrationManager::new(env.procedure_manager().clone(), context_factory);
814        let region_id = RegionId::new(1024, 1);
815        let task = RegionMigrationProcedureTask {
816            region_id,
817            from_peer: Peer::empty(1),
818            to_peer: Peer::empty(2),
819            timeout: Duration::from_millis(1000),
820            trigger_reason: RegionMigrationTriggerReason::Manual,
821        };
822
823        let err = manager
824            .verify_table_route(
825                &TableRouteValue::Logical(LogicalTableRouteValue::new(0, vec![])),
826                &task,
827            )
828            .unwrap_err();
829
830        assert_matches!(err, error::Error::Unexpected { .. });
831    }
832
833    #[tokio::test]
834    async fn test_submit_procedure_with_multiple_regions_invalid_task() {
835        let env = TestingEnv::new();
836        let context_factory = env.context_factory();
837        let manager = RegionMigrationManager::new(env.procedure_manager().clone(), context_factory);
838        let task = RegionMigrationTaskBatch {
839            region_ids: vec![RegionId::new(1024, 1)],
840            from_peer: Peer::empty(1),
841            to_peer: Peer::empty(1),
842            timeout: Duration::from_millis(1000),
843            trigger_reason: RegionMigrationTriggerReason::Manual,
844        };
845
846        let err = manager
847            .submit_region_migration_task(task)
848            .await
849            .unwrap_err();
850        assert_matches!(err, error::Error::InvalidArguments { .. });
851    }
852
853    #[tokio::test]
854    async fn test_submit_procedure_with_multiple_regions_no_region_to_migrate() {
855        common_telemetry::init_default_ut_logging();
856        let env = TestingEnv::new();
857        let context_factory = env.context_factory();
858        let manager = RegionMigrationManager::new(env.procedure_manager().clone(), context_factory);
859        let region_id = RegionId::new(1024, 1);
860        let task = RegionMigrationTaskBatch {
861            region_ids: vec![region_id],
862            from_peer: Peer::empty(1),
863            to_peer: Peer::empty(2),
864            timeout: Duration::from_millis(1000),
865            trigger_reason: RegionMigrationTriggerReason::Manual,
866        };
867        let table_info = new_test_table_info(1024, vec![1]).into();
868        let region_routes = vec![RegionRoute {
869            region: Region::new_test(region_id),
870            leader_peer: Some(Peer::empty(2)),
871            ..Default::default()
872        }];
873        env.create_physical_table_metadata(table_info, region_routes)
874            .await;
875        let result = manager.submit_region_migration_task(task).await.unwrap();
876
877        assert_eq!(
878            result,
879            SubmitRegionMigrationTaskResult {
880                migrated: vec![region_id],
881                ..Default::default()
882            }
883        );
884    }
885
886    #[tokio::test]
887    async fn test_submit_procedure_with_multiple_regions_leader_peer_changed() {
888        let env = TestingEnv::new();
889        let context_factory = env.context_factory();
890        let manager = RegionMigrationManager::new(env.procedure_manager().clone(), context_factory);
891        let region_id = RegionId::new(1024, 1);
892        let task = RegionMigrationTaskBatch {
893            region_ids: vec![region_id],
894            from_peer: Peer::empty(1),
895            to_peer: Peer::empty(2),
896            timeout: Duration::from_millis(1000),
897            trigger_reason: RegionMigrationTriggerReason::Manual,
898        };
899
900        let table_info = new_test_table_info(1024, vec![1]).into();
901        let region_routes = vec![RegionRoute {
902            region: Region::new_test(RegionId::new(1024, 1)),
903            leader_peer: Some(Peer::empty(3)),
904            ..Default::default()
905        }];
906
907        env.create_physical_table_metadata(table_info, region_routes)
908            .await;
909        let result = manager.submit_region_migration_task(task).await.unwrap();
910        assert_eq!(
911            result,
912            SubmitRegionMigrationTaskResult {
913                leader_changed: vec![region_id],
914                ..Default::default()
915            }
916        );
917    }
918
919    #[tokio::test]
920    async fn test_submit_procedure_with_multiple_regions_peer_conflict() {
921        let env = TestingEnv::new();
922        let context_factory = env.context_factory();
923        let manager = RegionMigrationManager::new(env.procedure_manager().clone(), context_factory);
924        let region_id = RegionId::new(1024, 1);
925        let task = RegionMigrationTaskBatch {
926            region_ids: vec![region_id],
927            from_peer: Peer::empty(3),
928            to_peer: Peer::empty(2),
929            timeout: Duration::from_millis(1000),
930            trigger_reason: RegionMigrationTriggerReason::Manual,
931        };
932
933        let table_info = new_test_table_info(1024, vec![1]).into();
934        let region_routes = vec![RegionRoute {
935            region: Region::new_test(region_id),
936            leader_peer: Some(Peer::empty(3)),
937            follower_peers: vec![Peer::empty(2)],
938            ..Default::default()
939        }];
940
941        env.create_physical_table_metadata(table_info, region_routes)
942            .await;
943        let result = manager.submit_region_migration_task(task).await.unwrap();
944        assert_eq!(
945            result,
946            SubmitRegionMigrationTaskResult {
947                peer_conflict: vec![region_id],
948                ..Default::default()
949            }
950        );
951    }
952
953    #[tokio::test]
954    async fn test_running_regions() {
955        let env = TestingEnv::new();
956        let context_factory = env.context_factory();
957        let manager = RegionMigrationManager::new(env.procedure_manager().clone(), context_factory);
958        let region_id = RegionId::new(1024, 1);
959        let task = RegionMigrationTaskBatch {
960            region_ids: vec![region_id, RegionId::new(1024, 2)],
961            from_peer: Peer::empty(1),
962            to_peer: Peer::empty(2),
963            timeout: Duration::from_millis(1000),
964            trigger_reason: RegionMigrationTriggerReason::Manual,
965        };
966        // Inserts one
967        manager.tracker.running_procedures.write().unwrap().insert(
968            region_id,
969            RegionMigrationProcedureTask::new(
970                region_id,
971                task.from_peer.clone(),
972                task.to_peer.clone(),
973                task.timeout,
974                task.trigger_reason,
975            ),
976        );
977        let table_info = new_test_table_info(1024, vec![1]).into();
978        let region_routes = vec![RegionRoute {
979            region: Region::new_test(RegionId::new(1024, 2)),
980            leader_peer: Some(Peer::empty(1)),
981            ..Default::default()
982        }];
983        env.create_physical_table_metadata(table_info, region_routes)
984            .await;
985        let result = manager.submit_region_migration_task(task).await.unwrap();
986        assert_eq!(result.migrating, vec![region_id]);
987        assert_eq!(result.submitted, vec![RegionId::new(1024, 2)]);
988        assert!(result.procedure_id.is_some());
989    }
990}