meta_srv/procedure/region_migration/
utils.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::fmt::Display;
17use std::time::Duration;
18
19use common_meta::key::TableMetadataManagerRef;
20use common_meta::peer::Peer;
21use common_meta::rpc::router::RegionRoute;
22use itertools::Itertools;
23use snafu::{OptionExt, ResultExt};
24use store_api::storage::{RegionId, TableId};
25
26use crate::error::{self, Result};
27use crate::procedure::region_migration::{
28    DEFAULT_REGION_MIGRATION_TIMEOUT, RegionMigrationProcedureTask, RegionMigrationTriggerReason,
29};
30
/// A migration task describing how regions are intended to move between peers.
///
/// Every region in the batch shares the same `from_peer` and `to_peer`. The regions may
/// belong to different tables; `table_regions` groups them per table.
#[derive(Debug, Clone)]
pub struct RegionMigrationTaskBatch {
    /// Region ids involved in this migration (possibly spanning several tables).
    pub region_ids: Vec<RegionId>,
    /// Source peer the regions are expected to be migrated from.
    pub from_peer: Peer,
    /// Destination peer to migrate regions to.
    pub to_peer: Peer,
    /// Timeout for migration. Batches built by `from_tasks` use a multiple of
    /// `DEFAULT_REGION_MIGRATION_TIMEOUT`.
    pub timeout: Duration,
    /// Reason why this migration was triggered.
    pub trigger_reason: RegionMigrationTriggerReason,
}
45
46impl RegionMigrationTaskBatch {
47    /// Constructs a [`RegionMigrationTaskBatch`] from a vector of region migration procedure tasks.
48    ///
49    /// Aggregates region IDs, determines source and destination peers, sets an appropriate timeout,
50    /// and assigns the trigger reason for the migration batch.
51    ///
52    /// # Panic
53    /// if the `tasks` are empty.
54    pub fn from_tasks(tasks: Vec<(RegionMigrationProcedureTask, u32)>) -> Self {
55        let max_count = tasks.iter().map(|(_, count)| *count).max().unwrap_or(1);
56        let region_ids = tasks.iter().map(|(r, _)| r.region_id).collect::<Vec<_>>();
57        let from_peer = tasks[0].0.from_peer.clone();
58        let to_peer = tasks[0].0.to_peer.clone();
59        let timeout = DEFAULT_REGION_MIGRATION_TIMEOUT * max_count;
60        let trigger_reason = RegionMigrationTriggerReason::Failover;
61        Self {
62            region_ids,
63            from_peer,
64            to_peer,
65            timeout,
66            trigger_reason,
67        }
68    }
69}
70
71impl Display for RegionMigrationTaskBatch {
72    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
73        write!(
74            f,
75            "RegionMigrationTask {{ region_ids: {:?}, from_peer: {:?}, to_peer: {:?}, timeout: {:?}, trigger_reason: {:?} }}",
76            self.region_ids, self.from_peer, self.to_peer, self.timeout, self.trigger_reason
77        )
78    }
79}
80
81impl RegionMigrationTaskBatch {
82    /// Returns the table regions map.
83    ///
84    /// The key is the table id, the value is the region ids of the table.
85    pub(crate) fn table_regions(&self) -> HashMap<TableId, HashSet<RegionId>> {
86        let mut table_regions = HashMap::new();
87        for region_id in &self.region_ids {
88            table_regions
89                .entry(region_id.table_id())
90                .or_insert_with(HashSet::new)
91                .insert(*region_id);
92        }
93        table_regions
94    }
95}
96
/// Represents the result of analyzing a migration task.
///
/// Produced by [`analyze_region_migration_task`]; each requested region id is reported in
/// one of the lists below, according to the current state of its route.
#[derive(Debug, Clone, Default, PartialEq)]
pub(crate) struct RegionMigrationAnalysis {
    /// Regions already migrated to the `to_peer` (leader is `to_peer` and not downgrading).
    pub(crate) migrated: Vec<RegionId>,
    /// Regions where the leader peer has changed, i.e. the leader is no longer `from_peer`
    /// and the region was not counted as migrated.
    pub(crate) leader_changed: Vec<RegionId>,
    /// Regions where `to_peer` is already a follower (conflict).
    pub(crate) peer_conflict: Vec<RegionId>,
    /// Regions whose table is not found.
    pub(crate) table_not_found: Vec<RegionId>,
    /// Regions whose table exists but region route is not found (e.g., removed after repartition).
    pub(crate) region_not_found: Vec<RegionId>,
    /// Regions still pending migration: led by `from_peer`, with no conflict on `to_peer`.
    pub(crate) pending: Vec<RegionId>,
}
113
114fn leader_peer(region_route: &RegionRoute) -> Result<&Peer> {
115    region_route
116        .leader_peer
117        .as_ref()
118        .with_context(|| error::UnexpectedSnafu {
119            violated: format!(
120                "Region route leader peer is not found in region({})",
121                region_route.region.id
122            ),
123        })
124}
125
126/// Returns true if the region has already been migrated to `to_peer`.
127fn has_migrated(region_route: &RegionRoute, to_peer_id: u64) -> Result<bool> {
128    if region_route.is_leader_downgrading() {
129        return Ok(false);
130    }
131
132    let leader_peer = leader_peer(region_route)?;
133    Ok(leader_peer.id == to_peer_id)
134}
135
136/// Returns true if the leader peer of the region has changed.
137fn has_leader_changed(region_route: &RegionRoute, from_peer_id: u64) -> Result<bool> {
138    let leader_peer = leader_peer(region_route)?;
139
140    Ok(leader_peer.id != from_peer_id)
141}
142
143/// Returns true if `to_peer` is already a follower of the region (conflict).
144fn has_peer_conflict(region_route: &RegionRoute, to_peer_id: u64) -> bool {
145    region_route
146        .follower_peers
147        .iter()
148        .map(|p| p.id)
149        .contains(&to_peer_id)
150}
151
152/// Updates the verification result based on a single region route.
153fn update_result_with_region_route(
154    result: &mut RegionMigrationAnalysis,
155    region_route: &RegionRoute,
156    from_peer_id: u64,
157    to_peer_id: u64,
158) -> Result<()> {
159    if has_migrated(region_route, to_peer_id)? {
160        result.migrated.push(region_route.region.id);
161        return Ok(());
162    }
163    if has_leader_changed(region_route, from_peer_id)? {
164        result.leader_changed.push(region_route.region.id);
165        return Ok(());
166    }
167    if has_peer_conflict(region_route, to_peer_id) {
168        result.peer_conflict.push(region_route.region.id);
169        return Ok(());
170    }
171    result.pending.push(region_route.region.id);
172    Ok(())
173}
174
175/// Analyzes the migration task and categorizes regions by their current state.
176///
177/// Returns a [`RegionMigrationAnalysis`] describing the migration status.
178pub async fn analyze_region_migration_task(
179    task: &RegionMigrationTaskBatch,
180    table_metadata_manager: &TableMetadataManagerRef,
181) -> Result<RegionMigrationAnalysis> {
182    if task.to_peer.id == task.from_peer.id {
183        return error::InvalidArgumentsSnafu {
184            err_msg: format!(
185                "The `from_peer_id`({}) can't equal `to_peer_id`({})",
186                task.from_peer.id, task.to_peer.id
187            ),
188        }
189        .fail();
190    }
191    let table_regions = task.table_regions();
192    let table_ids = table_regions.keys().cloned().collect::<Vec<_>>();
193    let mut result = RegionMigrationAnalysis::default();
194
195    let table_routes = table_metadata_manager
196        .table_route_manager()
197        .table_route_storage()
198        .batch_get_with_raw_bytes(&table_ids)
199        .await
200        .context(error::TableMetadataManagerSnafu)?;
201
202    for (table_id, table_route) in table_ids.into_iter().zip(table_routes) {
203        let region_ids = table_regions.get(&table_id).unwrap();
204        let Some(table_route) = table_route else {
205            result.table_not_found.extend(region_ids);
206            continue;
207        };
208        // Throws error if the table route is not a physical table route.
209        let region_routes = table_route.region_routes().with_context(|_| {
210            error::UnexpectedLogicalRouteTableSnafu {
211                err_msg: format!("TableRoute({table_id:?}) is a non-physical TableRouteValue."),
212            }
213        })?;
214
215        let existing_region_ids = region_routes
216            .iter()
217            .map(|r| r.region.id)
218            .collect::<HashSet<_>>();
219
220        for region_route in region_routes
221            .iter()
222            .filter(|r| region_ids.contains(&r.region.id))
223        {
224            update_result_with_region_route(
225                &mut result,
226                region_route,
227                task.from_peer.id,
228                task.to_peer.id,
229            )?;
230        }
231
232        for region_id in region_ids {
233            if !existing_region_ids.contains(region_id) {
234                result.region_not_found.push(*region_id);
235            }
236        }
237    }
238
239    Ok(result)
240}
241
// Unit tests for region classification (`update_result_with_region_route`) and for
// `analyze_region_migration_task` against an in-memory metadata backend.
#[cfg(test)]
mod tests {

    use std::assert_matches::assert_matches;
    use std::sync::Arc;
    use std::time::Duration;

    use common_meta::key::TableMetadataManager;
    use common_meta::key::table_route::{
        LogicalTableRouteValue, PhysicalTableRouteValue, TableRouteValue,
    };
    use common_meta::kv_backend::TxnService;
    use common_meta::kv_backend::memory::MemoryKvBackend;
    use common_meta::peer::Peer;
    use common_meta::rpc::router::{Region, RegionRoute};
    use store_api::storage::RegionId;

    use crate::error::Error;
    use crate::procedure::region_migration::RegionMigrationTriggerReason;
    use crate::procedure::region_migration::utils::{
        RegionMigrationAnalysis, RegionMigrationTaskBatch, analyze_region_migration_task,
        update_result_with_region_route,
    };

    // Each scenario calls `update_result_with_region_route(result, route, from_peer_id, to_peer_id)`
    // on a fresh result and checks which single bucket received the region.
    #[test]
    fn test_update_result_with_region_route() {
        // The region is already migrated to the to_peer.
        // Leader is peer 1 == to_peer (1), with no leader state set -> `migrated`.
        let mut result = RegionMigrationAnalysis::default();
        let region_id = RegionId::new(1, 1);
        let region_route = RegionRoute {
            region: Region::new_test(region_id),
            leader_peer: Some(Peer::empty(1)),
            follower_peers: vec![],
            leader_state: None,
            leader_down_since: None,
        };
        update_result_with_region_route(&mut result, &region_route, 2, 1).unwrap();
        assert_eq!(
            result,
            RegionMigrationAnalysis {
                migrated: vec![region_id],
                ..Default::default()
            }
        );

        // Test region leader changed.
        // Leader is peer 1, which is neither from_peer (2) nor to_peer (3) -> `leader_changed`.
        let mut result = RegionMigrationAnalysis::default();
        let region_id = RegionId::new(1, 1);
        let region_route = RegionRoute {
            region: Region::new_test(region_id),
            leader_peer: Some(Peer::empty(1)),
            follower_peers: vec![],
            leader_state: None,
            leader_down_since: None,
        };
        update_result_with_region_route(&mut result, &region_route, 2, 3).unwrap();
        assert_eq!(
            result,
            RegionMigrationAnalysis {
                leader_changed: vec![region_id],
                ..Default::default()
            }
        );

        // Test region peer conflict.
        // Leader is still from_peer (1), but to_peer (2) is already a follower -> `peer_conflict`.
        let mut result = RegionMigrationAnalysis::default();
        let region_id = RegionId::new(1, 1);
        let region_route = RegionRoute {
            region: Region::new_test(region_id),
            leader_peer: Some(Peer::empty(1)),
            follower_peers: vec![Peer::empty(2)],
            leader_state: None,
            leader_down_since: None,
        };
        update_result_with_region_route(&mut result, &region_route, 1, 2).unwrap();
        assert_eq!(
            result,
            RegionMigrationAnalysis {
                peer_conflict: vec![region_id],
                ..Default::default()
            }
        );

        // Test normal case.
        // Leader is from_peer (1) and to_peer (3) is not a follower -> `pending`.
        let mut result = RegionMigrationAnalysis::default();
        let region_id = RegionId::new(1, 1);
        let region_route = RegionRoute {
            region: Region::new_test(region_id),
            leader_peer: Some(Peer::empty(1)),
            follower_peers: vec![],
            leader_state: None,
            leader_down_since: None,
        };
        update_result_with_region_route(&mut result, &region_route, 1, 3).unwrap();
        assert_eq!(
            result,
            RegionMigrationAnalysis {
                pending: vec![region_id],
                ..Default::default()
            }
        );

        // Test leader peer not set
        // A route without a leader peer cannot be classified and yields `Error::Unexpected`.
        let mut result = RegionMigrationAnalysis::default();
        let region_id = RegionId::new(1, 1);
        let region_route = RegionRoute {
            region: Region::new_test(region_id),
            leader_peer: None,
            follower_peers: vec![],
            leader_state: None,
            leader_down_since: None,
        };
        let err = update_result_with_region_route(&mut result, &region_route, 1, 3).unwrap_err();
        assert_matches!(err, Error::Unexpected { .. });
    }

    // from_peer == to_peer is rejected with `InvalidArguments` before any metadata is read.
    #[tokio::test]
    async fn test_analyze_region_migration_task_invalid_task() {
        let task = &RegionMigrationTaskBatch {
            region_ids: vec![RegionId::new(1, 1)],
            from_peer: Peer::empty(1),
            to_peer: Peer::empty(1),
            timeout: Duration::from_millis(1000),
            trigger_reason: RegionMigrationTriggerReason::Manual,
        };
        let kv_backend = Arc::new(MemoryKvBackend::default());
        let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone()));
        let err = analyze_region_migration_task(task, &table_metadata_manager)
            .await
            .unwrap_err();
        assert_matches!(err, Error::InvalidArguments { .. });
    }

    // With an empty backend, the table route of table 1 is missing -> `table_not_found`.
    #[tokio::test]
    async fn test_analyze_region_migration_table_not_found() {
        let task = &RegionMigrationTaskBatch {
            region_ids: vec![RegionId::new(1, 1)],
            from_peer: Peer::empty(1),
            to_peer: Peer::empty(2),
            timeout: Duration::from_millis(1000),
            trigger_reason: RegionMigrationTriggerReason::Manual,
        };
        let kv_backend = Arc::new(MemoryKvBackend::default());
        let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone()));
        let result = analyze_region_migration_task(task, &table_metadata_manager)
            .await
            .unwrap();
        assert_eq!(
            result,
            RegionMigrationAnalysis {
                table_not_found: vec![RegionId::new(1, 1)],
                ..Default::default()
            }
        );
    }

    // A logical table route stored for table 1024 makes the analysis fail with
    // `UnexpectedLogicalRouteTable`.
    #[tokio::test]
    async fn test_analyze_region_migration_unexpected_logical_table() {
        let kv_backend = Arc::new(MemoryKvBackend::default());
        let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone()));
        let (txn, _) = table_metadata_manager
            .table_route_manager()
            .table_route_storage()
            .build_create_txn(
                1024,
                &TableRouteValue::Logical(LogicalTableRouteValue::new(1024)),
            )
            .unwrap();
        kv_backend.txn(txn).await.unwrap();
        let task = &RegionMigrationTaskBatch {
            region_ids: vec![RegionId::new(1024, 1)],
            from_peer: Peer::empty(1),
            to_peer: Peer::empty(2),
            timeout: Duration::from_millis(1000),
            trigger_reason: RegionMigrationTriggerReason::Manual,
        };
        let err = analyze_region_migration_task(task, &table_metadata_manager)
            .await
            .unwrap_err();
        assert_matches!(err, Error::UnexpectedLogicalRouteTable { .. });
    }

    // End-to-end: one physical table (1024) whose routes cover every classification bucket,
    // plus a region without a route (1024, 5) and a region of a missing table (1025, 1).
    // The task migrates from peer 1 to peer 2.
    #[tokio::test]
    async fn test_analyze_region_migration_normal_case() {
        let kv_backend = Arc::new(MemoryKvBackend::default());
        let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone()));
        let (txn, _) = table_metadata_manager
            .table_route_manager()
            .table_route_storage()
            .build_create_txn(
                1024,
                &TableRouteValue::Physical(PhysicalTableRouteValue::new(vec![
                    // Already migrated to the to_peer.
                    RegionRoute {
                        region: Region::new_test(RegionId::new(1024, 1)),
                        leader_peer: Some(Peer::empty(2)),
                        follower_peers: vec![],
                        leader_state: None,
                        leader_down_since: None,
                    },
                    // Leader peer changed.
                    RegionRoute {
                        region: Region::new_test(RegionId::new(1024, 2)),
                        leader_peer: Some(Peer::empty(3)),
                        follower_peers: vec![],
                        leader_state: None,
                        leader_down_since: None,
                    },
                    // Peer conflict.
                    RegionRoute {
                        region: Region::new_test(RegionId::new(1024, 3)),
                        leader_peer: Some(Peer::empty(1)),
                        follower_peers: vec![Peer::empty(2)],
                        leader_state: None,
                        leader_down_since: None,
                    },
                    // Normal case.
                    RegionRoute {
                        region: Region::new_test(RegionId::new(1024, 4)),
                        leader_peer: Some(Peer::empty(1)),
                        follower_peers: vec![],
                        leader_state: None,
                        leader_down_since: None,
                    },
                ])),
            )
            .unwrap();

        kv_backend.txn(txn).await.unwrap();
        let task = &RegionMigrationTaskBatch {
            region_ids: vec![
                RegionId::new(1024, 1),
                RegionId::new(1024, 2),
                RegionId::new(1024, 3),
                RegionId::new(1024, 4),
                // Region of existing table but route removed (e.g., after repartition).
                RegionId::new(1024, 5),
                // Table 1025 has no stored route.
                RegionId::new(1025, 1),
            ],
            from_peer: Peer::empty(1),
            to_peer: Peer::empty(2),
            timeout: Duration::from_millis(1000),
            trigger_reason: RegionMigrationTriggerReason::Manual,
        };
        let result = analyze_region_migration_task(task, &table_metadata_manager)
            .await
            .unwrap();
        assert_eq!(
            result,
            RegionMigrationAnalysis {
                pending: vec![RegionId::new(1024, 4)],
                migrated: vec![RegionId::new(1024, 1)],
                leader_changed: vec![RegionId::new(1024, 2)],
                peer_conflict: vec![RegionId::new(1024, 3)],
                region_not_found: vec![RegionId::new(1024, 5)],
                table_not_found: vec![RegionId::new(1025, 1)],
            }
        );
    }
}