1use std::collections::hash_map::Entry;
16use std::collections::HashMap;
17use std::fmt::Display;
18use std::sync::{Arc, RwLock};
19use std::time::Duration;
20
21use common_meta::key::table_info::TableInfoValue;
22use common_meta::key::table_route::TableRouteValue;
23use common_meta::peer::Peer;
24use common_meta::rpc::router::RegionRoute;
25use common_procedure::{watcher, ProcedureId, ProcedureManagerRef, ProcedureWithId};
26use common_telemetry::{error, info};
27use snafu::{ensure, OptionExt, ResultExt};
28use store_api::storage::RegionId;
29use table::table_name::TableName;
30
31use crate::error::{self, Result};
32use crate::metrics::{METRIC_META_REGION_MIGRATION_DATANODES, METRIC_META_REGION_MIGRATION_FAIL};
33use crate::procedure::region_migration::{
34 DefaultContextFactory, PersistentContext, RegionMigrationProcedure,
35};
36
/// Shared, reference-counted handle to a [`RegionMigrationManager`].
pub type RegionMigrationManagerRef = Arc<RegionMigrationManager>;

/// Validates region migration requests and submits [`RegionMigrationProcedure`]s.
pub struct RegionMigrationManager {
    /// Procedure manager that migration procedures are submitted to and whose loader
    /// registry receives the region migration loader.
    procedure_manager: ProcedureManagerRef,
    /// Provides the table metadata manager used for validation, and is handed to every
    /// procedure this manager creates or loads.
    context_factory: DefaultContextFactory,
    /// Tracks which regions currently have a migration registered.
    tracker: RegionMigrationProcedureTracker,
}
45
/// Tracks the region migration tasks that are currently registered, keyed by region id.
///
/// Clones share the same underlying map (the map lives behind an `Arc`).
#[derive(Default, Clone)]
pub struct RegionMigrationProcedureTracker {
    /// Registered tasks; also shared with every [`RegionMigrationProcedureGuard`] handed out.
    running_procedures: Arc<RwLock<HashMap<RegionId, RegionMigrationProcedureTask>>>,
}
50
51impl RegionMigrationProcedureTracker {
52 pub(crate) fn insert_running_procedure(
54 &self,
55 task: &RegionMigrationProcedureTask,
56 ) -> Option<RegionMigrationProcedureGuard> {
57 let mut procedures = self.running_procedures.write().unwrap();
58 match procedures.entry(task.region_id) {
59 Entry::Occupied(_) => None,
60 Entry::Vacant(v) => {
61 v.insert(task.clone());
62 Some(RegionMigrationProcedureGuard {
63 region_id: task.region_id,
64 running_procedures: self.running_procedures.clone(),
65 })
66 }
67 }
68 }
69
70 pub(crate) fn contains(&self, region_id: RegionId) -> bool {
72 self.running_procedures
73 .read()
74 .unwrap()
75 .contains_key(®ion_id)
76 }
77}
78
/// Guard returned when a task is registered with a [`RegionMigrationProcedureTracker`].
///
/// Dropping the guard removes `region_id`'s entry from the tracker's shared map.
pub(crate) struct RegionMigrationProcedureGuard {
    /// Region whose registration this guard owns.
    region_id: RegionId,
    /// The tracker's shared map of registered tasks.
    running_procedures: Arc<RwLock<HashMap<RegionId, RegionMigrationProcedureTask>>>,
}
84
85impl Drop for RegionMigrationProcedureGuard {
86 fn drop(&mut self) {
87 let exists = self
88 .running_procedures
89 .read()
90 .unwrap()
91 .contains_key(&self.region_id);
92 if exists {
93 self.running_procedures
94 .write()
95 .unwrap()
96 .remove(&self.region_id);
97 }
98 }
99}
100
/// A request to migrate `region_id` from `from_peer` to `to_peer`.
#[derive(Debug, Clone)]
pub struct RegionMigrationProcedureTask {
    /// Region to migrate; also the key used by [`RegionMigrationProcedureTracker`].
    pub(crate) region_id: RegionId,
    /// Peer expected to currently hold the region leader.
    pub(crate) from_peer: Peer,
    /// Peer the region should be migrated to.
    pub(crate) to_peer: Peer,
    /// Timeout copied into the procedure's `PersistentContext`.
    pub(crate) timeout: Duration,
}
108
109impl RegionMigrationProcedureTask {
110 pub fn new(region_id: RegionId, from_peer: Peer, to_peer: Peer, timeout: Duration) -> Self {
111 Self {
112 region_id,
113 from_peer,
114 to_peer,
115 timeout,
116 }
117 }
118}
119
120impl Display for RegionMigrationProcedureTask {
121 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
122 write!(
123 f,
124 "region: {}, from_peer: {}, to_peer: {}",
125 self.region_id, self.from_peer, self.to_peer
126 )
127 }
128}
129
130impl RegionMigrationManager {
131 pub(crate) fn new(
133 procedure_manager: ProcedureManagerRef,
134 context_factory: DefaultContextFactory,
135 ) -> Self {
136 Self {
137 procedure_manager,
138 context_factory,
139 tracker: RegionMigrationProcedureTracker::default(),
140 }
141 }
142
143 pub fn tracker(&self) -> &RegionMigrationProcedureTracker {
145 &self.tracker
146 }
147
148 pub(crate) fn try_start(&self) -> Result<()> {
150 let context_factory = self.context_factory.clone();
151 let tracker = self.tracker.clone();
152 self.procedure_manager
153 .register_loader(
154 RegionMigrationProcedure::TYPE_NAME,
155 Box::new(move |json| {
156 let context_factory = context_factory.clone();
157 let tracker = tracker.clone();
158 RegionMigrationProcedure::from_json(json, context_factory, tracker)
159 .map(|p| Box::new(p) as _)
160 }),
161 )
162 .context(error::RegisterProcedureLoaderSnafu {
163 type_name: RegionMigrationProcedure::TYPE_NAME,
164 })
165 }
166
167 fn insert_running_procedure(
168 &self,
169 task: &RegionMigrationProcedureTask,
170 ) -> Option<RegionMigrationProcedureGuard> {
171 self.tracker.insert_running_procedure(task)
172 }
173
174 fn verify_task(&self, task: &RegionMigrationProcedureTask) -> Result<()> {
175 if task.to_peer.id == task.from_peer.id {
176 return error::InvalidArgumentsSnafu {
177 err_msg: "The `from_peer_id` can't equal `to_peer_id`",
178 }
179 .fail();
180 }
181
182 Ok(())
183 }
184
185 async fn retrieve_table_route(&self, region_id: RegionId) -> Result<TableRouteValue> {
186 let table_route = self
187 .context_factory
188 .table_metadata_manager
189 .table_route_manager()
190 .table_route_storage()
191 .get(region_id.table_id())
192 .await
193 .context(error::TableMetadataManagerSnafu)?
194 .context(error::TableRouteNotFoundSnafu {
195 table_id: region_id.table_id(),
196 })?;
197
198 Ok(table_route)
199 }
200
201 async fn retrieve_table_info(&self, region_id: RegionId) -> Result<TableInfoValue> {
202 let table_route = self
203 .context_factory
204 .table_metadata_manager
205 .table_info_manager()
206 .get(region_id.table_id())
207 .await
208 .context(error::TableMetadataManagerSnafu)?
209 .context(error::TableInfoNotFoundSnafu {
210 table_id: region_id.table_id(),
211 })?
212 .into_inner();
213
214 Ok(table_route)
215 }
216
217 fn verify_table_route(
219 &self,
220 table_route: &TableRouteValue,
221 task: &RegionMigrationProcedureTask,
222 ) -> Result<()> {
223 if !table_route.is_physical() {
224 return error::UnexpectedSnafu {
225 violated: format!(
226 "Trying to execute region migration on the logical table, task {task}"
227 ),
228 }
229 .fail();
230 }
231
232 Ok(())
233 }
234
235 fn has_migrated(
237 &self,
238 region_route: &RegionRoute,
239 task: &RegionMigrationProcedureTask,
240 ) -> Result<bool> {
241 if region_route.is_leader_downgrading() {
242 return Ok(false);
243 }
244
245 let leader_peer = region_route
246 .leader_peer
247 .as_ref()
248 .context(error::UnexpectedSnafu {
249 violated: "Region route leader peer is not found",
250 })?;
251
252 Ok(leader_peer.id == task.to_peer.id)
253 }
254
255 fn verify_region_leader_peer(
257 &self,
258 region_route: &RegionRoute,
259 task: &RegionMigrationProcedureTask,
260 ) -> Result<()> {
261 let leader_peer = region_route
262 .leader_peer
263 .as_ref()
264 .context(error::UnexpectedSnafu {
265 violated: "Region route leader peer is not found",
266 })?;
267
268 ensure!(
269 leader_peer.id == task.from_peer.id,
270 error::LeaderPeerChangedSnafu {
271 msg: format!(
272 "Region's leader peer({}) is not the `from_peer`({}), region: {}",
273 leader_peer.id, task.from_peer.id, task.region_id
274 ),
275 }
276 );
277
278 Ok(())
279 }
280
281 fn verify_region_follower_peers(
283 &self,
284 region_route: &RegionRoute,
285 task: &RegionMigrationProcedureTask,
286 ) -> Result<()> {
287 ensure!(
288 !region_route.follower_peers.contains(&task.to_peer),
289 error::InvalidArgumentsSnafu {
290 err_msg: format!(
291 "The `to_peer`({}) is already has a region follower, region: {}",
292 task.to_peer.id, task.region_id
293 ),
294 },
295 );
296
297 Ok(())
298 }
299
300 pub async fn submit_procedure(
302 &self,
303 task: RegionMigrationProcedureTask,
304 ) -> Result<Option<ProcedureId>> {
305 let Some(guard) = self.insert_running_procedure(&task) else {
306 return error::MigrationRunningSnafu {
307 region_id: task.region_id,
308 }
309 .fail();
310 };
311
312 self.verify_task(&task)?;
313
314 let region_id = task.region_id;
315
316 let table_route = self.retrieve_table_route(region_id).await?;
317 self.verify_table_route(&table_route, &task)?;
318
319 let region_route = table_route
321 .region_route(region_id)
322 .context(error::UnexpectedLogicalRouteTableSnafu {
323 err_msg: format!("{table_route:?} is a non-physical TableRouteValue."),
324 })?
325 .context(error::RegionRouteNotFoundSnafu { region_id })?;
326
327 if self.has_migrated(®ion_route, &task)? {
328 info!("Skipping region migration task: {task}");
329 return Ok(None);
330 }
331
332 self.verify_region_leader_peer(®ion_route, &task)?;
333 self.verify_region_follower_peers(®ion_route, &task)?;
334 let table_info = self.retrieve_table_info(region_id).await?;
335 let TableName {
336 catalog_name,
337 schema_name,
338 ..
339 } = table_info.table_name();
340 METRIC_META_REGION_MIGRATION_DATANODES
341 .with_label_values(&["src", &task.from_peer.id.to_string()])
342 .inc();
343 METRIC_META_REGION_MIGRATION_DATANODES
344 .with_label_values(&["desc", &task.to_peer.id.to_string()])
345 .inc();
346 let RegionMigrationProcedureTask {
347 region_id,
348 from_peer,
349 to_peer,
350 timeout,
351 } = task.clone();
352 let procedure = RegionMigrationProcedure::new(
353 PersistentContext {
354 catalog: catalog_name,
355 schema: schema_name,
356 region_id,
357 from_peer,
358 to_peer,
359 timeout,
360 },
361 self.context_factory.clone(),
362 Some(guard),
363 );
364 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
365 let procedure_id = procedure_with_id.id;
366 info!("Starting region migration procedure {procedure_id} for {task}");
367 let procedure_manager = self.procedure_manager.clone();
368 common_runtime::spawn_global(async move {
369 let watcher = &mut match procedure_manager.submit(procedure_with_id).await {
370 Ok(watcher) => watcher,
371 Err(e) => {
372 error!(e; "Failed to submit region migration procedure {procedure_id} for {task}");
373 return;
374 }
375 };
376
377 if let Err(e) = watcher::wait(watcher).await {
378 error!(e; "Failed to wait region migration procedure {procedure_id} for {task}");
379 METRIC_META_REGION_MIGRATION_FAIL.inc();
380 return;
381 }
382
383 info!("Region migration procedure {procedure_id} for {task} is finished successfully!");
384 });
385
386 Ok(Some(procedure_id))
387 }
388}
389
#[cfg(test)]
mod test {
    use std::assert_matches::assert_matches;

    use common_meta::key::table_route::LogicalTableRouteValue;
    use common_meta::key::test_utils::new_test_table_info;
    use common_meta::rpc::router::Region;

    use super::*;
    use crate::procedure::region_migration::test_util::TestingEnv;

    /// A region that already has a registered migration is rejected with `MigrationRunning`.
    #[tokio::test]
    async fn test_insert_running_procedure() {
        let env = TestingEnv::new();
        let context_factory = env.context_factory();
        let manager = RegionMigrationManager::new(env.procedure_manager().clone(), context_factory);
        let region_id = RegionId::new(1024, 1);
        let task = RegionMigrationProcedureTask {
            region_id,
            from_peer: Peer::empty(2),
            to_peer: Peer::empty(1),
            timeout: Duration::from_millis(1000),
        };
        manager
            .tracker
            .running_procedures
            .write()
            .unwrap()
            .insert(region_id, task.clone());

        let err = manager.submit_procedure(task).await.unwrap_err();
        assert_matches!(err, error::Error::MigrationRunning { .. });
    }

    /// `from_peer` and `to_peer` with the same id are rejected.
    #[tokio::test]
    async fn test_submit_procedure_invalid_task() {
        let env = TestingEnv::new();
        let context_factory = env.context_factory();
        let manager = RegionMigrationManager::new(env.procedure_manager().clone(), context_factory);
        let region_id = RegionId::new(1024, 1);
        let task = RegionMigrationProcedureTask {
            region_id,
            from_peer: Peer::empty(1),
            to_peer: Peer::empty(1),
            timeout: Duration::from_millis(1000),
        };

        let err = manager.submit_procedure(task).await.unwrap_err();
        assert_matches!(err, error::Error::InvalidArguments { .. });
    }

    /// A table with no stored route yields `TableRouteNotFound`.
    #[tokio::test]
    async fn test_submit_procedure_table_not_found() {
        let env = TestingEnv::new();
        let context_factory = env.context_factory();
        let manager = RegionMigrationManager::new(env.procedure_manager().clone(), context_factory);
        let region_id = RegionId::new(1024, 1);
        let task = RegionMigrationProcedureTask {
            region_id,
            from_peer: Peer::empty(1),
            to_peer: Peer::empty(2),
            timeout: Duration::from_millis(1000),
        };

        let err = manager.submit_procedure(task).await.unwrap_err();
        assert_matches!(err, error::Error::TableRouteNotFound { .. });
    }

    /// A route without an entry for the requested region yields `RegionRouteNotFound`.
    #[tokio::test]
    async fn test_submit_procedure_region_route_not_found() {
        let env = TestingEnv::new();
        let context_factory = env.context_factory();
        let manager = RegionMigrationManager::new(env.procedure_manager().clone(), context_factory);
        let region_id = RegionId::new(1024, 1);
        let task = RegionMigrationProcedureTask {
            region_id,
            from_peer: Peer::empty(1),
            to_peer: Peer::empty(2),
            timeout: Duration::from_millis(1000),
        };

        let table_info = new_test_table_info(1024, vec![1]).into();
        let region_routes = vec![RegionRoute {
            region: Region::new_test(RegionId::new(1024, 2)),
            leader_peer: Some(Peer::empty(3)),
            ..Default::default()
        }];

        env.create_physical_table_metadata(table_info, region_routes)
            .await;

        let err = manager.submit_procedure(task).await.unwrap_err();
        assert_matches!(err, error::Error::RegionRouteNotFound { .. });
    }

    /// A leader that is not `from_peer` yields `LeaderPeerChanged`.
    #[tokio::test]
    async fn test_submit_procedure_incorrect_from_peer() {
        let env = TestingEnv::new();
        let context_factory = env.context_factory();
        let manager = RegionMigrationManager::new(env.procedure_manager().clone(), context_factory);
        let region_id = RegionId::new(1024, 1);
        let task = RegionMigrationProcedureTask {
            region_id,
            from_peer: Peer::empty(1),
            to_peer: Peer::empty(2),
            timeout: Duration::from_millis(1000),
        };

        let table_info = new_test_table_info(1024, vec![1]).into();
        let region_routes = vec![RegionRoute {
            region: Region::new_test(RegionId::new(1024, 1)),
            leader_peer: Some(Peer::empty(3)),
            ..Default::default()
        }];

        env.create_physical_table_metadata(table_info, region_routes)
            .await;

        let err = manager.submit_procedure(task).await.unwrap_err();
        assert_matches!(err, error::Error::LeaderPeerChanged { .. });
        assert_eq!(err.to_string(), "Region's leader peer changed: Region's leader peer(3) is not the `from_peer`(1), region: 4398046511105(1024, 1)");
    }

    /// A `to_peer` that already hosts a follower of the region is rejected.
    #[tokio::test]
    async fn test_submit_procedure_region_follower_on_to_peer() {
        let env = TestingEnv::new();
        let context_factory = env.context_factory();
        let manager = RegionMigrationManager::new(env.procedure_manager().clone(), context_factory);
        let region_id = RegionId::new(1024, 1);
        let task = RegionMigrationProcedureTask {
            region_id,
            from_peer: Peer::empty(3),
            to_peer: Peer::empty(2),
            timeout: Duration::from_millis(1000),
        };

        let table_info = new_test_table_info(1024, vec![1]).into();
        let region_routes = vec![RegionRoute {
            region: Region::new_test(region_id),
            leader_peer: Some(Peer::empty(3)),
            follower_peers: vec![Peer::empty(2)],
            ..Default::default()
        }];

        env.create_physical_table_metadata(table_info, region_routes)
            .await;

        let err = manager.submit_procedure(task).await.unwrap_err();
        assert_matches!(err, error::Error::InvalidArguments { .. });
        assert_eq!(
            err.to_string(),
            "Invalid arguments: The `to_peer`(2) is already has a region follower, region: 4398046511105(1024, 1)"
        );
    }

    /// A region whose leader is already on `to_peer` is skipped: no procedure is submitted.
    #[tokio::test]
    async fn test_submit_procedure_has_migrated() {
        common_telemetry::init_default_ut_logging();
        let env = TestingEnv::new();
        let context_factory = env.context_factory();
        let manager = RegionMigrationManager::new(env.procedure_manager().clone(), context_factory);
        let region_id = RegionId::new(1024, 1);
        let task = RegionMigrationProcedureTask {
            region_id,
            from_peer: Peer::empty(1),
            to_peer: Peer::empty(2),
            timeout: Duration::from_millis(1000),
        };

        let table_info = new_test_table_info(1024, vec![1]).into();
        let region_routes = vec![RegionRoute {
            region: Region::new_test(RegionId::new(1024, 1)),
            leader_peer: Some(Peer::empty(2)),
            ..Default::default()
        }];

        env.create_physical_table_metadata(table_info, region_routes)
            .await;

        let procedure_id = manager.submit_procedure(task).await.unwrap();
        assert!(procedure_id.is_none());
    }

    /// Migration on a logical table route is rejected with `Unexpected`.
    #[tokio::test]
    async fn test_verify_table_route_error() {
        let env = TestingEnv::new();
        let context_factory = env.context_factory();
        let manager = RegionMigrationManager::new(env.procedure_manager().clone(), context_factory);
        let region_id = RegionId::new(1024, 1);
        let task = RegionMigrationProcedureTask {
            region_id,
            from_peer: Peer::empty(1),
            to_peer: Peer::empty(2),
            timeout: Duration::from_millis(1000),
        };

        let err = manager
            .verify_table_route(
                &TableRouteValue::Logical(LogicalTableRouteValue::new(0, vec![])),
                &task,
            )
            .unwrap_err();

        assert_matches!(err, error::Error::Unexpected { .. });
    }
}