1use std::collections::hash_map::Entry;
16use std::collections::HashMap;
17use std::fmt::Display;
18use std::sync::{Arc, RwLock};
19use std::time::Duration;
20
21use common_meta::key::table_info::TableInfoValue;
22use common_meta::key::table_route::TableRouteValue;
23use common_meta::peer::Peer;
24use common_meta::rpc::router::RegionRoute;
25use common_procedure::{watcher, ProcedureId, ProcedureManagerRef, ProcedureWithId};
26use common_telemetry::{error, info, warn};
27use serde::{Deserialize, Serialize};
28use snafu::{ensure, OptionExt, ResultExt};
29use store_api::storage::RegionId;
30use table::table_name::TableName;
31
32use crate::error::{self, Result};
33use crate::metrics::{METRIC_META_REGION_MIGRATION_DATANODES, METRIC_META_REGION_MIGRATION_FAIL};
34use crate::procedure::region_migration::{
35 DefaultContextFactory, PersistentContext, RegionMigrationProcedure,
36};
37
/// Shared, reference-counted handle to a [`RegionMigrationManager`].
pub type RegionMigrationManagerRef = Arc<RegionMigrationManager>;
39
/// Validates region migration tasks and submits them as procedures.
///
/// See [`RegionMigrationManager::submit_procedure`] for the submission flow.
pub struct RegionMigrationManager {
    /// Procedure manager the loader is registered with and procedures are submitted to.
    procedure_manager: ProcedureManagerRef,
    /// Factory cloned into every [`RegionMigrationProcedure`]; also used here to
    /// read table route and table info metadata.
    context_factory: DefaultContextFactory,
    /// Records which regions currently have a migration task registered.
    tracker: RegionMigrationProcedureTracker,
}
46
47#[derive(Default, Clone)]
48pub struct RegionMigrationProcedureTracker {
49 running_procedures: Arc<RwLock<HashMap<RegionId, RegionMigrationProcedureTask>>>,
50}
51
52impl RegionMigrationProcedureTracker {
53 pub(crate) fn insert_running_procedure(
55 &self,
56 task: &RegionMigrationProcedureTask,
57 ) -> Option<RegionMigrationProcedureGuard> {
58 let mut procedures = self.running_procedures.write().unwrap();
59 match procedures.entry(task.region_id) {
60 Entry::Occupied(_) => None,
61 Entry::Vacant(v) => {
62 v.insert(task.clone());
63 Some(RegionMigrationProcedureGuard {
64 region_id: task.region_id,
65 running_procedures: self.running_procedures.clone(),
66 })
67 }
68 }
69 }
70
71 pub(crate) fn contains(&self, region_id: RegionId) -> bool {
73 self.running_procedures
74 .read()
75 .unwrap()
76 .contains_key(®ion_id)
77 }
78}
79
80pub(crate) struct RegionMigrationProcedureGuard {
82 region_id: RegionId,
83 running_procedures: Arc<RwLock<HashMap<RegionId, RegionMigrationProcedureTask>>>,
84}
85
86impl Drop for RegionMigrationProcedureGuard {
87 fn drop(&mut self) {
88 let exists = self
89 .running_procedures
90 .read()
91 .unwrap()
92 .contains_key(&self.region_id);
93 if exists {
94 self.running_procedures
95 .write()
96 .unwrap()
97 .remove(&self.region_id);
98 }
99 }
100}
101
/// Description of a single region migration request.
#[derive(Debug, Clone)]
pub struct RegionMigrationProcedureTask {
    /// The region to migrate.
    pub(crate) region_id: RegionId,
    /// The peer the region is migrated away from; it is checked against the
    /// region's current leader when the task is submitted.
    pub(crate) from_peer: Peer,
    /// The peer the region is migrated to.
    pub(crate) to_peer: Peer,
    /// Timeout passed on to the migration procedure.
    /// NOTE(review): exact semantics are defined by the procedure, not in this file.
    pub(crate) timeout: Duration,
    /// Why this migration was requested.
    pub(crate) trigger_reason: RegionMigrationTriggerReason,
}
110
/// Why a region migration was started.
///
/// The `strum` attribute asks for PascalCase names in the derived `Display`.
#[derive(Default, Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, strum::Display)]
#[strum(serialize_all = "PascalCase")]
pub enum RegionMigrationTriggerReason {
    /// The default value, used when no reason is given.
    #[default]
    Unknown,
    /// NOTE(review): presumably a migration requested explicitly by a user — confirm.
    Manual,
    /// NOTE(review): presumably a migration started by automatic rebalancing — confirm.
    AutoRebalance,
    /// NOTE(review): presumably a migration started by region failover — confirm.
    Failover,
}
125
126impl RegionMigrationProcedureTask {
127 pub fn new(
128 region_id: RegionId,
129 from_peer: Peer,
130 to_peer: Peer,
131 timeout: Duration,
132 trigger_reason: RegionMigrationTriggerReason,
133 ) -> Self {
134 Self {
135 region_id,
136 from_peer,
137 to_peer,
138 timeout,
139 trigger_reason,
140 }
141 }
142}
143
144impl Display for RegionMigrationProcedureTask {
145 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
146 write!(
147 f,
148 "region: {}, from_peer: {}, to_peer: {}, trigger_reason: {}",
149 self.region_id, self.from_peer, self.to_peer, self.trigger_reason
150 )
151 }
152}
153
154impl RegionMigrationManager {
155 pub(crate) fn new(
157 procedure_manager: ProcedureManagerRef,
158 context_factory: DefaultContextFactory,
159 ) -> Self {
160 Self {
161 procedure_manager,
162 context_factory,
163 tracker: RegionMigrationProcedureTracker::default(),
164 }
165 }
166
/// Returns the tracker of currently registered migration tasks.
pub fn tracker(&self) -> &RegionMigrationProcedureTracker {
    &self.tracker
}
171
172 pub(crate) fn try_start(&self) -> Result<()> {
174 let context_factory = self.context_factory.clone();
175 let tracker = self.tracker.clone();
176 self.procedure_manager
177 .register_loader(
178 RegionMigrationProcedure::TYPE_NAME,
179 Box::new(move |json| {
180 let context_factory = context_factory.clone();
181 let tracker = tracker.clone();
182 RegionMigrationProcedure::from_json(json, context_factory, tracker)
183 .map(|p| Box::new(p) as _)
184 }),
185 )
186 .context(error::RegisterProcedureLoaderSnafu {
187 type_name: RegionMigrationProcedure::TYPE_NAME,
188 })
189 }
190
/// Registers `task` in the tracker.
///
/// Delegates to [`RegionMigrationProcedureTracker::insert_running_procedure`]
/// and returns its result unchanged.
fn insert_running_procedure(
    &self,
    task: &RegionMigrationProcedureTask,
) -> Option<RegionMigrationProcedureGuard> {
    self.tracker.insert_running_procedure(task)
}
197
198 fn verify_task(&self, task: &RegionMigrationProcedureTask) -> Result<()> {
199 if task.to_peer.id == task.from_peer.id {
200 return error::InvalidArgumentsSnafu {
201 err_msg: "The `from_peer_id` can't equal `to_peer_id`",
202 }
203 .fail();
204 }
205
206 Ok(())
207 }
208
209 async fn retrieve_table_route(&self, region_id: RegionId) -> Result<TableRouteValue> {
210 let table_route = self
211 .context_factory
212 .table_metadata_manager
213 .table_route_manager()
214 .table_route_storage()
215 .get(region_id.table_id())
216 .await
217 .context(error::TableMetadataManagerSnafu)?
218 .context(error::TableRouteNotFoundSnafu {
219 table_id: region_id.table_id(),
220 })?;
221
222 Ok(table_route)
223 }
224
225 async fn retrieve_table_info(&self, region_id: RegionId) -> Result<TableInfoValue> {
226 let table_route = self
227 .context_factory
228 .table_metadata_manager
229 .table_info_manager()
230 .get(region_id.table_id())
231 .await
232 .context(error::TableMetadataManagerSnafu)?
233 .context(error::TableInfoNotFoundSnafu {
234 table_id: region_id.table_id(),
235 })?
236 .into_inner();
237
238 Ok(table_route)
239 }
240
241 fn verify_table_route(
243 &self,
244 table_route: &TableRouteValue,
245 task: &RegionMigrationProcedureTask,
246 ) -> Result<()> {
247 if !table_route.is_physical() {
248 return error::UnexpectedSnafu {
249 violated: format!(
250 "Trying to execute region migration on the logical table, task {task}"
251 ),
252 }
253 .fail();
254 }
255
256 Ok(())
257 }
258
259 fn has_migrated(
261 &self,
262 region_route: &RegionRoute,
263 task: &RegionMigrationProcedureTask,
264 ) -> Result<bool> {
265 if region_route.is_leader_downgrading() {
266 return Ok(false);
267 }
268
269 let leader_peer = region_route
270 .leader_peer
271 .as_ref()
272 .context(error::UnexpectedSnafu {
273 violated: "Region route leader peer is not found",
274 })?;
275
276 Ok(leader_peer.id == task.to_peer.id)
277 }
278
279 fn verify_region_leader_peer(
283 &self,
284 region_route: &RegionRoute,
285 task: &mut RegionMigrationProcedureTask,
286 ) -> Result<()> {
287 let leader_peer = region_route
288 .leader_peer
289 .as_ref()
290 .context(error::UnexpectedSnafu {
291 violated: "Region route leader peer is not found",
292 })?;
293
294 ensure!(
295 leader_peer.id == task.from_peer.id,
296 error::LeaderPeerChangedSnafu {
297 msg: format!(
298 "Region's leader peer({}) is not the `from_peer`({}), region: {}",
299 leader_peer.id, task.from_peer.id, task.region_id
300 ),
301 }
302 );
303
304 if task.from_peer.addr.is_empty() {
305 warn!(
306 "The `from_peer` is unknown, use the leader peer({}) as the `from_peer`, region: {}",
307 leader_peer, task.region_id
308 );
309 task.from_peer = leader_peer.clone();
311 }
312
313 Ok(())
314 }
315
316 fn verify_region_follower_peers(
318 &self,
319 region_route: &RegionRoute,
320 task: &RegionMigrationProcedureTask,
321 ) -> Result<()> {
322 ensure!(
323 !region_route.follower_peers.contains(&task.to_peer),
324 error::InvalidArgumentsSnafu {
325 err_msg: format!(
326 "The `to_peer`({}) is already has a region follower, region: {}",
327 task.to_peer.id, task.region_id
328 ),
329 },
330 );
331
332 Ok(())
333 }
334
335 pub async fn submit_procedure(
337 &self,
338 mut task: RegionMigrationProcedureTask,
339 ) -> Result<Option<ProcedureId>> {
340 let Some(guard) = self.insert_running_procedure(&task) else {
341 return error::MigrationRunningSnafu {
342 region_id: task.region_id,
343 }
344 .fail();
345 };
346
347 self.verify_task(&task)?;
348
349 let region_id = task.region_id;
350
351 let table_route = self.retrieve_table_route(region_id).await?;
352 self.verify_table_route(&table_route, &task)?;
353
354 let region_route = table_route
356 .region_route(region_id)
357 .context(error::UnexpectedLogicalRouteTableSnafu {
358 err_msg: format!("{table_route:?} is a non-physical TableRouteValue."),
359 })?
360 .context(error::RegionRouteNotFoundSnafu { region_id })?;
361
362 if self.has_migrated(®ion_route, &task)? {
363 info!("Skipping region migration task: {task}");
364 return error::RegionMigratedSnafu {
365 region_id,
366 target_peer_id: task.to_peer.id,
367 }
368 .fail();
369 }
370
371 self.verify_region_leader_peer(®ion_route, &mut task)?;
372 self.verify_region_follower_peers(®ion_route, &task)?;
373 let table_info = self.retrieve_table_info(region_id).await?;
374 let TableName {
375 catalog_name,
376 schema_name,
377 ..
378 } = table_info.table_name();
379 let RegionMigrationProcedureTask {
380 region_id,
381 from_peer,
382 to_peer,
383 timeout,
384 trigger_reason,
385 } = task.clone();
386 let procedure = RegionMigrationProcedure::new(
387 PersistentContext {
388 catalog: catalog_name,
389 schema: schema_name,
390 region_id,
391 from_peer,
392 to_peer,
393 timeout,
394 trigger_reason,
395 },
396 self.context_factory.clone(),
397 Some(guard),
398 );
399 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
400 let procedure_id = procedure_with_id.id;
401 info!("Starting region migration procedure {procedure_id} for {task}");
402 let procedure_manager = self.procedure_manager.clone();
403 common_runtime::spawn_global(async move {
404 let watcher = &mut match procedure_manager.submit(procedure_with_id).await {
405 Ok(watcher) => watcher,
406 Err(e) => {
407 error!(e; "Failed to submit region migration procedure {procedure_id} for {task}");
408 return;
409 }
410 };
411 METRIC_META_REGION_MIGRATION_DATANODES
412 .with_label_values(&["src", &task.from_peer.id.to_string()])
413 .inc();
414 METRIC_META_REGION_MIGRATION_DATANODES
415 .with_label_values(&["desc", &task.to_peer.id.to_string()])
416 .inc();
417
418 if let Err(e) = watcher::wait(watcher).await {
419 error!(e; "Failed to wait region migration procedure {procedure_id} for {task}");
420 METRIC_META_REGION_MIGRATION_FAIL.inc();
421 return;
422 }
423
424 info!("Region migration procedure {procedure_id} for {task} is finished successfully!");
425 });
426
427 Ok(Some(procedure_id))
428 }
429}
430
#[cfg(test)]
mod test {
    use std::assert_matches::assert_matches;

    use common_meta::key::table_route::LogicalTableRouteValue;
    use common_meta::key::test_utils::new_test_table_info;
    use common_meta::rpc::router::Region;

    use super::*;
    use crate::procedure::region_migration::test_util::TestingEnv;

    /// Builds a manager from the procedure manager and context factory of `env`.
    fn new_manager(env: &TestingEnv) -> RegionMigrationManager {
        let context_factory = env.context_factory();
        RegionMigrationManager::new(env.procedure_manager().clone(), context_factory)
    }

    /// Builds a manually triggered task with a one second timeout.
    fn new_task(
        region_id: RegionId,
        from_peer: Peer,
        to_peer: Peer,
    ) -> RegionMigrationProcedureTask {
        RegionMigrationProcedureTask::new(
            region_id,
            from_peer,
            to_peer,
            Duration::from_millis(1000),
            RegionMigrationTriggerReason::Manual,
        )
    }

    #[tokio::test]
    async fn test_insert_running_procedure() {
        let env = TestingEnv::new();
        let manager = new_manager(&env);
        let region_id = RegionId::new(1024, 1);
        let task = new_task(region_id, Peer::empty(2), Peer::empty(1));

        // Pre-register the region so the submission sees it as already running.
        manager
            .tracker
            .running_procedures
            .write()
            .unwrap()
            .insert(region_id, task.clone());

        let err = manager.submit_procedure(task).await.unwrap_err();
        assert_matches!(err, error::Error::MigrationRunning { .. });
    }

    #[tokio::test]
    async fn test_submit_procedure_invalid_task() {
        let env = TestingEnv::new();
        let manager = new_manager(&env);
        // Source and target are the same peer.
        let task = new_task(RegionId::new(1024, 1), Peer::empty(1), Peer::empty(1));

        let err = manager.submit_procedure(task).await.unwrap_err();
        assert_matches!(err, error::Error::InvalidArguments { .. });
    }

    #[tokio::test]
    async fn test_submit_procedure_table_not_found() {
        let env = TestingEnv::new();
        let manager = new_manager(&env);
        // No table metadata is created for table 1024.
        let task = new_task(RegionId::new(1024, 1), Peer::empty(1), Peer::empty(2));

        let err = manager.submit_procedure(task).await.unwrap_err();
        assert_matches!(err, error::Error::TableRouteNotFound { .. });
    }

    #[tokio::test]
    async fn test_submit_procedure_region_route_not_found() {
        let env = TestingEnv::new();
        let manager = new_manager(&env);
        let task = new_task(RegionId::new(1024, 1), Peer::empty(1), Peer::empty(2));

        // Only region 2 has a route; the task targets region 1.
        let region_route = RegionRoute {
            region: Region::new_test(RegionId::new(1024, 2)),
            leader_peer: Some(Peer::empty(3)),
            ..Default::default()
        };
        let table_info = new_test_table_info(1024, vec![1]).into();
        env.create_physical_table_metadata(table_info, vec![region_route])
            .await;

        let err = manager.submit_procedure(task).await.unwrap_err();
        assert_matches!(err, error::Error::RegionRouteNotFound { .. });
    }

    #[tokio::test]
    async fn test_submit_procedure_incorrect_from_peer() {
        let env = TestingEnv::new();
        let manager = new_manager(&env);
        let task = new_task(RegionId::new(1024, 1), Peer::empty(1), Peer::empty(2));

        // The leader is peer 3, not the task's `from_peer` (1).
        let region_route = RegionRoute {
            region: Region::new_test(RegionId::new(1024, 1)),
            leader_peer: Some(Peer::empty(3)),
            ..Default::default()
        };
        let table_info = new_test_table_info(1024, vec![1]).into();
        env.create_physical_table_metadata(table_info, vec![region_route])
            .await;

        let err = manager.submit_procedure(task).await.unwrap_err();
        assert_matches!(err, error::Error::LeaderPeerChanged { .. });
        assert_eq!(err.to_string(), "Region's leader peer changed: Region's leader peer(3) is not the `from_peer`(1), region: 4398046511105(1024, 1)");
    }

    #[tokio::test]
    async fn test_submit_procedure_region_follower_on_to_peer() {
        let env = TestingEnv::new();
        let manager = new_manager(&env);
        let region_id = RegionId::new(1024, 1);
        let task = new_task(region_id, Peer::empty(3), Peer::empty(2));

        // The task's `to_peer` (2) is already a follower of the region.
        let region_route = RegionRoute {
            region: Region::new_test(region_id),
            leader_peer: Some(Peer::empty(3)),
            follower_peers: vec![Peer::empty(2)],
            ..Default::default()
        };
        let table_info = new_test_table_info(1024, vec![1]).into();
        env.create_physical_table_metadata(table_info, vec![region_route])
            .await;

        let err = manager.submit_procedure(task).await.unwrap_err();
        assert_matches!(err, error::Error::InvalidArguments { .. });
        assert_eq!(
            err.to_string(),
            "Invalid arguments: The `to_peer`(2) is already has a region follower, region: 4398046511105(1024, 1)"
        );
    }

    #[tokio::test]
    async fn test_submit_procedure_has_migrated() {
        common_telemetry::init_default_ut_logging();
        let env = TestingEnv::new();
        let manager = new_manager(&env);
        let task = new_task(RegionId::new(1024, 1), Peer::empty(1), Peer::empty(2));

        // The leader is already the task's `to_peer` (2).
        let region_route = RegionRoute {
            region: Region::new_test(RegionId::new(1024, 1)),
            leader_peer: Some(Peer::empty(2)),
            ..Default::default()
        };
        let table_info = new_test_table_info(1024, vec![1]).into();
        env.create_physical_table_metadata(table_info, vec![region_route])
            .await;

        let err = manager.submit_procedure(task).await.unwrap_err();
        assert_matches!(err, error::Error::RegionMigrated { .. });
    }

    #[tokio::test]
    async fn test_verify_table_route_error() {
        let env = TestingEnv::new();
        let manager = new_manager(&env);
        let task = new_task(RegionId::new(1024, 1), Peer::empty(1), Peer::empty(2));

        let logical_route = TableRouteValue::Logical(LogicalTableRouteValue::new(0, vec![]));
        let err = manager
            .verify_table_route(&logical_route, &task)
            .unwrap_err();

        assert_matches!(err, error::Error::Unexpected { .. });
    }
}