1use std::collections::HashMap;
16use std::fmt::Display;
17use std::time::Duration;
18
19use common_meta::key::TableMetadataManagerRef;
20use common_meta::peer::Peer;
21use common_meta::rpc::router::RegionRoute;
22use itertools::Itertools;
23use snafu::{OptionExt, ResultExt};
24use store_api::storage::{RegionId, TableId};
25
26use crate::error::{self, Result};
27use crate::procedure::region_migration::{
28 DEFAULT_REGION_MIGRATION_TIMEOUT, RegionMigrationProcedureTask, RegionMigrationTriggerReason,
29};
30
31#[derive(Debug, Clone)]
33pub struct RegionMigrationTaskBatch {
34 pub region_ids: Vec<RegionId>,
36 pub from_peer: Peer,
38 pub to_peer: Peer,
40 pub timeout: Duration,
42 pub trigger_reason: RegionMigrationTriggerReason,
44}
45
46impl RegionMigrationTaskBatch {
47 pub fn from_tasks(tasks: Vec<(RegionMigrationProcedureTask, u32)>) -> Self {
55 let max_count = tasks.iter().map(|(_, count)| *count).max().unwrap_or(1);
56 let region_ids = tasks.iter().map(|(r, _)| r.region_id).collect::<Vec<_>>();
57 let from_peer = tasks[0].0.from_peer.clone();
58 let to_peer = tasks[0].0.to_peer.clone();
59 let timeout = DEFAULT_REGION_MIGRATION_TIMEOUT * max_count;
60 let trigger_reason = RegionMigrationTriggerReason::Failover;
61 Self {
62 region_ids,
63 from_peer,
64 to_peer,
65 timeout,
66 trigger_reason,
67 }
68 }
69}
70
71impl Display for RegionMigrationTaskBatch {
72 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
73 write!(
74 f,
75 "RegionMigrationTask {{ region_ids: {:?}, from_peer: {:?}, to_peer: {:?}, timeout: {:?}, trigger_reason: {:?} }}",
76 self.region_ids, self.from_peer, self.to_peer, self.timeout, self.trigger_reason
77 )
78 }
79}
80
81impl RegionMigrationTaskBatch {
82 pub(crate) fn table_regions(&self) -> HashMap<TableId, Vec<RegionId>> {
86 let mut table_regions = HashMap::new();
87 for region_id in &self.region_ids {
88 table_regions
89 .entry(region_id.table_id())
90 .or_insert_with(Vec::new)
91 .push(*region_id);
92 }
93 table_regions
94 }
95}
96
97#[derive(Debug, Clone, Default, PartialEq)]
99pub(crate) struct RegionMigrationAnalysis {
100 pub(crate) migrated: Vec<RegionId>,
102 pub(crate) leader_changed: Vec<RegionId>,
104 pub(crate) peer_conflict: Vec<RegionId>,
106 pub(crate) table_not_found: Vec<RegionId>,
108 pub(crate) pending: Vec<RegionId>,
110}
111
112fn leader_peer(region_route: &RegionRoute) -> Result<&Peer> {
113 region_route
114 .leader_peer
115 .as_ref()
116 .with_context(|| error::UnexpectedSnafu {
117 violated: format!(
118 "Region route leader peer is not found in region({})",
119 region_route.region.id
120 ),
121 })
122}
123
124fn has_migrated(region_route: &RegionRoute, to_peer_id: u64) -> Result<bool> {
126 if region_route.is_leader_downgrading() {
127 return Ok(false);
128 }
129
130 let leader_peer = leader_peer(region_route)?;
131 Ok(leader_peer.id == to_peer_id)
132}
133
134fn has_leader_changed(region_route: &RegionRoute, from_peer_id: u64) -> Result<bool> {
136 let leader_peer = leader_peer(region_route)?;
137
138 Ok(leader_peer.id != from_peer_id)
139}
140
141fn has_peer_conflict(region_route: &RegionRoute, to_peer_id: u64) -> bool {
143 region_route
144 .follower_peers
145 .iter()
146 .map(|p| p.id)
147 .contains(&to_peer_id)
148}
149
150fn update_result_with_region_route(
152 result: &mut RegionMigrationAnalysis,
153 region_route: &RegionRoute,
154 from_peer_id: u64,
155 to_peer_id: u64,
156) -> Result<()> {
157 if has_migrated(region_route, to_peer_id)? {
158 result.migrated.push(region_route.region.id);
159 return Ok(());
160 }
161 if has_leader_changed(region_route, from_peer_id)? {
162 result.leader_changed.push(region_route.region.id);
163 return Ok(());
164 }
165 if has_peer_conflict(region_route, to_peer_id) {
166 result.peer_conflict.push(region_route.region.id);
167 return Ok(());
168 }
169 result.pending.push(region_route.region.id);
170 Ok(())
171}
172
173pub async fn analyze_region_migration_task(
177 task: &RegionMigrationTaskBatch,
178 table_metadata_manager: &TableMetadataManagerRef,
179) -> Result<RegionMigrationAnalysis> {
180 if task.to_peer.id == task.from_peer.id {
181 return error::InvalidArgumentsSnafu {
182 err_msg: format!(
183 "The `from_peer_id`({}) can't equal `to_peer_id`({})",
184 task.from_peer.id, task.to_peer.id
185 ),
186 }
187 .fail();
188 }
189 let table_regions = task.table_regions();
190 let table_ids = table_regions.keys().cloned().collect::<Vec<_>>();
191 let mut result = RegionMigrationAnalysis::default();
192
193 let table_routes = table_metadata_manager
194 .table_route_manager()
195 .table_route_storage()
196 .batch_get_with_raw_bytes(&table_ids)
197 .await
198 .context(error::TableMetadataManagerSnafu)?;
199
200 for (table_id, table_route) in table_ids.into_iter().zip(table_routes) {
201 let region_ids = table_regions.get(&table_id).unwrap();
202 let Some(table_route) = table_route else {
203 result.table_not_found.extend(region_ids);
204 continue;
205 };
206 let region_routes = table_route.region_routes().with_context(|_| {
208 error::UnexpectedLogicalRouteTableSnafu {
209 err_msg: format!("TableRoute({table_id:?}) is a non-physical TableRouteValue."),
210 }
211 })?;
212 for region_route in region_routes
213 .iter()
214 .filter(|r| region_ids.contains(&r.region.id))
215 {
216 update_result_with_region_route(
217 &mut result,
218 region_route,
219 task.from_peer.id,
220 task.to_peer.id,
221 )?;
222 }
223 }
224
225 Ok(result)
226}
227
#[cfg(test)]
mod tests {

    use std::assert_matches::assert_matches;
    use std::sync::Arc;
    use std::time::Duration;

    use common_meta::key::TableMetadataManager;
    use common_meta::key::table_route::{
        LogicalTableRouteValue, PhysicalTableRouteValue, TableRouteValue,
    };
    use common_meta::kv_backend::TxnService;
    use common_meta::kv_backend::memory::MemoryKvBackend;
    use common_meta::peer::Peer;
    use common_meta::rpc::router::{Region, RegionRoute};
    use store_api::storage::RegionId;

    use crate::error::Error;
    use crate::procedure::region_migration::RegionMigrationTriggerReason;
    use crate::procedure::region_migration::utils::{
        RegionMigrationAnalysis, RegionMigrationTaskBatch, analyze_region_migration_task,
        update_result_with_region_route,
    };

    /// Builds a route for `region_id` with the given leader and follower peer ids.
    fn new_region_route(
        region_id: RegionId,
        leader_id: Option<u64>,
        follower_ids: &[u64],
    ) -> RegionRoute {
        RegionRoute {
            region: Region::new_test(region_id),
            leader_peer: leader_id.map(Peer::empty),
            follower_peers: follower_ids.iter().copied().map(Peer::empty).collect(),
            leader_state: None,
            leader_down_since: None,
        }
    }

    /// Builds a manually triggered batch moving `region_ids` from peer `from` to peer `to`.
    fn new_task(region_ids: Vec<RegionId>, from: u64, to: u64) -> RegionMigrationTaskBatch {
        RegionMigrationTaskBatch {
            region_ids,
            from_peer: Peer::empty(from),
            to_peer: Peer::empty(to),
            timeout: Duration::from_millis(1000),
            trigger_reason: RegionMigrationTriggerReason::Manual,
        }
    }

    #[test]
    fn test_update_result_with_region_route() {
        let region_id = RegionId::new(1, 1);

        // The leader is already the target peer: migrated.
        let mut result = RegionMigrationAnalysis::default();
        let region_route = new_region_route(region_id, Some(1), &[]);
        update_result_with_region_route(&mut result, &region_route, 2, 1).unwrap();
        assert_eq!(
            result,
            RegionMigrationAnalysis {
                migrated: vec![region_id],
                ..Default::default()
            }
        );

        // The leader is neither the source nor the target peer: leader changed.
        let mut result = RegionMigrationAnalysis::default();
        let region_route = new_region_route(region_id, Some(1), &[]);
        update_result_with_region_route(&mut result, &region_route, 2, 3).unwrap();
        assert_eq!(
            result,
            RegionMigrationAnalysis {
                leader_changed: vec![region_id],
                ..Default::default()
            }
        );

        // The target peer is already a follower: peer conflict.
        let mut result = RegionMigrationAnalysis::default();
        let region_route = new_region_route(region_id, Some(1), &[2]);
        update_result_with_region_route(&mut result, &region_route, 1, 2).unwrap();
        assert_eq!(
            result,
            RegionMigrationAnalysis {
                peer_conflict: vec![region_id],
                ..Default::default()
            }
        );

        // The leader is still the source peer and nothing conflicts: pending.
        let mut result = RegionMigrationAnalysis::default();
        let region_route = new_region_route(region_id, Some(1), &[]);
        update_result_with_region_route(&mut result, &region_route, 1, 3).unwrap();
        assert_eq!(
            result,
            RegionMigrationAnalysis {
                pending: vec![region_id],
                ..Default::default()
            }
        );

        // A route without a leader peer is reported as an unexpected error.
        let mut result = RegionMigrationAnalysis::default();
        let region_route = new_region_route(region_id, None, &[]);
        let err = update_result_with_region_route(&mut result, &region_route, 1, 3).unwrap_err();
        assert_matches!(err, Error::Unexpected { .. });
    }

    #[tokio::test]
    async fn test_analyze_region_migration_task_invalid_task() {
        // Source and target peers are the same.
        let task = &new_task(vec![RegionId::new(1, 1)], 1, 1);
        let kv_backend = Arc::new(MemoryKvBackend::default());
        let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone()));
        let err = analyze_region_migration_task(task, &table_metadata_manager)
            .await
            .unwrap_err();
        assert_matches!(err, Error::InvalidArguments { .. });
    }

    #[tokio::test]
    async fn test_analyze_region_migration_table_not_found() {
        // Nothing is stored in the backend, so the table route is missing.
        let task = &new_task(vec![RegionId::new(1, 1)], 1, 2);
        let kv_backend = Arc::new(MemoryKvBackend::default());
        let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone()));
        let result = analyze_region_migration_task(task, &table_metadata_manager)
            .await
            .unwrap();
        assert_eq!(
            result,
            RegionMigrationAnalysis {
                table_not_found: vec![RegionId::new(1, 1)],
                ..Default::default()
            }
        );
    }

    #[tokio::test]
    async fn test_analyze_region_migration_unexpected_logical_table() {
        let kv_backend = Arc::new(MemoryKvBackend::default());
        let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone()));
        // Table 1024 is stored with a logical table route.
        let (txn, _) = table_metadata_manager
            .table_route_manager()
            .table_route_storage()
            .build_create_txn(
                1024,
                &TableRouteValue::Logical(LogicalTableRouteValue::new(
                    1024,
                    vec![RegionId::new(1023, 1)],
                )),
            )
            .unwrap();
        kv_backend.txn(txn).await.unwrap();
        let task = &new_task(vec![RegionId::new(1024, 1)], 1, 2);
        let err = analyze_region_migration_task(task, &table_metadata_manager)
            .await
            .unwrap_err();
        assert_matches!(err, Error::UnexpectedLogicalRouteTable { .. });
    }

    #[tokio::test]
    async fn test_analyze_region_migration_normal_case() {
        let kv_backend = Arc::new(MemoryKvBackend::default());
        let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone()));
        let (txn, _) = table_metadata_manager
            .table_route_manager()
            .table_route_storage()
            .build_create_txn(
                1024,
                &TableRouteValue::Physical(PhysicalTableRouteValue::new(vec![
                    // Leader is the target peer (2): migrated.
                    new_region_route(RegionId::new(1024, 1), Some(2), &[]),
                    // Leader is neither source (1) nor target (2): leader changed.
                    new_region_route(RegionId::new(1024, 2), Some(3), &[]),
                    // Target peer (2) is already a follower: peer conflict.
                    new_region_route(RegionId::new(1024, 3), Some(1), &[2]),
                    // Leader is still the source peer (1): pending.
                    new_region_route(RegionId::new(1024, 4), Some(1), &[]),
                ])),
            )
            .unwrap();

        kv_backend.txn(txn).await.unwrap();
        // Table 1025 has no stored route, so its region is reported as table-not-found.
        let task = &new_task(
            vec![
                RegionId::new(1024, 1),
                RegionId::new(1024, 2),
                RegionId::new(1024, 3),
                RegionId::new(1024, 4),
                RegionId::new(1025, 1),
            ],
            1,
            2,
        );
        let result = analyze_region_migration_task(task, &table_metadata_manager)
            .await
            .unwrap();
        assert_eq!(
            result,
            RegionMigrationAnalysis {
                pending: vec![RegionId::new(1024, 4)],
                migrated: vec![RegionId::new(1024, 1)],
                leader_changed: vec![RegionId::new(1024, 2)],
                peer_conflict: vec![RegionId::new(1024, 3)],
                table_not_found: vec![RegionId::new(1025, 1)],
            }
        );
    }
}