1use std::collections::hash_map::Entry;
16use std::collections::{HashMap, HashSet};
17use std::fmt::Display;
18use std::sync::{Arc, RwLock};
19use std::time::Duration;
20
21use common_meta::key::table_info::TableInfoValue;
22use common_meta::key::table_route::TableRouteValue;
23use common_meta::peer::Peer;
24use common_meta::rpc::router::RegionRoute;
25use common_procedure::{ProcedureId, ProcedureManagerRef, ProcedureWithId, watcher};
26use common_telemetry::{error, info, warn};
27use serde::{Deserialize, Serialize};
28use snafu::{OptionExt, ResultExt, ensure};
29use store_api::storage::RegionId;
30use table::table_name::TableName;
31
32use crate::error::{self, Result};
33use crate::metrics::{METRIC_META_REGION_MIGRATION_DATANODES, METRIC_META_REGION_MIGRATION_FAIL};
34use crate::procedure::region_migration::utils::{
35 RegionMigrationAnalysis, RegionMigrationTaskBatch, analyze_region_migration_task,
36};
37use crate::procedure::region_migration::{
38 DefaultContextFactory, PersistentContext, RegionMigrationProcedure,
39};
40
/// Shared, reference-counted handle to a [`RegionMigrationManager`].
pub type RegionMigrationManagerRef = Arc<RegionMigrationManager>;
42
/// Validates region migration requests and submits them as [`RegionMigrationProcedure`]s.
pub struct RegionMigrationManager {
    /// Used to register the procedure loader and to submit migration procedures.
    procedure_manager: ProcedureManagerRef,
    /// Cloned into every procedure this manager creates or reloads; also provides the table
    /// metadata manager used while validating tasks.
    context_factory: DefaultContextFactory,
    /// Records which regions currently have a migration registered.
    tracker: RegionMigrationProcedureTracker,
}
49
/// Tracks the regions that currently have a migration task registered.
///
/// Clones share the same underlying map because it lives behind an [`Arc`].
#[derive(Default, Clone)]
pub struct RegionMigrationProcedureTracker {
    /// Maps a region id to the task that claimed it.
    running_procedures: Arc<RwLock<HashMap<RegionId, RegionMigrationProcedureTask>>>,
}
54
55impl RegionMigrationProcedureTracker {
56 pub(crate) fn insert_running_procedure(
58 &self,
59 task: &RegionMigrationProcedureTask,
60 ) -> Option<RegionMigrationProcedureGuard> {
61 let mut procedures = self.running_procedures.write().unwrap();
62 match procedures.entry(task.region_id) {
63 Entry::Occupied(_) => None,
64 Entry::Vacant(v) => {
65 v.insert(task.clone());
66 Some(RegionMigrationProcedureGuard {
67 region_id: task.region_id,
68 running_procedures: self.running_procedures.clone(),
69 })
70 }
71 }
72 }
73
74 pub(crate) fn contains(&self, region_id: RegionId) -> bool {
76 self.running_procedures
77 .read()
78 .unwrap()
79 .contains_key(®ion_id)
80 }
81}
82
/// Guard returned by [`RegionMigrationProcedureTracker::insert_running_procedure`].
///
/// Dropping it removes `region_id` from the shared map of running procedures.
pub(crate) struct RegionMigrationProcedureGuard {
    /// The region whose entry this guard owns.
    region_id: RegionId,
    /// The same map the tracker holds.
    running_procedures: Arc<RwLock<HashMap<RegionId, RegionMigrationProcedureTask>>>,
}
88
89impl Drop for RegionMigrationProcedureGuard {
90 fn drop(&mut self) {
91 let exists = self
92 .running_procedures
93 .read()
94 .unwrap()
95 .contains_key(&self.region_id);
96 if exists {
97 self.running_procedures
98 .write()
99 .unwrap()
100 .remove(&self.region_id);
101 }
102 }
103}
104
/// A request to migrate a single region from one peer to another.
#[derive(Debug, Clone)]
pub struct RegionMigrationProcedureTask {
    /// The region to migrate.
    pub(crate) region_id: RegionId,
    /// The peer the region is migrated away from; validated against the region's current
    /// leader when the task is submitted.
    pub(crate) from_peer: Peer,
    /// The peer the region is migrated to.
    pub(crate) to_peer: Peer,
    /// Timeout forwarded to the procedure's persistent context.
    pub(crate) timeout: Duration,
    /// Why this migration was requested.
    pub(crate) trigger_reason: RegionMigrationTriggerReason,
}
114
/// The reason a region migration was requested.
///
/// `Display` is derived through `strum` with `PascalCase` serialization.
#[derive(Default, Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, strum::Display)]
#[strum(serialize_all = "PascalCase")]
pub enum RegionMigrationTriggerReason {
    /// The default value, used when no reason is given.
    #[default]
    Unknown,
    /// NOTE(review): presumably an explicitly requested migration — confirm against callers.
    Manual,
    /// NOTE(review): presumably issued by an automatic rebalancer — confirm against callers.
    AutoRebalance,
    /// NOTE(review): presumably issued by region failover handling — confirm against callers.
    Failover,
}
129
130impl RegionMigrationProcedureTask {
131 pub fn new(
132 region_id: RegionId,
133 from_peer: Peer,
134 to_peer: Peer,
135 timeout: Duration,
136 trigger_reason: RegionMigrationTriggerReason,
137 ) -> Self {
138 Self {
139 region_id,
140 from_peer,
141 to_peer,
142 timeout,
143 trigger_reason,
144 }
145 }
146}
147
148impl Display for RegionMigrationProcedureTask {
149 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
150 write!(
151 f,
152 "region: {}, from_peer: {}, to_peer: {}, trigger_reason: {}",
153 self.region_id, self.from_peer, self.to_peer, self.trigger_reason
154 )
155 }
156}
157
/// Per-region outcome of [`RegionMigrationManager::submit_region_migration_task`].
#[derive(Debug, Default, PartialEq, Eq)]
pub struct SubmitRegionMigrationTaskResult {
    /// Regions the analysis reported as already migrated (in the tests: the leader is already
    /// the `to_peer`).
    pub migrated: Vec<RegionId>,
    /// Regions the analysis reported as having a changed leader (in the tests: the leader is
    /// not the `from_peer`).
    pub leader_changed: Vec<RegionId>,
    /// Regions the analysis reported as conflicting with the target peer (in the tests: the
    /// `to_peer` is already a follower).
    pub peer_conflict: Vec<RegionId>,
    /// Regions whose table was not found, either by the analysis or because the table info
    /// was missing when it was fetched before submission.
    pub table_not_found: Vec<RegionId>,
    /// Regions the analysis could not find.
    pub region_not_found: Vec<RegionId>,
    /// Regions skipped because the tracker already had a migration registered for them.
    pub migrating: Vec<RegionId>,
    /// Regions handed to the submitted procedure.
    pub submitted: Vec<RegionId>,
    /// Id of the submitted procedure; `None` when nothing was submitted.
    pub procedure_id: Option<ProcedureId>,
}
178
179impl RegionMigrationManager {
180 pub(crate) fn new(
182 procedure_manager: ProcedureManagerRef,
183 context_factory: DefaultContextFactory,
184 ) -> Self {
185 Self {
186 procedure_manager,
187 context_factory,
188 tracker: RegionMigrationProcedureTracker::default(),
189 }
190 }
191
    /// Returns the tracker that records which regions have a migration registered.
    pub fn tracker(&self) -> &RegionMigrationProcedureTracker {
        &self.tracker
    }
196
197 pub(crate) fn try_start(&self) -> Result<()> {
199 let context_factory = self.context_factory.clone();
200 let tracker = self.tracker.clone();
201 self.procedure_manager
202 .register_loader(
203 RegionMigrationProcedure::TYPE_NAME,
204 Box::new(move |json| {
205 let context_factory = context_factory.clone();
206 let tracker = tracker.clone();
207 RegionMigrationProcedure::from_json(json, context_factory, tracker)
208 .map(|p| Box::new(p) as _)
209 }),
210 )
211 .context(error::RegisterProcedureLoaderSnafu {
212 type_name: RegionMigrationProcedure::TYPE_NAME,
213 })
214 }
215
    /// Registers `task` in the tracker; see
    /// [`RegionMigrationProcedureTracker::insert_running_procedure`].
    ///
    /// Returns `None` when the region already has a registered migration.
    fn insert_running_procedure(
        &self,
        task: &RegionMigrationProcedureTask,
    ) -> Option<RegionMigrationProcedureGuard> {
        self.tracker.insert_running_procedure(task)
    }
222
223 fn verify_task(&self, task: &RegionMigrationProcedureTask) -> Result<()> {
224 if task.to_peer.id == task.from_peer.id {
225 return error::InvalidArgumentsSnafu {
226 err_msg: "The `from_peer_id` can't equal `to_peer_id`",
227 }
228 .fail();
229 }
230
231 Ok(())
232 }
233
234 async fn retrieve_table_route(&self, region_id: RegionId) -> Result<TableRouteValue> {
235 let table_route = self
236 .context_factory
237 .table_metadata_manager
238 .table_route_manager()
239 .table_route_storage()
240 .get(region_id.table_id())
241 .await
242 .context(error::TableMetadataManagerSnafu)?
243 .context(error::TableRouteNotFoundSnafu {
244 table_id: region_id.table_id(),
245 })?;
246
247 Ok(table_route)
248 }
249
250 async fn retrieve_table_info(&self, region_id: RegionId) -> Result<TableInfoValue> {
251 let table_route = self
252 .context_factory
253 .table_metadata_manager
254 .table_info_manager()
255 .get(region_id.table_id())
256 .await
257 .context(error::TableMetadataManagerSnafu)?
258 .context(error::TableInfoNotFoundSnafu {
259 table_id: region_id.table_id(),
260 })?
261 .into_inner();
262
263 Ok(table_route)
264 }
265
266 fn verify_table_route(
268 &self,
269 table_route: &TableRouteValue,
270 task: &RegionMigrationProcedureTask,
271 ) -> Result<()> {
272 if !table_route.is_physical() {
273 return error::UnexpectedSnafu {
274 violated: format!(
275 "Trying to execute region migration on the logical table, task {task}"
276 ),
277 }
278 .fail();
279 }
280
281 Ok(())
282 }
283
284 fn has_migrated(
286 &self,
287 region_route: &RegionRoute,
288 task: &RegionMigrationProcedureTask,
289 ) -> Result<bool> {
290 if region_route.is_leader_downgrading() {
291 return Ok(false);
292 }
293
294 let leader_peer = region_route
295 .leader_peer
296 .as_ref()
297 .context(error::UnexpectedSnafu {
298 violated: "Region route leader peer is not found",
299 })?;
300
301 Ok(leader_peer.id == task.to_peer.id)
302 }
303
304 fn verify_region_leader_peer(
308 &self,
309 region_route: &RegionRoute,
310 task: &mut RegionMigrationProcedureTask,
311 ) -> Result<()> {
312 let leader_peer = region_route
313 .leader_peer
314 .as_ref()
315 .context(error::UnexpectedSnafu {
316 violated: "Region route leader peer is not found",
317 })?;
318
319 ensure!(
320 leader_peer.id == task.from_peer.id,
321 error::LeaderPeerChangedSnafu {
322 msg: format!(
323 "Region's leader peer({}) is not the `from_peer`({}), region: {}",
324 leader_peer.id, task.from_peer.id, task.region_id
325 ),
326 }
327 );
328
329 if task.from_peer.addr.is_empty() {
330 warn!(
331 "The `from_peer` is unknown, use the leader peer({}) as the `from_peer`, region: {}",
332 leader_peer, task.region_id
333 );
334 task.from_peer = leader_peer.clone();
336 }
337
338 Ok(())
339 }
340
341 fn verify_region_follower_peers(
343 &self,
344 region_route: &RegionRoute,
345 task: &RegionMigrationProcedureTask,
346 ) -> Result<()> {
347 ensure!(
348 !region_route.follower_peers.contains(&task.to_peer),
349 error::InvalidArgumentsSnafu {
350 err_msg: format!(
351 "The `to_peer`({}) is already has a region follower, region: {}",
352 task.to_peer.id, task.region_id
353 ),
354 },
355 );
356
357 Ok(())
358 }
359
360 fn extract_running_regions(
365 &self,
366 task: &mut RegionMigrationTaskBatch,
367 ) -> (Vec<RegionId>, Vec<RegionMigrationProcedureGuard>) {
368 let mut migrating_region_ids = Vec::new();
369 let mut procedure_guards = Vec::with_capacity(task.region_ids.len());
370
371 for region_id in &task.region_ids {
372 let Some(guard) = self.insert_running_procedure(&RegionMigrationProcedureTask::new(
373 *region_id,
374 task.from_peer.clone(),
375 task.to_peer.clone(),
376 task.timeout,
377 task.trigger_reason,
378 )) else {
379 migrating_region_ids.push(*region_id);
380 continue;
381 };
382 procedure_guards.push(guard);
383 }
384
385 let migrating_set = migrating_region_ids.iter().cloned().collect::<HashSet<_>>();
386 task.region_ids.retain(|id| !migrating_set.contains(id));
387
388 (migrating_region_ids, procedure_guards)
389 }
390
391 pub async fn submit_region_migration_task(
392 &self,
393 mut task: RegionMigrationTaskBatch,
394 ) -> Result<SubmitRegionMigrationTaskResult> {
395 let (migrating_region_ids, procedure_guards) = self.extract_running_regions(&mut task);
396 let RegionMigrationAnalysis {
397 migrated,
398 leader_changed,
399 peer_conflict,
400 mut table_not_found,
401 region_not_found,
402 pending,
403 } = analyze_region_migration_task(&task, &self.context_factory.table_metadata_manager)
404 .await?;
405 if pending.is_empty() {
406 return Ok(SubmitRegionMigrationTaskResult {
407 migrated,
408 leader_changed,
409 peer_conflict,
410 table_not_found,
411 region_not_found,
412 migrating: migrating_region_ids,
413 submitted: vec![],
414 procedure_id: None,
415 });
416 }
417
418 task.region_ids = pending;
420 let table_regions = task.table_regions();
421 let table_ids = table_regions.keys().cloned().collect::<Vec<_>>();
422 let table_info_values = self
423 .context_factory
424 .table_metadata_manager
425 .table_info_manager()
426 .batch_get(&table_ids)
427 .await
428 .context(error::TableMetadataManagerSnafu)?;
429 let mut catalog_and_schema = Vec::with_capacity(table_info_values.len());
430 for (table_id, regions) in table_regions {
431 match table_info_values.get(&table_id) {
432 Some(table_info) => {
433 let TableName {
434 catalog_name,
435 schema_name,
436 ..
437 } = table_info.table_name();
438 catalog_and_schema.push((catalog_name, schema_name));
439 }
440 None => {
441 task.region_ids.retain(|id| id.table_id() != table_id);
442 table_not_found.extend(regions);
443 }
444 }
445 }
446 if task.region_ids.is_empty() {
447 return Ok(SubmitRegionMigrationTaskResult {
448 migrated,
449 leader_changed,
450 peer_conflict,
451 table_not_found,
452 region_not_found,
453 migrating: migrating_region_ids,
454 submitted: vec![],
455 procedure_id: None,
456 });
457 }
458
459 let submitting_region_ids = task.region_ids.clone();
460 let procedure_id = self
461 .submit_procedure_inner(task, procedure_guards, catalog_and_schema)
462 .await?;
463 Ok(SubmitRegionMigrationTaskResult {
464 migrated,
465 leader_changed,
466 peer_conflict,
467 table_not_found,
468 region_not_found,
469 migrating: migrating_region_ids,
470 submitted: submitting_region_ids,
471 procedure_id: Some(procedure_id),
472 })
473 }
474
475 async fn submit_procedure_inner(
476 &self,
477 task: RegionMigrationTaskBatch,
478 procedure_guards: Vec<RegionMigrationProcedureGuard>,
479 catalog_and_schema: Vec<(String, String)>,
480 ) -> Result<ProcedureId> {
481 let procedure = RegionMigrationProcedure::new(
482 PersistentContext::new(
483 catalog_and_schema,
484 task.from_peer.clone(),
485 task.to_peer.clone(),
486 task.region_ids.clone(),
487 task.timeout,
488 task.trigger_reason,
489 ),
490 self.context_factory.clone(),
491 procedure_guards,
492 );
493 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
494 let procedure_id = procedure_with_id.id;
495 info!("Starting region migration procedure {procedure_id} for {task}");
496 let procedure_manager = self.procedure_manager.clone();
497 let num_region = task.region_ids.len();
498
499 common_runtime::spawn_global(async move {
500 let watcher = &mut match procedure_manager.submit(procedure_with_id).await {
501 Ok(watcher) => watcher,
502 Err(e) => {
503 error!(e; "Failed to submit region migration procedure {procedure_id} for {task}");
504 return;
505 }
506 };
507 METRIC_META_REGION_MIGRATION_DATANODES
508 .with_label_values(&["src", &task.from_peer.id.to_string()])
509 .inc_by(num_region as u64);
510 METRIC_META_REGION_MIGRATION_DATANODES
511 .with_label_values(&["desc", &task.to_peer.id.to_string()])
512 .inc_by(num_region as u64);
513
514 if let Err(e) = watcher::wait(watcher).await {
515 error!(e; "Failed to wait region migration procedure {procedure_id} for {task}");
516 METRIC_META_REGION_MIGRATION_FAIL.inc();
517 return;
518 }
519
520 info!("Region migration procedure {procedure_id} for {task} is finished successfully!");
521 });
522
523 Ok(procedure_id)
524 }
525
526 pub async fn submit_procedure(
528 &self,
529 mut task: RegionMigrationProcedureTask,
530 ) -> Result<Option<ProcedureId>> {
531 let Some(guard) = self.insert_running_procedure(&task) else {
532 return error::MigrationRunningSnafu {
533 region_id: task.region_id,
534 }
535 .fail();
536 };
537
538 self.verify_task(&task)?;
539
540 let region_id = task.region_id;
541
542 let table_route = self.retrieve_table_route(region_id).await?;
543 self.verify_table_route(&table_route, &task)?;
544
545 let region_route = table_route
547 .region_route(region_id)
548 .context(error::UnexpectedLogicalRouteTableSnafu {
549 err_msg: format!("{table_route:?} is a non-physical TableRouteValue."),
550 })?
551 .context(error::RegionRouteNotFoundSnafu { region_id })?;
552
553 if self.has_migrated(®ion_route, &task)? {
554 info!("Skipping region migration task: {task}");
555 return error::RegionMigratedSnafu {
556 region_id,
557 target_peer_id: task.to_peer.id,
558 }
559 .fail();
560 }
561
562 self.verify_region_leader_peer(®ion_route, &mut task)?;
563 self.verify_region_follower_peers(®ion_route, &task)?;
564 let table_info = self.retrieve_table_info(region_id).await?;
565 let TableName {
566 catalog_name,
567 schema_name,
568 ..
569 } = table_info.table_name();
570 let RegionMigrationProcedureTask {
571 region_id,
572 from_peer,
573 to_peer,
574 timeout,
575 trigger_reason,
576 } = task.clone();
577 let procedure = RegionMigrationProcedure::new(
578 PersistentContext::new(
579 vec![(catalog_name, schema_name)],
580 from_peer,
581 to_peer,
582 vec![region_id],
583 timeout,
584 trigger_reason,
585 ),
586 self.context_factory.clone(),
587 vec![guard],
588 );
589 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
590 let procedure_id = procedure_with_id.id;
591 info!("Starting region migration procedure {procedure_id} for {task}");
592 let procedure_manager = self.procedure_manager.clone();
593 common_runtime::spawn_global(async move {
594 let watcher = &mut match procedure_manager.submit(procedure_with_id).await {
595 Ok(watcher) => watcher,
596 Err(e) => {
597 error!(e; "Failed to submit region migration procedure {procedure_id} for {task}");
598 return;
599 }
600 };
601 METRIC_META_REGION_MIGRATION_DATANODES
602 .with_label_values(&["src", &task.from_peer.id.to_string()])
603 .inc();
604 METRIC_META_REGION_MIGRATION_DATANODES
605 .with_label_values(&["desc", &task.to_peer.id.to_string()])
606 .inc();
607
608 if let Err(e) = watcher::wait(watcher).await {
609 error!(e; "Failed to wait region migration procedure {procedure_id} for {task}");
610 METRIC_META_REGION_MIGRATION_FAIL.inc();
611 return;
612 }
613
614 info!("Region migration procedure {procedure_id} for {task} is finished successfully!");
615 });
616
617 Ok(Some(procedure_id))
618 }
619}
620
621#[cfg(test)]
622mod test {
623 use std::assert_matches::assert_matches;
624
625 use common_meta::key::table_route::LogicalTableRouteValue;
626 use common_meta::key::test_utils::new_test_table_info;
627 use common_meta::rpc::router::Region;
628
629 use super::*;
630 use crate::procedure::region_migration::test_util::TestingEnv;
631
632 #[tokio::test]
633 async fn test_insert_running_procedure() {
634 let env = TestingEnv::new();
635 let context_factory = env.context_factory();
636 let manager = RegionMigrationManager::new(env.procedure_manager().clone(), context_factory);
637 let region_id = RegionId::new(1024, 1);
638 let task = RegionMigrationProcedureTask {
639 region_id,
640 from_peer: Peer::empty(2),
641 to_peer: Peer::empty(1),
642 timeout: Duration::from_millis(1000),
643 trigger_reason: RegionMigrationTriggerReason::Manual,
644 };
645 manager
647 .tracker
648 .running_procedures
649 .write()
650 .unwrap()
651 .insert(region_id, task.clone());
652
653 let err = manager.submit_procedure(task).await.unwrap_err();
654 assert_matches!(err, error::Error::MigrationRunning { .. });
655 }
656
657 #[tokio::test]
658 async fn test_submit_procedure_invalid_task() {
659 let env = TestingEnv::new();
660 let context_factory = env.context_factory();
661 let manager = RegionMigrationManager::new(env.procedure_manager().clone(), context_factory);
662 let region_id = RegionId::new(1024, 1);
663 let task = RegionMigrationProcedureTask {
664 region_id,
665 from_peer: Peer::empty(1),
666 to_peer: Peer::empty(1),
667 timeout: Duration::from_millis(1000),
668 trigger_reason: RegionMigrationTriggerReason::Manual,
669 };
670
671 let err = manager.submit_procedure(task).await.unwrap_err();
672 assert_matches!(err, error::Error::InvalidArguments { .. });
673 }
674
675 #[tokio::test]
676 async fn test_submit_procedure_table_not_found() {
677 let env = TestingEnv::new();
678 let context_factory = env.context_factory();
679 let manager = RegionMigrationManager::new(env.procedure_manager().clone(), context_factory);
680 let region_id = RegionId::new(1024, 1);
681 let task = RegionMigrationProcedureTask {
682 region_id,
683 from_peer: Peer::empty(1),
684 to_peer: Peer::empty(2),
685 timeout: Duration::from_millis(1000),
686 trigger_reason: RegionMigrationTriggerReason::Manual,
687 };
688
689 let err = manager.submit_procedure(task).await.unwrap_err();
690 assert_matches!(err, error::Error::TableRouteNotFound { .. });
691 }
692
693 #[tokio::test]
694 async fn test_submit_procedure_region_route_not_found() {
695 let env = TestingEnv::new();
696 let context_factory = env.context_factory();
697 let manager = RegionMigrationManager::new(env.procedure_manager().clone(), context_factory);
698 let region_id = RegionId::new(1024, 1);
699 let task = RegionMigrationProcedureTask {
700 region_id,
701 from_peer: Peer::empty(1),
702 to_peer: Peer::empty(2),
703 timeout: Duration::from_millis(1000),
704 trigger_reason: RegionMigrationTriggerReason::Manual,
705 };
706
707 let table_info = new_test_table_info(1024).into();
708 let region_routes = vec![RegionRoute {
709 region: Region::new_test(RegionId::new(1024, 2)),
710 leader_peer: Some(Peer::empty(3)),
711 ..Default::default()
712 }];
713
714 env.create_physical_table_metadata(table_info, region_routes)
715 .await;
716
717 let err = manager.submit_procedure(task).await.unwrap_err();
718 assert_matches!(err, error::Error::RegionRouteNotFound { .. });
719 }
720
721 #[tokio::test]
722 async fn test_submit_procedure_incorrect_from_peer() {
723 let env = TestingEnv::new();
724 let context_factory = env.context_factory();
725 let manager = RegionMigrationManager::new(env.procedure_manager().clone(), context_factory);
726 let region_id = RegionId::new(1024, 1);
727 let task = RegionMigrationProcedureTask {
728 region_id,
729 from_peer: Peer::empty(1),
730 to_peer: Peer::empty(2),
731 timeout: Duration::from_millis(1000),
732 trigger_reason: RegionMigrationTriggerReason::Manual,
733 };
734
735 let table_info = new_test_table_info(1024).into();
736 let region_routes = vec![RegionRoute {
737 region: Region::new_test(RegionId::new(1024, 1)),
738 leader_peer: Some(Peer::empty(3)),
739 ..Default::default()
740 }];
741
742 env.create_physical_table_metadata(table_info, region_routes)
743 .await;
744
745 let err = manager.submit_procedure(task).await.unwrap_err();
746 assert_matches!(err, error::Error::LeaderPeerChanged { .. });
747 assert_eq!(
748 err.to_string(),
749 "Region's leader peer changed: Region's leader peer(3) is not the `from_peer`(1), region: 4398046511105(1024, 1)"
750 );
751 }
752
753 #[tokio::test]
754 async fn test_submit_procedure_region_follower_on_to_peer() {
755 let env = TestingEnv::new();
756 let context_factory = env.context_factory();
757 let manager = RegionMigrationManager::new(env.procedure_manager().clone(), context_factory);
758 let region_id = RegionId::new(1024, 1);
759 let task = RegionMigrationProcedureTask {
760 region_id,
761 from_peer: Peer::empty(3),
762 to_peer: Peer::empty(2),
763 timeout: Duration::from_millis(1000),
764 trigger_reason: RegionMigrationTriggerReason::Manual,
765 };
766
767 let table_info = new_test_table_info(1024).into();
768 let region_routes = vec![RegionRoute {
769 region: Region::new_test(region_id),
770 leader_peer: Some(Peer::empty(3)),
771 follower_peers: vec![Peer::empty(2)],
772 ..Default::default()
773 }];
774
775 env.create_physical_table_metadata(table_info, region_routes)
776 .await;
777
778 let err = manager.submit_procedure(task).await.unwrap_err();
779 assert_matches!(err, error::Error::InvalidArguments { .. });
780 assert_eq!(
781 err.to_string(),
782 "Invalid arguments: The `to_peer`(2) is already has a region follower, region: 4398046511105(1024, 1)"
783 );
784 }
785
786 #[tokio::test]
787 async fn test_submit_procedure_has_migrated() {
788 common_telemetry::init_default_ut_logging();
789 let env = TestingEnv::new();
790 let context_factory = env.context_factory();
791 let manager = RegionMigrationManager::new(env.procedure_manager().clone(), context_factory);
792 let region_id = RegionId::new(1024, 1);
793 let task = RegionMigrationProcedureTask {
794 region_id,
795 from_peer: Peer::empty(1),
796 to_peer: Peer::empty(2),
797 timeout: Duration::from_millis(1000),
798 trigger_reason: RegionMigrationTriggerReason::Manual,
799 };
800
801 let table_info = new_test_table_info(1024).into();
802 let region_routes = vec![RegionRoute {
803 region: Region::new_test(RegionId::new(1024, 1)),
804 leader_peer: Some(Peer::empty(2)),
805 ..Default::default()
806 }];
807
808 env.create_physical_table_metadata(table_info, region_routes)
809 .await;
810
811 let err = manager.submit_procedure(task).await.unwrap_err();
812 assert_matches!(err, error::Error::RegionMigrated { .. });
813 }
814
815 #[tokio::test]
816 async fn test_verify_table_route_error() {
817 let env = TestingEnv::new();
818 let context_factory = env.context_factory();
819 let manager = RegionMigrationManager::new(env.procedure_manager().clone(), context_factory);
820 let region_id = RegionId::new(1024, 1);
821 let task = RegionMigrationProcedureTask {
822 region_id,
823 from_peer: Peer::empty(1),
824 to_peer: Peer::empty(2),
825 timeout: Duration::from_millis(1000),
826 trigger_reason: RegionMigrationTriggerReason::Manual,
827 };
828
829 let err = manager
830 .verify_table_route(
831 &TableRouteValue::Logical(LogicalTableRouteValue::new(0)),
832 &task,
833 )
834 .unwrap_err();
835
836 assert_matches!(err, error::Error::Unexpected { .. });
837 }
838
839 #[tokio::test]
840 async fn test_submit_procedure_with_multiple_regions_invalid_task() {
841 let env = TestingEnv::new();
842 let context_factory = env.context_factory();
843 let manager = RegionMigrationManager::new(env.procedure_manager().clone(), context_factory);
844 let task = RegionMigrationTaskBatch {
845 region_ids: vec![RegionId::new(1024, 1)],
846 from_peer: Peer::empty(1),
847 to_peer: Peer::empty(1),
848 timeout: Duration::from_millis(1000),
849 trigger_reason: RegionMigrationTriggerReason::Manual,
850 };
851
852 let err = manager
853 .submit_region_migration_task(task)
854 .await
855 .unwrap_err();
856 assert_matches!(err, error::Error::InvalidArguments { .. });
857 }
858
859 #[tokio::test]
860 async fn test_submit_procedure_with_multiple_regions_no_region_to_migrate() {
861 common_telemetry::init_default_ut_logging();
862 let env = TestingEnv::new();
863 let context_factory = env.context_factory();
864 let manager = RegionMigrationManager::new(env.procedure_manager().clone(), context_factory);
865 let region_id = RegionId::new(1024, 1);
866 let task = RegionMigrationTaskBatch {
867 region_ids: vec![region_id],
868 from_peer: Peer::empty(1),
869 to_peer: Peer::empty(2),
870 timeout: Duration::from_millis(1000),
871 trigger_reason: RegionMigrationTriggerReason::Manual,
872 };
873 let table_info = new_test_table_info(1024).into();
874 let region_routes = vec![RegionRoute {
875 region: Region::new_test(region_id),
876 leader_peer: Some(Peer::empty(2)),
877 ..Default::default()
878 }];
879 env.create_physical_table_metadata(table_info, region_routes)
880 .await;
881 let result = manager.submit_region_migration_task(task).await.unwrap();
882
883 assert_eq!(
884 result,
885 SubmitRegionMigrationTaskResult {
886 migrated: vec![region_id],
887 ..Default::default()
888 }
889 );
890 }
891
892 #[tokio::test]
893 async fn test_submit_procedure_with_multiple_regions_leader_peer_changed() {
894 let env = TestingEnv::new();
895 let context_factory = env.context_factory();
896 let manager = RegionMigrationManager::new(env.procedure_manager().clone(), context_factory);
897 let region_id = RegionId::new(1024, 1);
898 let task = RegionMigrationTaskBatch {
899 region_ids: vec![region_id],
900 from_peer: Peer::empty(1),
901 to_peer: Peer::empty(2),
902 timeout: Duration::from_millis(1000),
903 trigger_reason: RegionMigrationTriggerReason::Manual,
904 };
905
906 let table_info = new_test_table_info(1024).into();
907 let region_routes = vec![RegionRoute {
908 region: Region::new_test(RegionId::new(1024, 1)),
909 leader_peer: Some(Peer::empty(3)),
910 ..Default::default()
911 }];
912
913 env.create_physical_table_metadata(table_info, region_routes)
914 .await;
915 let result = manager.submit_region_migration_task(task).await.unwrap();
916 assert_eq!(
917 result,
918 SubmitRegionMigrationTaskResult {
919 leader_changed: vec![region_id],
920 ..Default::default()
921 }
922 );
923 }
924
925 #[tokio::test]
926 async fn test_submit_procedure_with_multiple_regions_peer_conflict() {
927 let env = TestingEnv::new();
928 let context_factory = env.context_factory();
929 let manager = RegionMigrationManager::new(env.procedure_manager().clone(), context_factory);
930 let region_id = RegionId::new(1024, 1);
931 let task = RegionMigrationTaskBatch {
932 region_ids: vec![region_id],
933 from_peer: Peer::empty(3),
934 to_peer: Peer::empty(2),
935 timeout: Duration::from_millis(1000),
936 trigger_reason: RegionMigrationTriggerReason::Manual,
937 };
938
939 let table_info = new_test_table_info(1024).into();
940 let region_routes = vec![RegionRoute {
941 region: Region::new_test(region_id),
942 leader_peer: Some(Peer::empty(3)),
943 follower_peers: vec![Peer::empty(2)],
944 ..Default::default()
945 }];
946
947 env.create_physical_table_metadata(table_info, region_routes)
948 .await;
949 let result = manager.submit_region_migration_task(task).await.unwrap();
950 assert_eq!(
951 result,
952 SubmitRegionMigrationTaskResult {
953 peer_conflict: vec![region_id],
954 ..Default::default()
955 }
956 );
957 }
958
959 #[tokio::test]
960 async fn test_running_regions() {
961 let env = TestingEnv::new();
962 let context_factory = env.context_factory();
963 let manager = RegionMigrationManager::new(env.procedure_manager().clone(), context_factory);
964 let region_id = RegionId::new(1024, 1);
965 let task = RegionMigrationTaskBatch {
966 region_ids: vec![region_id, RegionId::new(1024, 2)],
967 from_peer: Peer::empty(1),
968 to_peer: Peer::empty(2),
969 timeout: Duration::from_millis(1000),
970 trigger_reason: RegionMigrationTriggerReason::Manual,
971 };
972 manager.tracker.running_procedures.write().unwrap().insert(
974 region_id,
975 RegionMigrationProcedureTask::new(
976 region_id,
977 task.from_peer.clone(),
978 task.to_peer.clone(),
979 task.timeout,
980 task.trigger_reason,
981 ),
982 );
983 let table_info = new_test_table_info(1024).into();
984 let region_routes = vec![RegionRoute {
985 region: Region::new_test(RegionId::new(1024, 2)),
986 leader_peer: Some(Peer::empty(1)),
987 ..Default::default()
988 }];
989 env.create_physical_table_metadata(table_info, region_routes)
990 .await;
991 let result = manager.submit_region_migration_task(task).await.unwrap();
992 assert_eq!(result.migrating, vec![region_id]);
993 assert_eq!(result.submitted, vec![RegionId::new(1024, 2)]);
994 assert!(result.procedure_id.is_some());
995 }
996}