1use std::collections::hash_map::Entry;
16use std::collections::{HashMap, HashSet};
17use std::fmt::Display;
18use std::sync::{Arc, RwLock};
19use std::time::Duration;
20
21use common_meta::key::table_info::TableInfoValue;
22use common_meta::key::table_route::TableRouteValue;
23use common_meta::peer::Peer;
24use common_meta::rpc::router::RegionRoute;
25use common_procedure::{ProcedureId, ProcedureManagerRef, ProcedureWithId, watcher};
26use common_telemetry::{error, info, warn};
27use serde::{Deserialize, Serialize};
28use snafu::{OptionExt, ResultExt, ensure};
29use store_api::storage::RegionId;
30use table::table_name::TableName;
31
32use crate::error::{self, Result};
33use crate::metrics::{METRIC_META_REGION_MIGRATION_DATANODES, METRIC_META_REGION_MIGRATION_FAIL};
34use crate::procedure::region_migration::utils::{
35 RegionMigrationAnalysis, RegionMigrationTaskBatch, analyze_region_migration_task,
36};
37use crate::procedure::region_migration::{
38 DefaultContextFactory, PersistentContext, RegionMigrationProcedure,
39};
40
41pub type RegionMigrationManagerRef = Arc<RegionMigrationManager>;
42
43pub struct RegionMigrationManager {
45 procedure_manager: ProcedureManagerRef,
46 context_factory: DefaultContextFactory,
47 tracker: RegionMigrationProcedureTracker,
48}
49
50#[derive(Default, Clone)]
51pub struct RegionMigrationProcedureTracker {
52 running_procedures: Arc<RwLock<HashMap<RegionId, RegionMigrationProcedureTask>>>,
53}
54
55impl RegionMigrationProcedureTracker {
56 pub(crate) fn insert_running_procedure(
58 &self,
59 task: &RegionMigrationProcedureTask,
60 ) -> Option<RegionMigrationProcedureGuard> {
61 let mut procedures = self.running_procedures.write().unwrap();
62 match procedures.entry(task.region_id) {
63 Entry::Occupied(_) => None,
64 Entry::Vacant(v) => {
65 v.insert(task.clone());
66 Some(RegionMigrationProcedureGuard {
67 region_id: task.region_id,
68 running_procedures: self.running_procedures.clone(),
69 })
70 }
71 }
72 }
73
74 pub(crate) fn contains(&self, region_id: RegionId) -> bool {
76 self.running_procedures
77 .read()
78 .unwrap()
79 .contains_key(®ion_id)
80 }
81}
82
83pub(crate) struct RegionMigrationProcedureGuard {
85 region_id: RegionId,
86 running_procedures: Arc<RwLock<HashMap<RegionId, RegionMigrationProcedureTask>>>,
87}
88
89impl Drop for RegionMigrationProcedureGuard {
90 fn drop(&mut self) {
91 let exists = self
92 .running_procedures
93 .read()
94 .unwrap()
95 .contains_key(&self.region_id);
96 if exists {
97 self.running_procedures
98 .write()
99 .unwrap()
100 .remove(&self.region_id);
101 }
102 }
103}
104
105#[derive(Debug, Clone)]
107pub struct RegionMigrationProcedureTask {
108 pub(crate) region_id: RegionId,
109 pub(crate) from_peer: Peer,
110 pub(crate) to_peer: Peer,
111 pub(crate) timeout: Duration,
112 pub(crate) trigger_reason: RegionMigrationTriggerReason,
113}
114
115#[derive(Default, Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, strum::Display)]
117#[strum(serialize_all = "PascalCase")]
118pub enum RegionMigrationTriggerReason {
119 #[default]
120 Unknown,
122 Manual,
124 AutoRebalance,
126 Failover,
128}
129
130impl RegionMigrationProcedureTask {
131 pub fn new(
132 region_id: RegionId,
133 from_peer: Peer,
134 to_peer: Peer,
135 timeout: Duration,
136 trigger_reason: RegionMigrationTriggerReason,
137 ) -> Self {
138 Self {
139 region_id,
140 from_peer,
141 to_peer,
142 timeout,
143 trigger_reason,
144 }
145 }
146}
147
148impl Display for RegionMigrationProcedureTask {
149 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
150 write!(
151 f,
152 "region: {}, from_peer: {}, to_peer: {}, trigger_reason: {}",
153 self.region_id, self.from_peer, self.to_peer, self.trigger_reason
154 )
155 }
156}
157
158#[derive(Debug, Default, PartialEq, Eq)]
160pub struct SubmitRegionMigrationTaskResult {
161 pub migrated: Vec<RegionId>,
163 pub leader_changed: Vec<RegionId>,
165 pub peer_conflict: Vec<RegionId>,
167 pub table_not_found: Vec<RegionId>,
169 pub migrating: Vec<RegionId>,
171 pub submitted: Vec<RegionId>,
173 pub procedure_id: Option<ProcedureId>,
175}
176
177impl RegionMigrationManager {
178 pub(crate) fn new(
180 procedure_manager: ProcedureManagerRef,
181 context_factory: DefaultContextFactory,
182 ) -> Self {
183 Self {
184 procedure_manager,
185 context_factory,
186 tracker: RegionMigrationProcedureTracker::default(),
187 }
188 }
189
190 pub fn tracker(&self) -> &RegionMigrationProcedureTracker {
192 &self.tracker
193 }
194
195 pub(crate) fn try_start(&self) -> Result<()> {
197 let context_factory = self.context_factory.clone();
198 let tracker = self.tracker.clone();
199 self.procedure_manager
200 .register_loader(
201 RegionMigrationProcedure::TYPE_NAME,
202 Box::new(move |json| {
203 let context_factory = context_factory.clone();
204 let tracker = tracker.clone();
205 RegionMigrationProcedure::from_json(json, context_factory, tracker)
206 .map(|p| Box::new(p) as _)
207 }),
208 )
209 .context(error::RegisterProcedureLoaderSnafu {
210 type_name: RegionMigrationProcedure::TYPE_NAME,
211 })
212 }
213
214 fn insert_running_procedure(
215 &self,
216 task: &RegionMigrationProcedureTask,
217 ) -> Option<RegionMigrationProcedureGuard> {
218 self.tracker.insert_running_procedure(task)
219 }
220
221 fn verify_task(&self, task: &RegionMigrationProcedureTask) -> Result<()> {
222 if task.to_peer.id == task.from_peer.id {
223 return error::InvalidArgumentsSnafu {
224 err_msg: "The `from_peer_id` can't equal `to_peer_id`",
225 }
226 .fail();
227 }
228
229 Ok(())
230 }
231
232 async fn retrieve_table_route(&self, region_id: RegionId) -> Result<TableRouteValue> {
233 let table_route = self
234 .context_factory
235 .table_metadata_manager
236 .table_route_manager()
237 .table_route_storage()
238 .get(region_id.table_id())
239 .await
240 .context(error::TableMetadataManagerSnafu)?
241 .context(error::TableRouteNotFoundSnafu {
242 table_id: region_id.table_id(),
243 })?;
244
245 Ok(table_route)
246 }
247
248 async fn retrieve_table_info(&self, region_id: RegionId) -> Result<TableInfoValue> {
249 let table_route = self
250 .context_factory
251 .table_metadata_manager
252 .table_info_manager()
253 .get(region_id.table_id())
254 .await
255 .context(error::TableMetadataManagerSnafu)?
256 .context(error::TableInfoNotFoundSnafu {
257 table_id: region_id.table_id(),
258 })?
259 .into_inner();
260
261 Ok(table_route)
262 }
263
264 fn verify_table_route(
266 &self,
267 table_route: &TableRouteValue,
268 task: &RegionMigrationProcedureTask,
269 ) -> Result<()> {
270 if !table_route.is_physical() {
271 return error::UnexpectedSnafu {
272 violated: format!(
273 "Trying to execute region migration on the logical table, task {task}"
274 ),
275 }
276 .fail();
277 }
278
279 Ok(())
280 }
281
282 fn has_migrated(
284 &self,
285 region_route: &RegionRoute,
286 task: &RegionMigrationProcedureTask,
287 ) -> Result<bool> {
288 if region_route.is_leader_downgrading() {
289 return Ok(false);
290 }
291
292 let leader_peer = region_route
293 .leader_peer
294 .as_ref()
295 .context(error::UnexpectedSnafu {
296 violated: "Region route leader peer is not found",
297 })?;
298
299 Ok(leader_peer.id == task.to_peer.id)
300 }
301
302 fn verify_region_leader_peer(
306 &self,
307 region_route: &RegionRoute,
308 task: &mut RegionMigrationProcedureTask,
309 ) -> Result<()> {
310 let leader_peer = region_route
311 .leader_peer
312 .as_ref()
313 .context(error::UnexpectedSnafu {
314 violated: "Region route leader peer is not found",
315 })?;
316
317 ensure!(
318 leader_peer.id == task.from_peer.id,
319 error::LeaderPeerChangedSnafu {
320 msg: format!(
321 "Region's leader peer({}) is not the `from_peer`({}), region: {}",
322 leader_peer.id, task.from_peer.id, task.region_id
323 ),
324 }
325 );
326
327 if task.from_peer.addr.is_empty() {
328 warn!(
329 "The `from_peer` is unknown, use the leader peer({}) as the `from_peer`, region: {}",
330 leader_peer, task.region_id
331 );
332 task.from_peer = leader_peer.clone();
334 }
335
336 Ok(())
337 }
338
339 fn verify_region_follower_peers(
341 &self,
342 region_route: &RegionRoute,
343 task: &RegionMigrationProcedureTask,
344 ) -> Result<()> {
345 ensure!(
346 !region_route.follower_peers.contains(&task.to_peer),
347 error::InvalidArgumentsSnafu {
348 err_msg: format!(
349 "The `to_peer`({}) is already has a region follower, region: {}",
350 task.to_peer.id, task.region_id
351 ),
352 },
353 );
354
355 Ok(())
356 }
357
358 fn extract_running_regions(
363 &self,
364 task: &mut RegionMigrationTaskBatch,
365 ) -> (Vec<RegionId>, Vec<RegionMigrationProcedureGuard>) {
366 let mut migrating_region_ids = Vec::new();
367 let mut procedure_guards = Vec::with_capacity(task.region_ids.len());
368
369 for region_id in &task.region_ids {
370 let Some(guard) = self.insert_running_procedure(&RegionMigrationProcedureTask::new(
371 *region_id,
372 task.from_peer.clone(),
373 task.to_peer.clone(),
374 task.timeout,
375 task.trigger_reason,
376 )) else {
377 migrating_region_ids.push(*region_id);
378 continue;
379 };
380 procedure_guards.push(guard);
381 }
382
383 let migrating_set = migrating_region_ids.iter().cloned().collect::<HashSet<_>>();
384 task.region_ids.retain(|id| !migrating_set.contains(id));
385
386 (migrating_region_ids, procedure_guards)
387 }
388
389 pub async fn submit_region_migration_task(
390 &self,
391 mut task: RegionMigrationTaskBatch,
392 ) -> Result<SubmitRegionMigrationTaskResult> {
393 let (migrating_region_ids, procedure_guards) = self.extract_running_regions(&mut task);
394 let RegionMigrationAnalysis {
395 migrated,
396 leader_changed,
397 peer_conflict,
398 mut table_not_found,
399 pending,
400 } = analyze_region_migration_task(&task, &self.context_factory.table_metadata_manager)
401 .await?;
402 if pending.is_empty() {
403 return Ok(SubmitRegionMigrationTaskResult {
404 migrated,
405 leader_changed,
406 peer_conflict,
407 table_not_found,
408 migrating: migrating_region_ids,
409 submitted: vec![],
410 procedure_id: None,
411 });
412 }
413
414 task.region_ids = pending;
416 let table_regions = task.table_regions();
417 let table_ids = table_regions.keys().cloned().collect::<Vec<_>>();
418 let table_info_values = self
419 .context_factory
420 .table_metadata_manager
421 .table_info_manager()
422 .batch_get(&table_ids)
423 .await
424 .context(error::TableMetadataManagerSnafu)?;
425 let mut catalog_and_schema = Vec::with_capacity(table_info_values.len());
426 for (table_id, regions) in table_regions {
427 match table_info_values.get(&table_id) {
428 Some(table_info) => {
429 let TableName {
430 catalog_name,
431 schema_name,
432 ..
433 } = table_info.table_name();
434 catalog_and_schema.push((catalog_name, schema_name));
435 }
436 None => {
437 task.region_ids.retain(|id| id.table_id() != table_id);
438 table_not_found.extend(regions);
439 }
440 }
441 }
442 if task.region_ids.is_empty() {
443 return Ok(SubmitRegionMigrationTaskResult {
444 migrated,
445 leader_changed,
446 peer_conflict,
447 table_not_found,
448 migrating: migrating_region_ids,
449 submitted: vec![],
450 procedure_id: None,
451 });
452 }
453
454 let submitting_region_ids = task.region_ids.clone();
455 let procedure_id = self
456 .submit_procedure_inner(task, procedure_guards, catalog_and_schema)
457 .await?;
458 Ok(SubmitRegionMigrationTaskResult {
459 migrated,
460 leader_changed,
461 peer_conflict,
462 table_not_found,
463 migrating: migrating_region_ids,
464 submitted: submitting_region_ids,
465 procedure_id: Some(procedure_id),
466 })
467 }
468
469 async fn submit_procedure_inner(
470 &self,
471 task: RegionMigrationTaskBatch,
472 procedure_guards: Vec<RegionMigrationProcedureGuard>,
473 catalog_and_schema: Vec<(String, String)>,
474 ) -> Result<ProcedureId> {
475 let procedure = RegionMigrationProcedure::new(
476 PersistentContext::new(
477 catalog_and_schema,
478 task.from_peer.clone(),
479 task.to_peer.clone(),
480 task.region_ids.clone(),
481 task.timeout,
482 task.trigger_reason,
483 ),
484 self.context_factory.clone(),
485 procedure_guards,
486 );
487 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
488 let procedure_id = procedure_with_id.id;
489 info!("Starting region migration procedure {procedure_id} for {task}");
490 let procedure_manager = self.procedure_manager.clone();
491 let num_region = task.region_ids.len();
492
493 common_runtime::spawn_global(async move {
494 let watcher = &mut match procedure_manager.submit(procedure_with_id).await {
495 Ok(watcher) => watcher,
496 Err(e) => {
497 error!(e; "Failed to submit region migration procedure {procedure_id} for {task}");
498 return;
499 }
500 };
501 METRIC_META_REGION_MIGRATION_DATANODES
502 .with_label_values(&["src", &task.from_peer.id.to_string()])
503 .inc_by(num_region as u64);
504 METRIC_META_REGION_MIGRATION_DATANODES
505 .with_label_values(&["desc", &task.to_peer.id.to_string()])
506 .inc_by(num_region as u64);
507
508 if let Err(e) = watcher::wait(watcher).await {
509 error!(e; "Failed to wait region migration procedure {procedure_id} for {task}");
510 METRIC_META_REGION_MIGRATION_FAIL.inc();
511 return;
512 }
513
514 info!("Region migration procedure {procedure_id} for {task} is finished successfully!");
515 });
516
517 Ok(procedure_id)
518 }
519
520 pub async fn submit_procedure(
522 &self,
523 mut task: RegionMigrationProcedureTask,
524 ) -> Result<Option<ProcedureId>> {
525 let Some(guard) = self.insert_running_procedure(&task) else {
526 return error::MigrationRunningSnafu {
527 region_id: task.region_id,
528 }
529 .fail();
530 };
531
532 self.verify_task(&task)?;
533
534 let region_id = task.region_id;
535
536 let table_route = self.retrieve_table_route(region_id).await?;
537 self.verify_table_route(&table_route, &task)?;
538
539 let region_route = table_route
541 .region_route(region_id)
542 .context(error::UnexpectedLogicalRouteTableSnafu {
543 err_msg: format!("{table_route:?} is a non-physical TableRouteValue."),
544 })?
545 .context(error::RegionRouteNotFoundSnafu { region_id })?;
546
547 if self.has_migrated(®ion_route, &task)? {
548 info!("Skipping region migration task: {task}");
549 return error::RegionMigratedSnafu {
550 region_id,
551 target_peer_id: task.to_peer.id,
552 }
553 .fail();
554 }
555
556 self.verify_region_leader_peer(®ion_route, &mut task)?;
557 self.verify_region_follower_peers(®ion_route, &task)?;
558 let table_info = self.retrieve_table_info(region_id).await?;
559 let TableName {
560 catalog_name,
561 schema_name,
562 ..
563 } = table_info.table_name();
564 let RegionMigrationProcedureTask {
565 region_id,
566 from_peer,
567 to_peer,
568 timeout,
569 trigger_reason,
570 } = task.clone();
571 let procedure = RegionMigrationProcedure::new(
572 PersistentContext::new(
573 vec![(catalog_name, schema_name)],
574 from_peer,
575 to_peer,
576 vec![region_id],
577 timeout,
578 trigger_reason,
579 ),
580 self.context_factory.clone(),
581 vec![guard],
582 );
583 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
584 let procedure_id = procedure_with_id.id;
585 info!("Starting region migration procedure {procedure_id} for {task}");
586 let procedure_manager = self.procedure_manager.clone();
587 common_runtime::spawn_global(async move {
588 let watcher = &mut match procedure_manager.submit(procedure_with_id).await {
589 Ok(watcher) => watcher,
590 Err(e) => {
591 error!(e; "Failed to submit region migration procedure {procedure_id} for {task}");
592 return;
593 }
594 };
595 METRIC_META_REGION_MIGRATION_DATANODES
596 .with_label_values(&["src", &task.from_peer.id.to_string()])
597 .inc();
598 METRIC_META_REGION_MIGRATION_DATANODES
599 .with_label_values(&["desc", &task.to_peer.id.to_string()])
600 .inc();
601
602 if let Err(e) = watcher::wait(watcher).await {
603 error!(e; "Failed to wait region migration procedure {procedure_id} for {task}");
604 METRIC_META_REGION_MIGRATION_FAIL.inc();
605 return;
606 }
607
608 info!("Region migration procedure {procedure_id} for {task} is finished successfully!");
609 });
610
611 Ok(Some(procedure_id))
612 }
613}
614
#[cfg(test)]
mod test {
    use std::assert_matches::assert_matches;

    use common_meta::key::table_route::LogicalTableRouteValue;
    use common_meta::key::test_utils::new_test_table_info;
    use common_meta::rpc::router::Region;

    use super::*;
    use crate::procedure::region_migration::test_util::TestingEnv;

    /// Builds a manager backed by the given testing environment.
    fn new_manager(env: &TestingEnv) -> RegionMigrationManager {
        let context_factory = env.context_factory();
        RegionMigrationManager::new(env.procedure_manager().clone(), context_factory)
    }

    /// Builds a manually triggered single-region task with a 1s timeout.
    fn manual_task(
        region_id: RegionId,
        from_peer: Peer,
        to_peer: Peer,
    ) -> RegionMigrationProcedureTask {
        RegionMigrationProcedureTask {
            region_id,
            from_peer,
            to_peer,
            timeout: Duration::from_millis(1000),
            trigger_reason: RegionMigrationTriggerReason::Manual,
        }
    }

    /// Builds a manually triggered batch task with a 1s timeout.
    fn manual_batch(
        region_ids: Vec<RegionId>,
        from_peer: Peer,
        to_peer: Peer,
    ) -> RegionMigrationTaskBatch {
        RegionMigrationTaskBatch {
            region_ids,
            from_peer,
            to_peer,
            timeout: Duration::from_millis(1000),
            trigger_reason: RegionMigrationTriggerReason::Manual,
        }
    }

    /// Builds a region route that only has a leader peer.
    fn route_with_leader(region_id: RegionId, leader: Peer) -> RegionRoute {
        RegionRoute {
            region: Region::new_test(region_id),
            leader_peer: Some(leader),
            ..Default::default()
        }
    }

    /// Creates the physical table 1024 with the given region routes.
    async fn create_table(env: &TestingEnv, region_routes: Vec<RegionRoute>) {
        let table_info = new_test_table_info(1024, vec![1]).into();
        env.create_physical_table_metadata(table_info, region_routes)
            .await;
    }

    #[tokio::test]
    async fn test_insert_running_procedure() {
        let env = TestingEnv::new();
        let manager = new_manager(&env);
        let region_id = RegionId::new(1024, 1);
        let task = manual_task(region_id, Peer::empty(2), Peer::empty(1));
        // Pretend a migration for the region is already running.
        manager
            .tracker
            .running_procedures
            .write()
            .unwrap()
            .insert(region_id, task.clone());

        let err = manager.submit_procedure(task).await.unwrap_err();
        assert_matches!(err, error::Error::MigrationRunning { .. });
    }

    #[tokio::test]
    async fn test_submit_procedure_invalid_task() {
        let env = TestingEnv::new();
        let manager = new_manager(&env);
        // Source and target are the same peer.
        let task = manual_task(RegionId::new(1024, 1), Peer::empty(1), Peer::empty(1));

        let err = manager.submit_procedure(task).await.unwrap_err();
        assert_matches!(err, error::Error::InvalidArguments { .. });
    }

    #[tokio::test]
    async fn test_submit_procedure_table_not_found() {
        let env = TestingEnv::new();
        let manager = new_manager(&env);
        // No table metadata is created at all.
        let task = manual_task(RegionId::new(1024, 1), Peer::empty(1), Peer::empty(2));

        let err = manager.submit_procedure(task).await.unwrap_err();
        assert_matches!(err, error::Error::TableRouteNotFound { .. });
    }

    #[tokio::test]
    async fn test_submit_procedure_region_route_not_found() {
        let env = TestingEnv::new();
        let manager = new_manager(&env);
        let task = manual_task(RegionId::new(1024, 1), Peer::empty(1), Peer::empty(2));

        // The table only has a route for region 2, not for region 1.
        let region_routes = vec![route_with_leader(RegionId::new(1024, 2), Peer::empty(3))];
        create_table(&env, region_routes).await;

        let err = manager.submit_procedure(task).await.unwrap_err();
        assert_matches!(err, error::Error::RegionRouteNotFound { .. });
    }

    #[tokio::test]
    async fn test_submit_procedure_incorrect_from_peer() {
        let env = TestingEnv::new();
        let manager = new_manager(&env);
        let task = manual_task(RegionId::new(1024, 1), Peer::empty(1), Peer::empty(2));

        // The leader is peer 3, which is neither the source nor the target.
        let region_routes = vec![route_with_leader(RegionId::new(1024, 1), Peer::empty(3))];
        create_table(&env, region_routes).await;

        let err = manager.submit_procedure(task).await.unwrap_err();
        assert_matches!(err, error::Error::LeaderPeerChanged { .. });
        assert_eq!(
            err.to_string(),
            "Region's leader peer changed: Region's leader peer(3) is not the `from_peer`(1), region: 4398046511105(1024, 1)"
        );
    }

    #[tokio::test]
    async fn test_submit_procedure_region_follower_on_to_peer() {
        let env = TestingEnv::new();
        let manager = new_manager(&env);
        let region_id = RegionId::new(1024, 1);
        let task = manual_task(region_id, Peer::empty(3), Peer::empty(2));

        // The target peer already hosts a follower of the region.
        let region_routes = vec![RegionRoute {
            follower_peers: vec![Peer::empty(2)],
            ..route_with_leader(region_id, Peer::empty(3))
        }];
        create_table(&env, region_routes).await;

        let err = manager.submit_procedure(task).await.unwrap_err();
        assert_matches!(err, error::Error::InvalidArguments { .. });
        assert_eq!(
            err.to_string(),
            "Invalid arguments: The `to_peer`(2) is already has a region follower, region: 4398046511105(1024, 1)"
        );
    }

    #[tokio::test]
    async fn test_submit_procedure_has_migrated() {
        common_telemetry::init_default_ut_logging();
        let env = TestingEnv::new();
        let manager = new_manager(&env);
        let task = manual_task(RegionId::new(1024, 1), Peer::empty(1), Peer::empty(2));

        // The leader already is the target peer.
        let region_routes = vec![route_with_leader(RegionId::new(1024, 1), Peer::empty(2))];
        create_table(&env, region_routes).await;

        let err = manager.submit_procedure(task).await.unwrap_err();
        assert_matches!(err, error::Error::RegionMigrated { .. });
    }

    #[tokio::test]
    async fn test_verify_table_route_error() {
        let env = TestingEnv::new();
        let manager = new_manager(&env);
        let task = manual_task(RegionId::new(1024, 1), Peer::empty(1), Peer::empty(2));

        let logical_route = TableRouteValue::Logical(LogicalTableRouteValue::new(0, vec![]));
        let err = manager
            .verify_table_route(&logical_route, &task)
            .unwrap_err();

        assert_matches!(err, error::Error::Unexpected { .. });
    }

    #[tokio::test]
    async fn test_submit_procedure_with_multiple_regions_invalid_task() {
        let env = TestingEnv::new();
        let manager = new_manager(&env);
        // Source and target are the same peer.
        let task = manual_batch(
            vec![RegionId::new(1024, 1)],
            Peer::empty(1),
            Peer::empty(1),
        );

        let err = manager
            .submit_region_migration_task(task)
            .await
            .unwrap_err();
        assert_matches!(err, error::Error::InvalidArguments { .. });
    }

    #[tokio::test]
    async fn test_submit_procedure_with_multiple_regions_no_region_to_migrate() {
        common_telemetry::init_default_ut_logging();
        let env = TestingEnv::new();
        let manager = new_manager(&env);
        let region_id = RegionId::new(1024, 1);
        let task = manual_batch(vec![region_id], Peer::empty(1), Peer::empty(2));

        // The leader already is the target peer.
        let region_routes = vec![route_with_leader(region_id, Peer::empty(2))];
        create_table(&env, region_routes).await;

        let result = manager.submit_region_migration_task(task).await.unwrap();
        assert_eq!(
            result,
            SubmitRegionMigrationTaskResult {
                migrated: vec![region_id],
                ..Default::default()
            }
        );
    }

    #[tokio::test]
    async fn test_submit_procedure_with_multiple_regions_leader_peer_changed() {
        let env = TestingEnv::new();
        let manager = new_manager(&env);
        let region_id = RegionId::new(1024, 1);
        let task = manual_batch(vec![region_id], Peer::empty(1), Peer::empty(2));

        // The leader is peer 3, which is neither the source nor the target.
        let region_routes = vec![route_with_leader(RegionId::new(1024, 1), Peer::empty(3))];
        create_table(&env, region_routes).await;

        let result = manager.submit_region_migration_task(task).await.unwrap();
        assert_eq!(
            result,
            SubmitRegionMigrationTaskResult {
                leader_changed: vec![region_id],
                ..Default::default()
            }
        );
    }

    #[tokio::test]
    async fn test_submit_procedure_with_multiple_regions_peer_conflict() {
        let env = TestingEnv::new();
        let manager = new_manager(&env);
        let region_id = RegionId::new(1024, 1);
        let task = manual_batch(vec![region_id], Peer::empty(3), Peer::empty(2));

        // The target peer already hosts a follower of the region.
        let region_routes = vec![RegionRoute {
            follower_peers: vec![Peer::empty(2)],
            ..route_with_leader(region_id, Peer::empty(3))
        }];
        create_table(&env, region_routes).await;

        let result = manager.submit_region_migration_task(task).await.unwrap();
        assert_eq!(
            result,
            SubmitRegionMigrationTaskResult {
                peer_conflict: vec![region_id],
                ..Default::default()
            }
        );
    }

    #[tokio::test]
    async fn test_running_regions() {
        let env = TestingEnv::new();
        let manager = new_manager(&env);
        let region_id = RegionId::new(1024, 1);
        let task = manual_batch(
            vec![region_id, RegionId::new(1024, 2)],
            Peer::empty(1),
            Peer::empty(2),
        );
        // Region 1 already has a running migration; region 2 does not.
        let running_task = RegionMigrationProcedureTask::new(
            region_id,
            task.from_peer.clone(),
            task.to_peer.clone(),
            task.timeout,
            task.trigger_reason,
        );
        manager
            .tracker
            .running_procedures
            .write()
            .unwrap()
            .insert(region_id, running_task);

        let region_routes = vec![route_with_leader(RegionId::new(1024, 2), Peer::empty(1))];
        create_table(&env, region_routes).await;

        let result = manager.submit_region_migration_task(task).await.unwrap();
        assert_eq!(result.migrating, vec![region_id]);
        assert_eq!(result.submitted, vec![RegionId::new(1024, 2)]);
        assert!(result.procedure_id.is_some());
    }
}