1use std::collections::{HashMap, HashSet};
16use std::fmt::Display;
17use std::time::Duration;
18
19use common_meta::key::TableMetadataManagerRef;
20use common_meta::peer::Peer;
21use common_meta::rpc::router::RegionRoute;
22use itertools::Itertools;
23use snafu::{OptionExt, ResultExt};
24use store_api::storage::{RegionId, TableId};
25
26use crate::error::{self, Result};
27use crate::procedure::region_migration::{
28 DEFAULT_REGION_MIGRATION_TIMEOUT, RegionMigrationProcedureTask, RegionMigrationTriggerReason,
29};
30
/// A batch of region migrations that move regions from one source peer to one
/// destination peer.
#[derive(Debug, Clone)]
pub struct RegionMigrationTaskBatch {
    /// The regions to migrate; they may belong to several tables.
    pub region_ids: Vec<RegionId>,
    /// The peer the regions are migrated away from.
    pub from_peer: Peer,
    /// The peer the regions are migrated to.
    pub to_peer: Peer,
    /// The timeout applied to the migration of this batch.
    pub timeout: Duration,
    /// Why this migration batch was triggered.
    pub trigger_reason: RegionMigrationTriggerReason,
}
45
46impl RegionMigrationTaskBatch {
47 pub fn from_tasks(tasks: Vec<(RegionMigrationProcedureTask, u32)>) -> Self {
55 let max_count = tasks.iter().map(|(_, count)| *count).max().unwrap_or(1);
56 let region_ids = tasks.iter().map(|(r, _)| r.region_id).collect::<Vec<_>>();
57 let from_peer = tasks[0].0.from_peer.clone();
58 let to_peer = tasks[0].0.to_peer.clone();
59 let timeout = DEFAULT_REGION_MIGRATION_TIMEOUT * max_count;
60 let trigger_reason = RegionMigrationTriggerReason::Failover;
61 Self {
62 region_ids,
63 from_peer,
64 to_peer,
65 timeout,
66 trigger_reason,
67 }
68 }
69}
70
71impl Display for RegionMigrationTaskBatch {
72 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
73 write!(
74 f,
75 "RegionMigrationTask {{ region_ids: {:?}, from_peer: {:?}, to_peer: {:?}, timeout: {:?}, trigger_reason: {:?} }}",
76 self.region_ids, self.from_peer, self.to_peer, self.timeout, self.trigger_reason
77 )
78 }
79}
80
81impl RegionMigrationTaskBatch {
82 pub(crate) fn table_regions(&self) -> HashMap<TableId, HashSet<RegionId>> {
86 let mut table_regions = HashMap::new();
87 for region_id in &self.region_ids {
88 table_regions
89 .entry(region_id.table_id())
90 .or_insert_with(HashSet::new)
91 .insert(*region_id);
92 }
93 table_regions
94 }
95}
96
/// The result of analyzing a [`RegionMigrationTaskBatch`] against the current table routes.
///
/// Each requested region id is recorded in one of the buckets below.
#[derive(Debug, Clone, Default, PartialEq)]
pub(crate) struct RegionMigrationAnalysis {
    /// Regions whose leader is already the destination peer and is not downgrading.
    pub(crate) migrated: Vec<RegionId>,
    /// Regions whose leader is no longer the source peer.
    pub(crate) leader_changed: Vec<RegionId>,
    /// Regions where the destination peer is already one of the follower peers.
    pub(crate) peer_conflict: Vec<RegionId>,
    /// Regions whose table has no table route.
    pub(crate) table_not_found: Vec<RegionId>,
    /// Regions missing from their (existing) table route.
    pub(crate) region_not_found: Vec<RegionId>,
    /// Regions that matched none of the other buckets and can still be migrated.
    pub(crate) pending: Vec<RegionId>,
}
113
114fn leader_peer(region_route: &RegionRoute) -> Result<&Peer> {
115 region_route
116 .leader_peer
117 .as_ref()
118 .with_context(|| error::UnexpectedSnafu {
119 violated: format!(
120 "Region route leader peer is not found in region({})",
121 region_route.region.id
122 ),
123 })
124}
125
126fn has_migrated(region_route: &RegionRoute, to_peer_id: u64) -> Result<bool> {
128 if region_route.is_leader_downgrading() {
129 return Ok(false);
130 }
131
132 let leader_peer = leader_peer(region_route)?;
133 Ok(leader_peer.id == to_peer_id)
134}
135
136fn has_leader_changed(region_route: &RegionRoute, from_peer_id: u64) -> Result<bool> {
138 let leader_peer = leader_peer(region_route)?;
139
140 Ok(leader_peer.id != from_peer_id)
141}
142
143fn has_peer_conflict(region_route: &RegionRoute, to_peer_id: u64) -> bool {
145 region_route
146 .follower_peers
147 .iter()
148 .map(|p| p.id)
149 .contains(&to_peer_id)
150}
151
152fn update_result_with_region_route(
154 result: &mut RegionMigrationAnalysis,
155 region_route: &RegionRoute,
156 from_peer_id: u64,
157 to_peer_id: u64,
158) -> Result<()> {
159 if has_migrated(region_route, to_peer_id)? {
160 result.migrated.push(region_route.region.id);
161 return Ok(());
162 }
163 if has_leader_changed(region_route, from_peer_id)? {
164 result.leader_changed.push(region_route.region.id);
165 return Ok(());
166 }
167 if has_peer_conflict(region_route, to_peer_id) {
168 result.peer_conflict.push(region_route.region.id);
169 return Ok(());
170 }
171 result.pending.push(region_route.region.id);
172 Ok(())
173}
174
175pub async fn analyze_region_migration_task(
179 task: &RegionMigrationTaskBatch,
180 table_metadata_manager: &TableMetadataManagerRef,
181) -> Result<RegionMigrationAnalysis> {
182 if task.to_peer.id == task.from_peer.id {
183 return error::InvalidArgumentsSnafu {
184 err_msg: format!(
185 "The `from_peer_id`({}) can't equal `to_peer_id`({})",
186 task.from_peer.id, task.to_peer.id
187 ),
188 }
189 .fail();
190 }
191 let table_regions = task.table_regions();
192 let table_ids = table_regions.keys().cloned().collect::<Vec<_>>();
193 let mut result = RegionMigrationAnalysis::default();
194
195 let table_routes = table_metadata_manager
196 .table_route_manager()
197 .table_route_storage()
198 .batch_get_with_raw_bytes(&table_ids)
199 .await
200 .context(error::TableMetadataManagerSnafu)?;
201
202 for (table_id, table_route) in table_ids.into_iter().zip(table_routes) {
203 let region_ids = table_regions.get(&table_id).unwrap();
204 let Some(table_route) = table_route else {
205 result.table_not_found.extend(region_ids);
206 continue;
207 };
208 let region_routes = table_route.region_routes().with_context(|_| {
210 error::UnexpectedLogicalRouteTableSnafu {
211 err_msg: format!("TableRoute({table_id:?}) is a non-physical TableRouteValue."),
212 }
213 })?;
214
215 let existing_region_ids = region_routes
216 .iter()
217 .map(|r| r.region.id)
218 .collect::<HashSet<_>>();
219
220 for region_route in region_routes
221 .iter()
222 .filter(|r| region_ids.contains(&r.region.id))
223 {
224 update_result_with_region_route(
225 &mut result,
226 region_route,
227 task.from_peer.id,
228 task.to_peer.id,
229 )?;
230 }
231
232 for region_id in region_ids {
233 if !existing_region_ids.contains(region_id) {
234 result.region_not_found.push(*region_id);
235 }
236 }
237 }
238
239 Ok(result)
240}
241
#[cfg(test)]
mod tests {

    use std::assert_matches::assert_matches;
    use std::sync::Arc;
    use std::time::Duration;

    use common_meta::key::TableMetadataManager;
    use common_meta::key::table_route::{
        LogicalTableRouteValue, PhysicalTableRouteValue, TableRouteValue,
    };
    use common_meta::kv_backend::TxnService;
    use common_meta::kv_backend::memory::MemoryKvBackend;
    use common_meta::peer::Peer;
    use common_meta::rpc::router::{Region, RegionRoute};
    use store_api::storage::RegionId;

    use crate::error::Error;
    use crate::procedure::region_migration::RegionMigrationTriggerReason;
    use crate::procedure::region_migration::utils::{
        RegionMigrationAnalysis, RegionMigrationTaskBatch, analyze_region_migration_task,
        update_result_with_region_route,
    };

    /// Builds a test region route with an optional leader and the given follower peer ids.
    fn new_region_route(
        region_id: RegionId,
        leader: Option<u64>,
        followers: &[u64],
    ) -> RegionRoute {
        RegionRoute {
            region: Region::new_test(region_id),
            leader_peer: leader.map(Peer::empty),
            follower_peers: followers.iter().copied().map(Peer::empty).collect(),
            leader_state: None,
            leader_down_since: None,
            write_route_policy: None,
        }
    }

    /// Builds a manually triggered batch migrating `region_ids` from `from_peer` to `to_peer`.
    fn new_task(
        region_ids: Vec<RegionId>,
        from_peer: u64,
        to_peer: u64,
    ) -> RegionMigrationTaskBatch {
        RegionMigrationTaskBatch {
            region_ids,
            from_peer: Peer::empty(from_peer),
            to_peer: Peer::empty(to_peer),
            timeout: Duration::from_millis(1000),
            trigger_reason: RegionMigrationTriggerReason::Manual,
        }
    }

    #[test]
    fn test_update_result_with_region_route() {
        let region_id = RegionId::new(1, 1);

        // The leader already lives on the destination peer.
        let mut result = RegionMigrationAnalysis::default();
        let region_route = new_region_route(region_id, Some(1), &[]);
        update_result_with_region_route(&mut result, &region_route, 2, 1).unwrap();
        assert_eq!(
            result,
            RegionMigrationAnalysis {
                migrated: vec![region_id],
                ..Default::default()
            }
        );

        // The leader is neither the source nor the destination peer.
        let mut result = RegionMigrationAnalysis::default();
        let region_route = new_region_route(region_id, Some(1), &[]);
        update_result_with_region_route(&mut result, &region_route, 2, 3).unwrap();
        assert_eq!(
            result,
            RegionMigrationAnalysis {
                leader_changed: vec![region_id],
                ..Default::default()
            }
        );

        // The destination peer already hosts a follower.
        let mut result = RegionMigrationAnalysis::default();
        let region_route = new_region_route(region_id, Some(1), &[2]);
        update_result_with_region_route(&mut result, &region_route, 1, 2).unwrap();
        assert_eq!(
            result,
            RegionMigrationAnalysis {
                peer_conflict: vec![region_id],
                ..Default::default()
            }
        );

        // Nothing prevents the migration.
        let mut result = RegionMigrationAnalysis::default();
        let region_route = new_region_route(region_id, Some(1), &[]);
        update_result_with_region_route(&mut result, &region_route, 1, 3).unwrap();
        assert_eq!(
            result,
            RegionMigrationAnalysis {
                pending: vec![region_id],
                ..Default::default()
            }
        );

        // A route without a leader peer is an error.
        let mut result = RegionMigrationAnalysis::default();
        let region_route = new_region_route(region_id, None, &[]);
        let err = update_result_with_region_route(&mut result, &region_route, 1, 3).unwrap_err();
        assert_matches!(err, Error::Unexpected { .. });
    }

    #[tokio::test]
    async fn test_analyze_region_migration_task_invalid_task() {
        let task = &new_task(vec![RegionId::new(1, 1)], 1, 1);
        let kv_backend = Arc::new(MemoryKvBackend::default());
        let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone()));
        let err = analyze_region_migration_task(task, &table_metadata_manager)
            .await
            .unwrap_err();
        assert_matches!(err, Error::InvalidArguments { .. });
    }

    #[tokio::test]
    async fn test_analyze_region_migration_table_not_found() {
        let task = &new_task(vec![RegionId::new(1, 1)], 1, 2);
        let kv_backend = Arc::new(MemoryKvBackend::default());
        let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone()));
        let result = analyze_region_migration_task(task, &table_metadata_manager)
            .await
            .unwrap();
        assert_eq!(
            result,
            RegionMigrationAnalysis {
                table_not_found: vec![RegionId::new(1, 1)],
                ..Default::default()
            }
        );
    }

    #[tokio::test]
    async fn test_analyze_region_migration_unexpected_logical_table() {
        let kv_backend = Arc::new(MemoryKvBackend::default());
        let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone()));
        let (txn, _) = table_metadata_manager
            .table_route_manager()
            .table_route_storage()
            .build_create_txn(
                1024,
                &TableRouteValue::Logical(LogicalTableRouteValue::new(1024)),
            )
            .unwrap();
        kv_backend.txn(txn).await.unwrap();
        let task = &new_task(vec![RegionId::new(1024, 1)], 1, 2);
        let err = analyze_region_migration_task(task, &table_metadata_manager)
            .await
            .unwrap_err();
        assert_matches!(err, Error::UnexpectedLogicalRouteTable { .. });
    }

    #[tokio::test]
    async fn test_analyze_region_migration_normal_case() {
        let kv_backend = Arc::new(MemoryKvBackend::default());
        let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone()));
        let (txn, _) = table_metadata_manager
            .table_route_manager()
            .table_route_storage()
            .build_create_txn(
                1024,
                &TableRouteValue::Physical(PhysicalTableRouteValue::new(vec![
                    // Already migrated: the leader is the destination peer.
                    new_region_route(RegionId::new(1024, 1), Some(2), &[]),
                    // Leader changed: the leader is neither peer.
                    new_region_route(RegionId::new(1024, 2), Some(3), &[]),
                    // Peer conflict: the destination peer hosts a follower.
                    new_region_route(RegionId::new(1024, 3), Some(1), &[2]),
                    // Pending: still eligible for migration.
                    new_region_route(RegionId::new(1024, 4), Some(1), &[]),
                ])),
            )
            .unwrap();

        kv_backend.txn(txn).await.unwrap();
        let task = &new_task(
            vec![
                RegionId::new(1024, 1),
                RegionId::new(1024, 2),
                RegionId::new(1024, 3),
                RegionId::new(1024, 4),
                // Not present in the table route.
                RegionId::new(1024, 5),
                // The table has no route at all.
                RegionId::new(1025, 1),
            ],
            1,
            2,
        );
        let result = analyze_region_migration_task(task, &table_metadata_manager)
            .await
            .unwrap();
        assert_eq!(
            result,
            RegionMigrationAnalysis {
                pending: vec![RegionId::new(1024, 4)],
                migrated: vec![RegionId::new(1024, 1)],
                leader_changed: vec![RegionId::new(1024, 2)],
                peer_conflict: vec![RegionId::new(1024, 3)],
                region_not_found: vec![RegionId::new(1024, 5)],
                table_not_found: vec![RegionId::new(1025, 1)],
            }
        );
    }
}