1use std::collections::{HashMap, HashSet};
16use std::fmt::Display;
17use std::time::Duration;
18
19use common_meta::key::TableMetadataManagerRef;
20use common_meta::peer::Peer;
21use common_meta::rpc::router::RegionRoute;
22use itertools::Itertools;
23use snafu::{OptionExt, ResultExt};
24use store_api::storage::{RegionId, TableId};
25
26use crate::error::{self, Result};
27use crate::procedure::region_migration::{
28 DEFAULT_REGION_MIGRATION_TIMEOUT, RegionMigrationProcedureTask, RegionMigrationTriggerReason,
29};
30
/// A batch of region migrations that share one source peer and one destination peer.
#[derive(Debug, Clone)]
pub struct RegionMigrationTaskBatch {
    /// Ids of the regions covered by this batch.
    pub region_ids: Vec<RegionId>,
    /// Peer the regions are migrated away from.
    pub from_peer: Peer,
    /// Peer the regions are migrated to.
    pub to_peer: Peer,
    /// Timeout assigned to the batch.
    pub timeout: Duration,
    /// Reason the migration was triggered.
    pub trigger_reason: RegionMigrationTriggerReason,
}
45
46impl RegionMigrationTaskBatch {
47 pub fn from_tasks(tasks: Vec<(RegionMigrationProcedureTask, u32)>) -> Self {
55 let max_count = tasks.iter().map(|(_, count)| *count).max().unwrap_or(1);
56 let region_ids = tasks.iter().map(|(r, _)| r.region_id).collect::<Vec<_>>();
57 let from_peer = tasks[0].0.from_peer.clone();
58 let to_peer = tasks[0].0.to_peer.clone();
59 let timeout = DEFAULT_REGION_MIGRATION_TIMEOUT * max_count;
60 let trigger_reason = RegionMigrationTriggerReason::Failover;
61 Self {
62 region_ids,
63 from_peer,
64 to_peer,
65 timeout,
66 trigger_reason,
67 }
68 }
69}
70
71impl Display for RegionMigrationTaskBatch {
72 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
73 write!(
74 f,
75 "RegionMigrationTask {{ region_ids: {:?}, from_peer: {:?}, to_peer: {:?}, timeout: {:?}, trigger_reason: {:?} }}",
76 self.region_ids, self.from_peer, self.to_peer, self.timeout, self.trigger_reason
77 )
78 }
79}
80
81impl RegionMigrationTaskBatch {
82 pub(crate) fn table_regions(&self) -> HashMap<TableId, HashSet<RegionId>> {
86 let mut table_regions = HashMap::new();
87 for region_id in &self.region_ids {
88 table_regions
89 .entry(region_id.table_id())
90 .or_insert_with(HashSet::new)
91 .insert(*region_id);
92 }
93 table_regions
94 }
95}
96
97#[derive(Debug, Clone, Default, PartialEq)]
99pub(crate) struct RegionMigrationAnalysis {
100 pub(crate) migrated: Vec<RegionId>,
102 pub(crate) leader_changed: Vec<RegionId>,
104 pub(crate) peer_conflict: Vec<RegionId>,
106 pub(crate) table_not_found: Vec<RegionId>,
108 pub(crate) region_not_found: Vec<RegionId>,
110 pub(crate) pending: Vec<RegionId>,
112}
113
114fn leader_peer(region_route: &RegionRoute) -> Result<&Peer> {
115 region_route
116 .leader_peer
117 .as_ref()
118 .with_context(|| error::UnexpectedSnafu {
119 violated: format!(
120 "Region route leader peer is not found in region({})",
121 region_route.region.id
122 ),
123 })
124}
125
126fn has_migrated(region_route: &RegionRoute, to_peer_id: u64) -> Result<bool> {
128 if region_route.is_leader_downgrading() {
129 return Ok(false);
130 }
131
132 let leader_peer = leader_peer(region_route)?;
133 Ok(leader_peer.id == to_peer_id)
134}
135
136fn has_leader_changed(region_route: &RegionRoute, from_peer_id: u64) -> Result<bool> {
138 let leader_peer = leader_peer(region_route)?;
139
140 Ok(leader_peer.id != from_peer_id)
141}
142
143fn has_peer_conflict(region_route: &RegionRoute, to_peer_id: u64) -> bool {
145 region_route
146 .follower_peers
147 .iter()
148 .map(|p| p.id)
149 .contains(&to_peer_id)
150}
151
152fn update_result_with_region_route(
154 result: &mut RegionMigrationAnalysis,
155 region_route: &RegionRoute,
156 from_peer_id: u64,
157 to_peer_id: u64,
158) -> Result<()> {
159 if has_migrated(region_route, to_peer_id)? {
160 result.migrated.push(region_route.region.id);
161 return Ok(());
162 }
163 if has_leader_changed(region_route, from_peer_id)? {
164 result.leader_changed.push(region_route.region.id);
165 return Ok(());
166 }
167 if has_peer_conflict(region_route, to_peer_id) {
168 result.peer_conflict.push(region_route.region.id);
169 return Ok(());
170 }
171 result.pending.push(region_route.region.id);
172 Ok(())
173}
174
175pub async fn analyze_region_migration_task(
179 task: &RegionMigrationTaskBatch,
180 table_metadata_manager: &TableMetadataManagerRef,
181) -> Result<RegionMigrationAnalysis> {
182 if task.to_peer.id == task.from_peer.id {
183 return error::InvalidArgumentsSnafu {
184 err_msg: format!(
185 "The `from_peer_id`({}) can't equal `to_peer_id`({})",
186 task.from_peer.id, task.to_peer.id
187 ),
188 }
189 .fail();
190 }
191 let table_regions = task.table_regions();
192 let table_ids = table_regions.keys().cloned().collect::<Vec<_>>();
193 let mut result = RegionMigrationAnalysis::default();
194
195 let table_routes = table_metadata_manager
196 .table_route_manager()
197 .table_route_storage()
198 .batch_get_with_raw_bytes(&table_ids)
199 .await
200 .context(error::TableMetadataManagerSnafu)?;
201
202 for (table_id, table_route) in table_ids.into_iter().zip(table_routes) {
203 let region_ids = table_regions.get(&table_id).unwrap();
204 let Some(table_route) = table_route else {
205 result.table_not_found.extend(region_ids);
206 continue;
207 };
208 let region_routes = table_route.region_routes().with_context(|_| {
210 error::UnexpectedLogicalRouteTableSnafu {
211 err_msg: format!("TableRoute({table_id:?}) is a non-physical TableRouteValue."),
212 }
213 })?;
214
215 let existing_region_ids = region_routes
216 .iter()
217 .map(|r| r.region.id)
218 .collect::<HashSet<_>>();
219
220 for region_route in region_routes
221 .iter()
222 .filter(|r| region_ids.contains(&r.region.id))
223 {
224 update_result_with_region_route(
225 &mut result,
226 region_route,
227 task.from_peer.id,
228 task.to_peer.id,
229 )?;
230 }
231
232 for region_id in region_ids {
233 if !existing_region_ids.contains(region_id) {
234 result.region_not_found.push(*region_id);
235 }
236 }
237 }
238
239 Ok(result)
240}
241
#[cfg(test)]
mod tests {

    use std::assert_matches::assert_matches;
    use std::sync::Arc;
    use std::time::Duration;

    use common_meta::key::TableMetadataManager;
    use common_meta::key::table_route::{
        LogicalTableRouteValue, PhysicalTableRouteValue, TableRouteValue,
    };
    use common_meta::kv_backend::TxnService;
    use common_meta::kv_backend::memory::MemoryKvBackend;
    use common_meta::peer::Peer;
    use common_meta::rpc::router::{Region, RegionRoute};
    use store_api::storage::RegionId;

    use crate::error::Error;
    use crate::procedure::region_migration::RegionMigrationTriggerReason;
    use crate::procedure::region_migration::utils::{
        RegionMigrationAnalysis, RegionMigrationTaskBatch, analyze_region_migration_task,
        update_result_with_region_route,
    };

    /// Builds a route for `region_id` with the given leader and follower peer ids and no
    /// leader state.
    fn new_region_route(
        region_id: RegionId,
        leader_peer_id: Option<u64>,
        follower_peer_ids: &[u64],
    ) -> RegionRoute {
        RegionRoute {
            region: Region::new_test(region_id),
            leader_peer: leader_peer_id.map(Peer::empty),
            follower_peers: follower_peer_ids
                .iter()
                .copied()
                .map(Peer::empty)
                .collect(),
            leader_state: None,
            leader_down_since: None,
        }
    }

    /// Builds a manually triggered batch that moves `region_ids` between the two peers.
    fn new_task(
        region_ids: Vec<RegionId>,
        from_peer_id: u64,
        to_peer_id: u64,
    ) -> RegionMigrationTaskBatch {
        RegionMigrationTaskBatch {
            region_ids,
            from_peer: Peer::empty(from_peer_id),
            to_peer: Peer::empty(to_peer_id),
            timeout: Duration::from_millis(1000),
            trigger_reason: RegionMigrationTriggerReason::Manual,
        }
    }

    #[test]
    fn test_update_result_with_region_route() {
        let region_id = RegionId::new(1, 1);

        // The leader (1) is already the destination peer: migrated.
        let mut result = RegionMigrationAnalysis::default();
        let region_route = new_region_route(region_id, Some(1), &[]);
        update_result_with_region_route(&mut result, &region_route, 2, 1).unwrap();
        assert_eq!(
            result,
            RegionMigrationAnalysis {
                migrated: vec![region_id],
                ..Default::default()
            }
        );

        // The leader (1) is neither the source (2) nor the destination (3): leader changed.
        let mut result = RegionMigrationAnalysis::default();
        let region_route = new_region_route(region_id, Some(1), &[]);
        update_result_with_region_route(&mut result, &region_route, 2, 3).unwrap();
        assert_eq!(
            result,
            RegionMigrationAnalysis {
                leader_changed: vec![region_id],
                ..Default::default()
            }
        );

        // The destination peer (2) is already a follower: peer conflict.
        let mut result = RegionMigrationAnalysis::default();
        let region_route = new_region_route(region_id, Some(1), &[2]);
        update_result_with_region_route(&mut result, &region_route, 1, 2).unwrap();
        assert_eq!(
            result,
            RegionMigrationAnalysis {
                peer_conflict: vec![region_id],
                ..Default::default()
            }
        );

        // The leader is still the source peer (1) and the destination (3) is unused: pending.
        let mut result = RegionMigrationAnalysis::default();
        let region_route = new_region_route(region_id, Some(1), &[]);
        update_result_with_region_route(&mut result, &region_route, 1, 3).unwrap();
        assert_eq!(
            result,
            RegionMigrationAnalysis {
                pending: vec![region_id],
                ..Default::default()
            }
        );

        // A route without a leader peer cannot be classified.
        let mut result = RegionMigrationAnalysis::default();
        let region_route = new_region_route(region_id, None, &[]);
        let err = update_result_with_region_route(&mut result, &region_route, 1, 3).unwrap_err();
        assert_matches!(err, Error::Unexpected { .. });
    }

    #[tokio::test]
    async fn test_analyze_region_migration_task_invalid_task() {
        // Source and destination are the same peer.
        let task = new_task(vec![RegionId::new(1, 1)], 1, 1);
        let kv_backend = Arc::new(MemoryKvBackend::default());
        let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone()));
        let err = analyze_region_migration_task(&task, &table_metadata_manager)
            .await
            .unwrap_err();
        assert_matches!(err, Error::InvalidArguments { .. });
    }

    #[tokio::test]
    async fn test_analyze_region_migration_table_not_found() {
        // The backend is empty, so table 1 has no route.
        let task = new_task(vec![RegionId::new(1, 1)], 1, 2);
        let kv_backend = Arc::new(MemoryKvBackend::default());
        let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone()));
        let result = analyze_region_migration_task(&task, &table_metadata_manager)
            .await
            .unwrap();
        assert_eq!(
            result,
            RegionMigrationAnalysis {
                table_not_found: vec![RegionId::new(1, 1)],
                ..Default::default()
            }
        );
    }

    #[tokio::test]
    async fn test_analyze_region_migration_unexpected_logical_table() {
        let kv_backend = Arc::new(MemoryKvBackend::default());
        let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone()));
        // Store a logical route for table 1024; it carries no region routes of its own.
        let (txn, _) = table_metadata_manager
            .table_route_manager()
            .table_route_storage()
            .build_create_txn(
                1024,
                &TableRouteValue::Logical(LogicalTableRouteValue::new(1024)),
            )
            .unwrap();
        kv_backend.txn(txn).await.unwrap();

        let task = new_task(vec![RegionId::new(1024, 1)], 1, 2);
        let err = analyze_region_migration_task(&task, &table_metadata_manager)
            .await
            .unwrap_err();
        assert_matches!(err, Error::UnexpectedLogicalRouteTable { .. });
    }

    #[tokio::test]
    async fn test_analyze_region_migration_normal_case() {
        let kv_backend = Arc::new(MemoryKvBackend::default());
        let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone()));
        let (txn, _) = table_metadata_manager
            .table_route_manager()
            .table_route_storage()
            .build_create_txn(
                1024,
                &TableRouteValue::Physical(PhysicalTableRouteValue::new(vec![
                    // Already migrated: the leader is the destination peer (2).
                    new_region_route(RegionId::new(1024, 1), Some(2), &[]),
                    // Leader changed: the leader (3) is neither source nor destination.
                    new_region_route(RegionId::new(1024, 2), Some(3), &[]),
                    // Peer conflict: the destination peer (2) is already a follower.
                    new_region_route(RegionId::new(1024, 3), Some(1), &[2]),
                    // Pending: still led by the source peer (1).
                    new_region_route(RegionId::new(1024, 4), Some(1), &[]),
                ])),
            )
            .unwrap();

        kv_backend.txn(txn).await.unwrap();
        let task = new_task(
            vec![
                RegionId::new(1024, 1),
                RegionId::new(1024, 2),
                RegionId::new(1024, 3),
                RegionId::new(1024, 4),
                // Table 1024 has no route for region 5.
                RegionId::new(1024, 5),
                // Table 1025 has no route at all.
                RegionId::new(1025, 1),
            ],
            1,
            2,
        );
        let result = analyze_region_migration_task(&task, &table_metadata_manager)
            .await
            .unwrap();
        assert_eq!(
            result,
            RegionMigrationAnalysis {
                pending: vec![RegionId::new(1024, 4)],
                migrated: vec![RegionId::new(1024, 1)],
                leader_changed: vec![RegionId::new(1024, 2)],
                peer_conflict: vec![RegionId::new(1024, 3)],
                region_not_found: vec![RegionId::new(1024, 5)],
                table_not_found: vec![RegionId::new(1025, 1)],
            }
        );
    }
}