meta_srv/procedure/region_migration/
manager.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::hash_map::Entry;
16use std::collections::{HashMap, HashSet};
17use std::fmt::Display;
18use std::sync::{Arc, RwLock};
19use std::time::Duration;
20
21use common_meta::key::table_info::TableInfoValue;
22use common_meta::key::table_route::TableRouteValue;
23use common_meta::peer::Peer;
24use common_meta::rpc::router::RegionRoute;
25use common_procedure::{ProcedureId, ProcedureManagerRef, ProcedureWithId, watcher};
26use common_telemetry::{error, info, warn};
27use serde::{Deserialize, Serialize};
28use snafu::{OptionExt, ResultExt, ensure};
29use store_api::storage::RegionId;
30use table::table_name::TableName;
31
32use crate::error::{self, Result};
33use crate::metrics::{METRIC_META_REGION_MIGRATION_DATANODES, METRIC_META_REGION_MIGRATION_FAIL};
34use crate::procedure::region_migration::utils::{
35    RegionMigrationAnalysis, RegionMigrationTaskBatch, analyze_region_migration_task,
36};
37use crate::procedure::region_migration::{
38    DefaultContextFactory, PersistentContext, RegionMigrationProcedure,
39};
40
/// Shared, reference-counted handle to a [`RegionMigrationManager`].
pub type RegionMigrationManagerRef = Arc<RegionMigrationManager>;
42
/// Manager of region migration procedure.
///
/// Validates migration tasks, records which regions currently have a
/// migration in flight and submits [`RegionMigrationProcedure`]s to the
/// procedure manager.
pub struct RegionMigrationManager {
    /// Used to register the procedure loader and to submit procedures.
    procedure_manager: ProcedureManagerRef,
    /// Cloned into every created procedure; also gives access to the table
    /// metadata manager used for validation.
    context_factory: DefaultContextFactory,
    /// Records the regions that currently have a running migration procedure.
    tracker: RegionMigrationProcedureTracker,
}
49
/// Tracks the regions that currently have a running migration procedure.
///
/// Clones share the same underlying map because it is held behind an [`Arc`].
#[derive(Default, Clone)]
pub struct RegionMigrationProcedureTracker {
    /// Running tasks keyed by region id; also shared with every
    /// [`RegionMigrationProcedureGuard`] created by this tracker.
    running_procedures: Arc<RwLock<HashMap<RegionId, RegionMigrationProcedureTask>>>,
}
54
55impl RegionMigrationProcedureTracker {
56    /// Returns the [RegionMigrationProcedureGuard] if current region isn't migrating.
57    pub(crate) fn insert_running_procedure(
58        &self,
59        task: &RegionMigrationProcedureTask,
60    ) -> Option<RegionMigrationProcedureGuard> {
61        let mut procedures = self.running_procedures.write().unwrap();
62        match procedures.entry(task.region_id) {
63            Entry::Occupied(_) => None,
64            Entry::Vacant(v) => {
65                v.insert(task.clone());
66                Some(RegionMigrationProcedureGuard {
67                    region_id: task.region_id,
68                    running_procedures: self.running_procedures.clone(),
69                })
70            }
71        }
72    }
73
74    /// Returns true if it contains the specific region(`region_id`).
75    pub(crate) fn contains(&self, region_id: RegionId) -> bool {
76        self.running_procedures
77            .read()
78            .unwrap()
79            .contains_key(&region_id)
80    }
81}
82
/// The guard of running [RegionMigrationProcedureTask].
///
/// Dropping the guard removes `region_id` from the shared map of running
/// procedures.
pub(crate) struct RegionMigrationProcedureGuard {
    /// The region this guard keeps registered as migrating.
    region_id: RegionId,
    /// The map shared with the tracker that created this guard.
    running_procedures: Arc<RwLock<HashMap<RegionId, RegionMigrationProcedureTask>>>,
}
88
89impl Drop for RegionMigrationProcedureGuard {
90    fn drop(&mut self) {
91        let exists = self
92            .running_procedures
93            .read()
94            .unwrap()
95            .contains_key(&self.region_id);
96        if exists {
97            self.running_procedures
98                .write()
99                .unwrap()
100                .remove(&self.region_id);
101        }
102    }
103}
104
/// A task of region migration procedure.
///
/// Describes the migration of a single region from one peer to another.
#[derive(Debug, Clone)]
pub struct RegionMigrationProcedureTask {
    /// The region to migrate.
    pub(crate) region_id: RegionId,
    /// The peer the region is migrated away from; validated against the
    /// region's current leader peer before a procedure is submitted.
    pub(crate) from_peer: Peer,
    /// The peer the region is migrated to.
    pub(crate) to_peer: Peer,
    /// The timeout handed to the procedure's persistent context.
    pub(crate) timeout: Duration,
    /// Why the migration was requested.
    pub(crate) trigger_reason: RegionMigrationTriggerReason,
}
114
/// The reason why the region migration procedure is triggered.
///
/// The derived `Display` output is configured as `PascalCase` through `strum`.
/// [`RegionMigrationTriggerReason::Unknown`] is the default value.
#[derive(Default, Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, strum::Display)]
#[strum(serialize_all = "PascalCase")]
pub enum RegionMigrationTriggerReason {
    #[default]
    /// The region migration procedure is triggered by an unknown reason.
    Unknown,
    /// The region migration procedure is triggered by an administrator.
    Manual,
    /// The region migration procedure is triggered by auto rebalance.
    AutoRebalance,
    /// The region migration procedure is triggered by failover.
    Failover,
}
129
130impl RegionMigrationProcedureTask {
131    pub fn new(
132        region_id: RegionId,
133        from_peer: Peer,
134        to_peer: Peer,
135        timeout: Duration,
136        trigger_reason: RegionMigrationTriggerReason,
137    ) -> Self {
138        Self {
139            region_id,
140            from_peer,
141            to_peer,
142            timeout,
143            trigger_reason,
144        }
145    }
146}
147
148impl Display for RegionMigrationProcedureTask {
149    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
150        write!(
151            f,
152            "region: {}, from_peer: {}, to_peer: {}, trigger_reason: {}",
153            self.region_id, self.from_peer, self.to_peer, self.trigger_reason
154        )
155    }
156}
157
/// The result of submitting a region migration task.
///
/// Each requested region is reported in one of the lists below.
#[derive(Debug, Default, PartialEq, Eq)]
pub struct SubmitRegionMigrationTaskResult {
    /// Regions already migrated to the `to_peer`.
    pub migrated: Vec<RegionId>,
    /// Regions where the leader peer has changed.
    pub leader_changed: Vec<RegionId>,
    /// Regions where `to_peer` is already a follower (conflict).
    pub peer_conflict: Vec<RegionId>,
    /// Regions whose table is not found.
    pub table_not_found: Vec<RegionId>,
    /// Regions whose table exists but region route is not found (e.g., removed after repartition).
    pub region_not_found: Vec<RegionId>,
    /// Regions that already had a running migration procedure when the task
    /// was submitted; they are not part of the newly submitted procedure.
    pub migrating: Vec<RegionId>,
    /// Regions that have been submitted for migration.
    pub submitted: Vec<RegionId>,
    /// The procedure id of the region migration procedure; `None` when no
    /// procedure was submitted.
    pub procedure_id: Option<ProcedureId>,
}
178
179impl RegionMigrationManager {
180    /// Returns new [`RegionMigrationManager`]
181    pub(crate) fn new(
182        procedure_manager: ProcedureManagerRef,
183        context_factory: DefaultContextFactory,
184    ) -> Self {
185        Self {
186            procedure_manager,
187            context_factory,
188            tracker: RegionMigrationProcedureTracker::default(),
189        }
190    }
191
    /// Returns the [`RegionMigrationProcedureTracker`] that records the
    /// regions with a running migration procedure.
    pub fn tracker(&self) -> &RegionMigrationProcedureTracker {
        &self.tracker
    }
196
197    /// Registers the loader of [RegionMigrationProcedure] to the `ProcedureManager`.
198    pub(crate) fn try_start(&self) -> Result<()> {
199        let context_factory = self.context_factory.clone();
200        let tracker = self.tracker.clone();
201        self.procedure_manager
202            .register_loader(
203                RegionMigrationProcedure::TYPE_NAME,
204                Box::new(move |json| {
205                    let context_factory = context_factory.clone();
206                    let tracker = tracker.clone();
207                    RegionMigrationProcedure::from_json(json, context_factory, tracker)
208                        .map(|p| Box::new(p) as _)
209                }),
210            )
211            .context(error::RegisterProcedureLoaderSnafu {
212                type_name: RegionMigrationProcedure::TYPE_NAME,
213            })
214    }
215
    /// Registers `task` in the tracker.
    ///
    /// Returns `None` if the region of `task` already has a running procedure.
    fn insert_running_procedure(
        &self,
        task: &RegionMigrationProcedureTask,
    ) -> Option<RegionMigrationProcedureGuard> {
        self.tracker.insert_running_procedure(task)
    }
222
223    fn verify_task(&self, task: &RegionMigrationProcedureTask) -> Result<()> {
224        if task.to_peer.id == task.from_peer.id {
225            return error::InvalidArgumentsSnafu {
226                err_msg: "The `from_peer_id` can't equal `to_peer_id`",
227            }
228            .fail();
229        }
230
231        Ok(())
232    }
233
234    async fn retrieve_table_route(&self, region_id: RegionId) -> Result<TableRouteValue> {
235        let table_route = self
236            .context_factory
237            .table_metadata_manager
238            .table_route_manager()
239            .table_route_storage()
240            .get(region_id.table_id())
241            .await
242            .context(error::TableMetadataManagerSnafu)?
243            .context(error::TableRouteNotFoundSnafu {
244                table_id: region_id.table_id(),
245            })?;
246
247        Ok(table_route)
248    }
249
250    async fn retrieve_table_info(&self, region_id: RegionId) -> Result<TableInfoValue> {
251        let table_route = self
252            .context_factory
253            .table_metadata_manager
254            .table_info_manager()
255            .get(region_id.table_id())
256            .await
257            .context(error::TableMetadataManagerSnafu)?
258            .context(error::TableInfoNotFoundSnafu {
259                table_id: region_id.table_id(),
260            })?
261            .into_inner();
262
263        Ok(table_route)
264    }
265
266    /// Verifies the type of region migration table route.
267    fn verify_table_route(
268        &self,
269        table_route: &TableRouteValue,
270        task: &RegionMigrationProcedureTask,
271    ) -> Result<()> {
272        if !table_route.is_physical() {
273            return error::UnexpectedSnafu {
274                violated: format!(
275                    "Trying to execute region migration on the logical table, task {task}"
276                ),
277            }
278            .fail();
279        }
280
281        Ok(())
282    }
283
284    /// Returns true if the region has been migrated.
285    fn has_migrated(
286        &self,
287        region_route: &RegionRoute,
288        task: &RegionMigrationProcedureTask,
289    ) -> Result<bool> {
290        if region_route.is_leader_downgrading() {
291            return Ok(false);
292        }
293
294        let leader_peer = region_route
295            .leader_peer
296            .as_ref()
297            .context(error::UnexpectedSnafu {
298                violated: "Region route leader peer is not found",
299            })?;
300
301        Ok(leader_peer.id == task.to_peer.id)
302    }
303
304    /// Throws an error if `leader_peer` is not the `from_peer`.
305    ///
306    /// If `from_peer` is unknown, use the leader peer as the `from_peer`.
307    fn verify_region_leader_peer(
308        &self,
309        region_route: &RegionRoute,
310        task: &mut RegionMigrationProcedureTask,
311    ) -> Result<()> {
312        let leader_peer = region_route
313            .leader_peer
314            .as_ref()
315            .context(error::UnexpectedSnafu {
316                violated: "Region route leader peer is not found",
317            })?;
318
319        ensure!(
320            leader_peer.id == task.from_peer.id,
321            error::LeaderPeerChangedSnafu {
322                msg: format!(
323                    "Region's leader peer({}) is not the `from_peer`({}), region: {}",
324                    leader_peer.id, task.from_peer.id, task.region_id
325                ),
326            }
327        );
328
329        if task.from_peer.addr.is_empty() {
330            warn!(
331                "The `from_peer` is unknown, use the leader peer({}) as the `from_peer`, region: {}",
332                leader_peer, task.region_id
333            );
334            // The peer id is the same as the leader peer id.
335            task.from_peer = leader_peer.clone();
336        }
337
338        Ok(())
339    }
340
341    /// Throws an error if `to_peer` is already has a region follower.
342    fn verify_region_follower_peers(
343        &self,
344        region_route: &RegionRoute,
345        task: &RegionMigrationProcedureTask,
346    ) -> Result<()> {
347        ensure!(
348            !region_route.follower_peers.contains(&task.to_peer),
349            error::InvalidArgumentsSnafu {
350                err_msg: format!(
351                    "The `to_peer`({}) is already has a region follower, region: {}",
352                    task.to_peer.id, task.region_id
353                ),
354            },
355        );
356
357        Ok(())
358    }
359
360    /// Extracts regions from the migration task that are already running migration procedures.
361    ///
362    /// Returns a tuple containing those region ids that are already running and the newly created procedure guards.
363    /// The regions that are already running will be removed from the [`RegionMigrationTask`].
364    fn extract_running_regions(
365        &self,
366        task: &mut RegionMigrationTaskBatch,
367    ) -> (Vec<RegionId>, Vec<RegionMigrationProcedureGuard>) {
368        let mut migrating_region_ids = Vec::new();
369        let mut procedure_guards = Vec::with_capacity(task.region_ids.len());
370
371        for region_id in &task.region_ids {
372            let Some(guard) = self.insert_running_procedure(&RegionMigrationProcedureTask::new(
373                *region_id,
374                task.from_peer.clone(),
375                task.to_peer.clone(),
376                task.timeout,
377                task.trigger_reason,
378            )) else {
379                migrating_region_ids.push(*region_id);
380                continue;
381            };
382            procedure_guards.push(guard);
383        }
384
385        let migrating_set = migrating_region_ids.iter().cloned().collect::<HashSet<_>>();
386        task.region_ids.retain(|id| !migrating_set.contains(id));
387
388        (migrating_region_ids, procedure_guards)
389    }
390
391    pub async fn submit_region_migration_task(
392        &self,
393        mut task: RegionMigrationTaskBatch,
394    ) -> Result<SubmitRegionMigrationTaskResult> {
395        let (migrating_region_ids, procedure_guards) = self.extract_running_regions(&mut task);
396        let RegionMigrationAnalysis {
397            migrated,
398            leader_changed,
399            peer_conflict,
400            mut table_not_found,
401            region_not_found,
402            pending,
403        } = analyze_region_migration_task(&task, &self.context_factory.table_metadata_manager)
404            .await?;
405        if pending.is_empty() {
406            return Ok(SubmitRegionMigrationTaskResult {
407                migrated,
408                leader_changed,
409                peer_conflict,
410                table_not_found,
411                region_not_found,
412                migrating: migrating_region_ids,
413                submitted: vec![],
414                procedure_id: None,
415            });
416        }
417
418        // Updates the region ids to the pending region ids.
419        task.region_ids = pending;
420        let table_regions = task.table_regions();
421        let table_ids = table_regions.keys().cloned().collect::<Vec<_>>();
422        let table_info_values = self
423            .context_factory
424            .table_metadata_manager
425            .table_info_manager()
426            .batch_get(&table_ids)
427            .await
428            .context(error::TableMetadataManagerSnafu)?;
429        let mut catalog_and_schema = Vec::with_capacity(table_info_values.len());
430        for (table_id, regions) in table_regions {
431            match table_info_values.get(&table_id) {
432                Some(table_info) => {
433                    let TableName {
434                        catalog_name,
435                        schema_name,
436                        ..
437                    } = table_info.table_name();
438                    catalog_and_schema.push((catalog_name, schema_name));
439                }
440                None => {
441                    task.region_ids.retain(|id| id.table_id() != table_id);
442                    table_not_found.extend(regions);
443                }
444            }
445        }
446        if task.region_ids.is_empty() {
447            return Ok(SubmitRegionMigrationTaskResult {
448                migrated,
449                leader_changed,
450                peer_conflict,
451                table_not_found,
452                region_not_found,
453                migrating: migrating_region_ids,
454                submitted: vec![],
455                procedure_id: None,
456            });
457        }
458
459        let submitting_region_ids = task.region_ids.clone();
460        let procedure_id = self
461            .submit_procedure_inner(task, procedure_guards, catalog_and_schema)
462            .await?;
463        Ok(SubmitRegionMigrationTaskResult {
464            migrated,
465            leader_changed,
466            peer_conflict,
467            table_not_found,
468            region_not_found,
469            migrating: migrating_region_ids,
470            submitted: submitting_region_ids,
471            procedure_id: Some(procedure_id),
472        })
473    }
474
475    async fn submit_procedure_inner(
476        &self,
477        task: RegionMigrationTaskBatch,
478        procedure_guards: Vec<RegionMigrationProcedureGuard>,
479        catalog_and_schema: Vec<(String, String)>,
480    ) -> Result<ProcedureId> {
481        let procedure = RegionMigrationProcedure::new(
482            PersistentContext::new(
483                catalog_and_schema,
484                task.from_peer.clone(),
485                task.to_peer.clone(),
486                task.region_ids.clone(),
487                task.timeout,
488                task.trigger_reason,
489            ),
490            self.context_factory.clone(),
491            procedure_guards,
492        );
493        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
494        let procedure_id = procedure_with_id.id;
495        info!("Starting region migration procedure {procedure_id} for {task}");
496        let procedure_manager = self.procedure_manager.clone();
497        let num_region = task.region_ids.len();
498
499        common_runtime::spawn_global(async move {
500            let watcher = &mut match procedure_manager.submit(procedure_with_id).await {
501                Ok(watcher) => watcher,
502                Err(e) => {
503                    error!(e; "Failed to submit region migration procedure {procedure_id} for {task}");
504                    return;
505                }
506            };
507            METRIC_META_REGION_MIGRATION_DATANODES
508                .with_label_values(&["src", &task.from_peer.id.to_string()])
509                .inc_by(num_region as u64);
510            METRIC_META_REGION_MIGRATION_DATANODES
511                .with_label_values(&["desc", &task.to_peer.id.to_string()])
512                .inc_by(num_region as u64);
513
514            if let Err(e) = watcher::wait(watcher).await {
515                error!(e; "Failed to wait region migration procedure {procedure_id} for {task}");
516                METRIC_META_REGION_MIGRATION_FAIL.inc();
517                return;
518            }
519
520            info!("Region migration procedure {procedure_id} for {task} is finished successfully!");
521        });
522
523        Ok(procedure_id)
524    }
525
526    /// Submits a new region migration procedure.
527    pub async fn submit_procedure(
528        &self,
529        mut task: RegionMigrationProcedureTask,
530    ) -> Result<Option<ProcedureId>> {
531        let Some(guard) = self.insert_running_procedure(&task) else {
532            return error::MigrationRunningSnafu {
533                region_id: task.region_id,
534            }
535            .fail();
536        };
537
538        self.verify_task(&task)?;
539
540        let region_id = task.region_id;
541
542        let table_route = self.retrieve_table_route(region_id).await?;
543        self.verify_table_route(&table_route, &task)?;
544
545        // Safety: checked before.
546        let region_route = table_route
547            .region_route(region_id)
548            .context(error::UnexpectedLogicalRouteTableSnafu {
549                err_msg: format!("{table_route:?} is a non-physical TableRouteValue."),
550            })?
551            .context(error::RegionRouteNotFoundSnafu { region_id })?;
552
553        if self.has_migrated(&region_route, &task)? {
554            info!("Skipping region migration task: {task}");
555            return error::RegionMigratedSnafu {
556                region_id,
557                target_peer_id: task.to_peer.id,
558            }
559            .fail();
560        }
561
562        self.verify_region_leader_peer(&region_route, &mut task)?;
563        self.verify_region_follower_peers(&region_route, &task)?;
564        let table_info = self.retrieve_table_info(region_id).await?;
565        let TableName {
566            catalog_name,
567            schema_name,
568            ..
569        } = table_info.table_name();
570        let RegionMigrationProcedureTask {
571            region_id,
572            from_peer,
573            to_peer,
574            timeout,
575            trigger_reason,
576        } = task.clone();
577        let procedure = RegionMigrationProcedure::new(
578            PersistentContext::new(
579                vec![(catalog_name, schema_name)],
580                from_peer,
581                to_peer,
582                vec![region_id],
583                timeout,
584                trigger_reason,
585            ),
586            self.context_factory.clone(),
587            vec![guard],
588        );
589        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
590        let procedure_id = procedure_with_id.id;
591        info!("Starting region migration procedure {procedure_id} for {task}");
592        let procedure_manager = self.procedure_manager.clone();
593        common_runtime::spawn_global(async move {
594            let watcher = &mut match procedure_manager.submit(procedure_with_id).await {
595                Ok(watcher) => watcher,
596                Err(e) => {
597                    error!(e; "Failed to submit region migration procedure {procedure_id} for {task}");
598                    return;
599                }
600            };
601            METRIC_META_REGION_MIGRATION_DATANODES
602                .with_label_values(&["src", &task.from_peer.id.to_string()])
603                .inc();
604            METRIC_META_REGION_MIGRATION_DATANODES
605                .with_label_values(&["desc", &task.to_peer.id.to_string()])
606                .inc();
607
608            if let Err(e) = watcher::wait(watcher).await {
609                error!(e; "Failed to wait region migration procedure {procedure_id} for {task}");
610                METRIC_META_REGION_MIGRATION_FAIL.inc();
611                return;
612            }
613
614            info!("Region migration procedure {procedure_id} for {task} is finished successfully!");
615        });
616
617        Ok(Some(procedure_id))
618    }
619}
620
621#[cfg(test)]
622mod test {
623    use std::assert_matches::assert_matches;
624
625    use common_meta::key::table_route::LogicalTableRouteValue;
626    use common_meta::key::test_utils::new_test_table_info;
627    use common_meta::rpc::router::Region;
628
629    use super::*;
630    use crate::procedure::region_migration::test_util::TestingEnv;
631
632    #[tokio::test]
633    async fn test_insert_running_procedure() {
634        let env = TestingEnv::new();
635        let context_factory = env.context_factory();
636        let manager = RegionMigrationManager::new(env.procedure_manager().clone(), context_factory);
637        let region_id = RegionId::new(1024, 1);
638        let task = RegionMigrationProcedureTask {
639            region_id,
640            from_peer: Peer::empty(2),
641            to_peer: Peer::empty(1),
642            timeout: Duration::from_millis(1000),
643            trigger_reason: RegionMigrationTriggerReason::Manual,
644        };
645        // Inserts one
646        manager
647            .tracker
648            .running_procedures
649            .write()
650            .unwrap()
651            .insert(region_id, task.clone());
652
653        let err = manager.submit_procedure(task).await.unwrap_err();
654        assert_matches!(err, error::Error::MigrationRunning { .. });
655    }
656
657    #[tokio::test]
658    async fn test_submit_procedure_invalid_task() {
659        let env = TestingEnv::new();
660        let context_factory = env.context_factory();
661        let manager = RegionMigrationManager::new(env.procedure_manager().clone(), context_factory);
662        let region_id = RegionId::new(1024, 1);
663        let task = RegionMigrationProcedureTask {
664            region_id,
665            from_peer: Peer::empty(1),
666            to_peer: Peer::empty(1),
667            timeout: Duration::from_millis(1000),
668            trigger_reason: RegionMigrationTriggerReason::Manual,
669        };
670
671        let err = manager.submit_procedure(task).await.unwrap_err();
672        assert_matches!(err, error::Error::InvalidArguments { .. });
673    }
674
675    #[tokio::test]
676    async fn test_submit_procedure_table_not_found() {
677        let env = TestingEnv::new();
678        let context_factory = env.context_factory();
679        let manager = RegionMigrationManager::new(env.procedure_manager().clone(), context_factory);
680        let region_id = RegionId::new(1024, 1);
681        let task = RegionMigrationProcedureTask {
682            region_id,
683            from_peer: Peer::empty(1),
684            to_peer: Peer::empty(2),
685            timeout: Duration::from_millis(1000),
686            trigger_reason: RegionMigrationTriggerReason::Manual,
687        };
688
689        let err = manager.submit_procedure(task).await.unwrap_err();
690        assert_matches!(err, error::Error::TableRouteNotFound { .. });
691    }
692
693    #[tokio::test]
694    async fn test_submit_procedure_region_route_not_found() {
695        let env = TestingEnv::new();
696        let context_factory = env.context_factory();
697        let manager = RegionMigrationManager::new(env.procedure_manager().clone(), context_factory);
698        let region_id = RegionId::new(1024, 1);
699        let task = RegionMigrationProcedureTask {
700            region_id,
701            from_peer: Peer::empty(1),
702            to_peer: Peer::empty(2),
703            timeout: Duration::from_millis(1000),
704            trigger_reason: RegionMigrationTriggerReason::Manual,
705        };
706
707        let table_info = new_test_table_info(1024).into();
708        let region_routes = vec![RegionRoute {
709            region: Region::new_test(RegionId::new(1024, 2)),
710            leader_peer: Some(Peer::empty(3)),
711            ..Default::default()
712        }];
713
714        env.create_physical_table_metadata(table_info, region_routes)
715            .await;
716
717        let err = manager.submit_procedure(task).await.unwrap_err();
718        assert_matches!(err, error::Error::RegionRouteNotFound { .. });
719    }
720
721    #[tokio::test]
722    async fn test_submit_procedure_incorrect_from_peer() {
723        let env = TestingEnv::new();
724        let context_factory = env.context_factory();
725        let manager = RegionMigrationManager::new(env.procedure_manager().clone(), context_factory);
726        let region_id = RegionId::new(1024, 1);
727        let task = RegionMigrationProcedureTask {
728            region_id,
729            from_peer: Peer::empty(1),
730            to_peer: Peer::empty(2),
731            timeout: Duration::from_millis(1000),
732            trigger_reason: RegionMigrationTriggerReason::Manual,
733        };
734
735        let table_info = new_test_table_info(1024).into();
736        let region_routes = vec![RegionRoute {
737            region: Region::new_test(RegionId::new(1024, 1)),
738            leader_peer: Some(Peer::empty(3)),
739            ..Default::default()
740        }];
741
742        env.create_physical_table_metadata(table_info, region_routes)
743            .await;
744
745        let err = manager.submit_procedure(task).await.unwrap_err();
746        assert_matches!(err, error::Error::LeaderPeerChanged { .. });
747        assert_eq!(
748            err.to_string(),
749            "Region's leader peer changed: Region's leader peer(3) is not the `from_peer`(1), region: 4398046511105(1024, 1)"
750        );
751    }
752
753    #[tokio::test]
754    async fn test_submit_procedure_region_follower_on_to_peer() {
755        let env = TestingEnv::new();
756        let context_factory = env.context_factory();
757        let manager = RegionMigrationManager::new(env.procedure_manager().clone(), context_factory);
758        let region_id = RegionId::new(1024, 1);
759        let task = RegionMigrationProcedureTask {
760            region_id,
761            from_peer: Peer::empty(3),
762            to_peer: Peer::empty(2),
763            timeout: Duration::from_millis(1000),
764            trigger_reason: RegionMigrationTriggerReason::Manual,
765        };
766
767        let table_info = new_test_table_info(1024).into();
768        let region_routes = vec![RegionRoute {
769            region: Region::new_test(region_id),
770            leader_peer: Some(Peer::empty(3)),
771            follower_peers: vec![Peer::empty(2)],
772            ..Default::default()
773        }];
774
775        env.create_physical_table_metadata(table_info, region_routes)
776            .await;
777
778        let err = manager.submit_procedure(task).await.unwrap_err();
779        assert_matches!(err, error::Error::InvalidArguments { .. });
780        assert_eq!(
781            err.to_string(),
782            "Invalid arguments: The `to_peer`(2) is already has a region follower, region: 4398046511105(1024, 1)"
783        );
784    }
785
786    #[tokio::test]
787    async fn test_submit_procedure_has_migrated() {
788        common_telemetry::init_default_ut_logging();
789        let env = TestingEnv::new();
790        let context_factory = env.context_factory();
791        let manager = RegionMigrationManager::new(env.procedure_manager().clone(), context_factory);
792        let region_id = RegionId::new(1024, 1);
793        let task = RegionMigrationProcedureTask {
794            region_id,
795            from_peer: Peer::empty(1),
796            to_peer: Peer::empty(2),
797            timeout: Duration::from_millis(1000),
798            trigger_reason: RegionMigrationTriggerReason::Manual,
799        };
800
801        let table_info = new_test_table_info(1024).into();
802        let region_routes = vec![RegionRoute {
803            region: Region::new_test(RegionId::new(1024, 1)),
804            leader_peer: Some(Peer::empty(2)),
805            ..Default::default()
806        }];
807
808        env.create_physical_table_metadata(table_info, region_routes)
809            .await;
810
811        let err = manager.submit_procedure(task).await.unwrap_err();
812        assert_matches!(err, error::Error::RegionMigrated { .. });
813    }
814
815    #[tokio::test]
816    async fn test_verify_table_route_error() {
817        let env = TestingEnv::new();
818        let context_factory = env.context_factory();
819        let manager = RegionMigrationManager::new(env.procedure_manager().clone(), context_factory);
820        let region_id = RegionId::new(1024, 1);
821        let task = RegionMigrationProcedureTask {
822            region_id,
823            from_peer: Peer::empty(1),
824            to_peer: Peer::empty(2),
825            timeout: Duration::from_millis(1000),
826            trigger_reason: RegionMigrationTriggerReason::Manual,
827        };
828
829        let err = manager
830            .verify_table_route(
831                &TableRouteValue::Logical(LogicalTableRouteValue::new(0)),
832                &task,
833            )
834            .unwrap_err();
835
836        assert_matches!(err, error::Error::Unexpected { .. });
837    }
838
839    #[tokio::test]
840    async fn test_submit_procedure_with_multiple_regions_invalid_task() {
841        let env = TestingEnv::new();
842        let context_factory = env.context_factory();
843        let manager = RegionMigrationManager::new(env.procedure_manager().clone(), context_factory);
844        let task = RegionMigrationTaskBatch {
845            region_ids: vec![RegionId::new(1024, 1)],
846            from_peer: Peer::empty(1),
847            to_peer: Peer::empty(1),
848            timeout: Duration::from_millis(1000),
849            trigger_reason: RegionMigrationTriggerReason::Manual,
850        };
851
852        let err = manager
853            .submit_region_migration_task(task)
854            .await
855            .unwrap_err();
856        assert_matches!(err, error::Error::InvalidArguments { .. });
857    }
858
859    #[tokio::test]
860    async fn test_submit_procedure_with_multiple_regions_no_region_to_migrate() {
861        common_telemetry::init_default_ut_logging();
862        let env = TestingEnv::new();
863        let context_factory = env.context_factory();
864        let manager = RegionMigrationManager::new(env.procedure_manager().clone(), context_factory);
865        let region_id = RegionId::new(1024, 1);
866        let task = RegionMigrationTaskBatch {
867            region_ids: vec![region_id],
868            from_peer: Peer::empty(1),
869            to_peer: Peer::empty(2),
870            timeout: Duration::from_millis(1000),
871            trigger_reason: RegionMigrationTriggerReason::Manual,
872        };
873        let table_info = new_test_table_info(1024).into();
874        let region_routes = vec![RegionRoute {
875            region: Region::new_test(region_id),
876            leader_peer: Some(Peer::empty(2)),
877            ..Default::default()
878        }];
879        env.create_physical_table_metadata(table_info, region_routes)
880            .await;
881        let result = manager.submit_region_migration_task(task).await.unwrap();
882
883        assert_eq!(
884            result,
885            SubmitRegionMigrationTaskResult {
886                migrated: vec![region_id],
887                ..Default::default()
888            }
889        );
890    }
891
892    #[tokio::test]
893    async fn test_submit_procedure_with_multiple_regions_leader_peer_changed() {
894        let env = TestingEnv::new();
895        let context_factory = env.context_factory();
896        let manager = RegionMigrationManager::new(env.procedure_manager().clone(), context_factory);
897        let region_id = RegionId::new(1024, 1);
898        let task = RegionMigrationTaskBatch {
899            region_ids: vec![region_id],
900            from_peer: Peer::empty(1),
901            to_peer: Peer::empty(2),
902            timeout: Duration::from_millis(1000),
903            trigger_reason: RegionMigrationTriggerReason::Manual,
904        };
905
906        let table_info = new_test_table_info(1024).into();
907        let region_routes = vec![RegionRoute {
908            region: Region::new_test(RegionId::new(1024, 1)),
909            leader_peer: Some(Peer::empty(3)),
910            ..Default::default()
911        }];
912
913        env.create_physical_table_metadata(table_info, region_routes)
914            .await;
915        let result = manager.submit_region_migration_task(task).await.unwrap();
916        assert_eq!(
917            result,
918            SubmitRegionMigrationTaskResult {
919                leader_changed: vec![region_id],
920                ..Default::default()
921            }
922        );
923    }
924
925    #[tokio::test]
926    async fn test_submit_procedure_with_multiple_regions_peer_conflict() {
927        let env = TestingEnv::new();
928        let context_factory = env.context_factory();
929        let manager = RegionMigrationManager::new(env.procedure_manager().clone(), context_factory);
930        let region_id = RegionId::new(1024, 1);
931        let task = RegionMigrationTaskBatch {
932            region_ids: vec![region_id],
933            from_peer: Peer::empty(3),
934            to_peer: Peer::empty(2),
935            timeout: Duration::from_millis(1000),
936            trigger_reason: RegionMigrationTriggerReason::Manual,
937        };
938
939        let table_info = new_test_table_info(1024).into();
940        let region_routes = vec![RegionRoute {
941            region: Region::new_test(region_id),
942            leader_peer: Some(Peer::empty(3)),
943            follower_peers: vec![Peer::empty(2)],
944            ..Default::default()
945        }];
946
947        env.create_physical_table_metadata(table_info, region_routes)
948            .await;
949        let result = manager.submit_region_migration_task(task).await.unwrap();
950        assert_eq!(
951            result,
952            SubmitRegionMigrationTaskResult {
953                peer_conflict: vec![region_id],
954                ..Default::default()
955            }
956        );
957    }
958
959    #[tokio::test]
960    async fn test_running_regions() {
961        let env = TestingEnv::new();
962        let context_factory = env.context_factory();
963        let manager = RegionMigrationManager::new(env.procedure_manager().clone(), context_factory);
964        let region_id = RegionId::new(1024, 1);
965        let task = RegionMigrationTaskBatch {
966            region_ids: vec![region_id, RegionId::new(1024, 2)],
967            from_peer: Peer::empty(1),
968            to_peer: Peer::empty(2),
969            timeout: Duration::from_millis(1000),
970            trigger_reason: RegionMigrationTriggerReason::Manual,
971        };
972        // Inserts one
973        manager.tracker.running_procedures.write().unwrap().insert(
974            region_id,
975            RegionMigrationProcedureTask::new(
976                region_id,
977                task.from_peer.clone(),
978                task.to_peer.clone(),
979                task.timeout,
980                task.trigger_reason,
981            ),
982        );
983        let table_info = new_test_table_info(1024).into();
984        let region_routes = vec![RegionRoute {
985            region: Region::new_test(RegionId::new(1024, 2)),
986            leader_peer: Some(Peer::empty(1)),
987            ..Default::default()
988        }];
989        env.create_physical_table_metadata(table_info, region_routes)
990            .await;
991        let result = manager.submit_region_migration_task(task).await.unwrap();
992        assert_eq!(result.migrating, vec![region_id]);
993        assert_eq!(result.submitted, vec![RegionId::new(1024, 2)]);
994        assert!(result.procedure_id.is_some());
995    }
996}