meta_srv/procedure/region_migration/
utils.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::fmt::Display;
17use std::time::Duration;
18
19use common_meta::key::TableMetadataManagerRef;
20use common_meta::peer::Peer;
21use common_meta::rpc::router::RegionRoute;
22use itertools::Itertools;
23use snafu::{OptionExt, ResultExt};
24use store_api::storage::{RegionId, TableId};
25
26use crate::error::{self, Result};
27use crate::procedure::region_migration::{
28    DEFAULT_REGION_MIGRATION_TIMEOUT, RegionMigrationProcedureTask, RegionMigrationTriggerReason,
29};
30
31/// A migration task describing how regions are intended to move between peers.
32#[derive(Debug, Clone)]
33pub struct RegionMigrationTaskBatch {
34    /// Region ids involved in this migration.
35    pub region_ids: Vec<RegionId>,
36    /// Source peer where regions currently reside.
37    pub from_peer: Peer,
38    /// Destination peer to migrate regions to.
39    pub to_peer: Peer,
40    /// Timeout for migration.
41    pub timeout: Duration,
42    /// Reason why this migration was triggered.
43    pub trigger_reason: RegionMigrationTriggerReason,
44}
45
46impl RegionMigrationTaskBatch {
47    /// Constructs a [`RegionMigrationTaskBatch`] from a vector of region migration procedure tasks.
48    ///
49    /// Aggregates region IDs, determines source and destination peers, sets an appropriate timeout,
50    /// and assigns the trigger reason for the migration batch.
51    ///
52    /// # Panic
53    /// if the `tasks` are empty.
54    pub fn from_tasks(tasks: Vec<(RegionMigrationProcedureTask, u32)>) -> Self {
55        let max_count = tasks.iter().map(|(_, count)| *count).max().unwrap_or(1);
56        let region_ids = tasks.iter().map(|(r, _)| r.region_id).collect::<Vec<_>>();
57        let from_peer = tasks[0].0.from_peer.clone();
58        let to_peer = tasks[0].0.to_peer.clone();
59        let timeout = DEFAULT_REGION_MIGRATION_TIMEOUT * max_count;
60        let trigger_reason = RegionMigrationTriggerReason::Failover;
61        Self {
62            region_ids,
63            from_peer,
64            to_peer,
65            timeout,
66            trigger_reason,
67        }
68    }
69}
70
71impl Display for RegionMigrationTaskBatch {
72    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
73        write!(
74            f,
75            "RegionMigrationTask {{ region_ids: {:?}, from_peer: {:?}, to_peer: {:?}, timeout: {:?}, trigger_reason: {:?} }}",
76            self.region_ids, self.from_peer, self.to_peer, self.timeout, self.trigger_reason
77        )
78    }
79}
80
81impl RegionMigrationTaskBatch {
82    /// Returns the table regions map.
83    ///
84    /// The key is the table id, the value is the region ids of the table.
85    pub(crate) fn table_regions(&self) -> HashMap<TableId, Vec<RegionId>> {
86        let mut table_regions = HashMap::new();
87        for region_id in &self.region_ids {
88            table_regions
89                .entry(region_id.table_id())
90                .or_insert_with(Vec::new)
91                .push(*region_id);
92        }
93        table_regions
94    }
95}
96
97/// Represents the result of analyzing a migration task.
98#[derive(Debug, Clone, Default, PartialEq)]
99pub(crate) struct RegionMigrationAnalysis {
100    /// Regions already migrated to the `to_peer`.
101    pub(crate) migrated: Vec<RegionId>,
102    /// Regions where the leader peer has changed.
103    pub(crate) leader_changed: Vec<RegionId>,
104    /// Regions where `to_peer` is already a follower (conflict).
105    pub(crate) peer_conflict: Vec<RegionId>,
106    /// Regions whose table is not found.
107    pub(crate) table_not_found: Vec<RegionId>,
108    /// Regions still pending migration.
109    pub(crate) pending: Vec<RegionId>,
110}
111
112fn leader_peer(region_route: &RegionRoute) -> Result<&Peer> {
113    region_route
114        .leader_peer
115        .as_ref()
116        .with_context(|| error::UnexpectedSnafu {
117            violated: format!(
118                "Region route leader peer is not found in region({})",
119                region_route.region.id
120            ),
121        })
122}
123
124/// Returns true if the region has already been migrated to `to_peer`.
125fn has_migrated(region_route: &RegionRoute, to_peer_id: u64) -> Result<bool> {
126    if region_route.is_leader_downgrading() {
127        return Ok(false);
128    }
129
130    let leader_peer = leader_peer(region_route)?;
131    Ok(leader_peer.id == to_peer_id)
132}
133
134/// Returns true if the leader peer of the region has changed.
135fn has_leader_changed(region_route: &RegionRoute, from_peer_id: u64) -> Result<bool> {
136    let leader_peer = leader_peer(region_route)?;
137
138    Ok(leader_peer.id != from_peer_id)
139}
140
141/// Returns true if `to_peer` is already a follower of the region (conflict).
142fn has_peer_conflict(region_route: &RegionRoute, to_peer_id: u64) -> bool {
143    region_route
144        .follower_peers
145        .iter()
146        .map(|p| p.id)
147        .contains(&to_peer_id)
148}
149
150/// Updates the verification result based on a single region route.
151fn update_result_with_region_route(
152    result: &mut RegionMigrationAnalysis,
153    region_route: &RegionRoute,
154    from_peer_id: u64,
155    to_peer_id: u64,
156) -> Result<()> {
157    if has_migrated(region_route, to_peer_id)? {
158        result.migrated.push(region_route.region.id);
159        return Ok(());
160    }
161    if has_leader_changed(region_route, from_peer_id)? {
162        result.leader_changed.push(region_route.region.id);
163        return Ok(());
164    }
165    if has_peer_conflict(region_route, to_peer_id) {
166        result.peer_conflict.push(region_route.region.id);
167        return Ok(());
168    }
169    result.pending.push(region_route.region.id);
170    Ok(())
171}
172
173/// Analyzes the migration task and categorizes regions by their current state.
174///
175/// Returns a [`RegionMigrationAnalysis`] describing the migration status.
176pub async fn analyze_region_migration_task(
177    task: &RegionMigrationTaskBatch,
178    table_metadata_manager: &TableMetadataManagerRef,
179) -> Result<RegionMigrationAnalysis> {
180    if task.to_peer.id == task.from_peer.id {
181        return error::InvalidArgumentsSnafu {
182            err_msg: format!(
183                "The `from_peer_id`({}) can't equal `to_peer_id`({})",
184                task.from_peer.id, task.to_peer.id
185            ),
186        }
187        .fail();
188    }
189    let table_regions = task.table_regions();
190    let table_ids = table_regions.keys().cloned().collect::<Vec<_>>();
191    let mut result = RegionMigrationAnalysis::default();
192
193    let table_routes = table_metadata_manager
194        .table_route_manager()
195        .table_route_storage()
196        .batch_get_with_raw_bytes(&table_ids)
197        .await
198        .context(error::TableMetadataManagerSnafu)?;
199
200    for (table_id, table_route) in table_ids.into_iter().zip(table_routes) {
201        let region_ids = table_regions.get(&table_id).unwrap();
202        let Some(table_route) = table_route else {
203            result.table_not_found.extend(region_ids);
204            continue;
205        };
206        // Throws error if the table route is not a physical table route.
207        let region_routes = table_route.region_routes().with_context(|_| {
208            error::UnexpectedLogicalRouteTableSnafu {
209                err_msg: format!("TableRoute({table_id:?}) is a non-physical TableRouteValue."),
210            }
211        })?;
212        for region_route in region_routes
213            .iter()
214            .filter(|r| region_ids.contains(&r.region.id))
215        {
216            update_result_with_region_route(
217                &mut result,
218                region_route,
219                task.from_peer.id,
220                task.to_peer.id,
221            )?;
222        }
223    }
224
225    Ok(result)
226}
227
#[cfg(test)]
mod tests {

    use std::assert_matches::assert_matches;
    use std::sync::Arc;
    use std::time::Duration;

    use common_meta::key::TableMetadataManager;
    use common_meta::key::table_route::{
        LogicalTableRouteValue, PhysicalTableRouteValue, TableRouteValue,
    };
    use common_meta::kv_backend::TxnService;
    use common_meta::kv_backend::memory::MemoryKvBackend;
    use common_meta::peer::Peer;
    use common_meta::rpc::router::{Region, RegionRoute};
    use store_api::storage::{RegionId, TableId};

    use crate::error::Error;
    use crate::procedure::region_migration::RegionMigrationTriggerReason;
    use crate::procedure::region_migration::utils::{
        RegionMigrationAnalysis, RegionMigrationTaskBatch, analyze_region_migration_task,
        update_result_with_region_route,
    };

    /// Builds a region route with the given leader and followers and no leader state.
    fn new_region_route(
        region_id: RegionId,
        leader_id: Option<u64>,
        follower_ids: &[u64],
    ) -> RegionRoute {
        RegionRoute {
            region: Region::new_test(region_id),
            leader_peer: leader_id.map(Peer::empty),
            follower_peers: follower_ids.iter().copied().map(Peer::empty).collect(),
            leader_state: None,
            leader_down_since: None,
        }
    }

    /// Builds a manually triggered task with a one second timeout.
    fn new_task(
        region_ids: Vec<RegionId>,
        from_peer_id: u64,
        to_peer_id: u64,
    ) -> RegionMigrationTaskBatch {
        RegionMigrationTaskBatch {
            region_ids,
            from_peer: Peer::empty(from_peer_id),
            to_peer: Peer::empty(to_peer_id),
            timeout: Duration::from_millis(1000),
            trigger_reason: RegionMigrationTriggerReason::Manual,
        }
    }

    /// Creates a metadata manager on top of an in-memory kv backend, optionally storing
    /// one table route in it first.
    async fn new_table_metadata_manager(
        table_route: Option<(TableId, TableRouteValue)>,
    ) -> Arc<TableMetadataManager> {
        let kv_backend = Arc::new(MemoryKvBackend::default());
        let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone()));
        if let Some((table_id, route)) = table_route {
            let (txn, _) = table_metadata_manager
                .table_route_manager()
                .table_route_storage()
                .build_create_txn(table_id, &route)
                .unwrap();
            kv_backend.txn(txn).await.unwrap();
        }
        table_metadata_manager
    }

    /// Classifies a single region route into a fresh analysis.
    fn classify(
        region_route: &RegionRoute,
        from_peer_id: u64,
        to_peer_id: u64,
    ) -> Result<RegionMigrationAnalysis, Error> {
        let mut analysis = RegionMigrationAnalysis::default();
        update_result_with_region_route(&mut analysis, region_route, from_peer_id, to_peer_id)?;
        Ok(analysis)
    }

    #[test]
    fn test_update_result_with_region_route() {
        let region_id = RegionId::new(1, 1);

        // The region is already migrated to the to_peer.
        let region_route = new_region_route(region_id, Some(1), &[]);
        assert_eq!(
            classify(&region_route, 2, 1).unwrap(),
            RegionMigrationAnalysis {
                migrated: vec![region_id],
                ..Default::default()
            }
        );

        // Test region leader changed.
        let region_route = new_region_route(region_id, Some(1), &[]);
        assert_eq!(
            classify(&region_route, 2, 3).unwrap(),
            RegionMigrationAnalysis {
                leader_changed: vec![region_id],
                ..Default::default()
            }
        );

        // Test region peer conflict.
        let region_route = new_region_route(region_id, Some(1), &[2]);
        assert_eq!(
            classify(&region_route, 1, 2).unwrap(),
            RegionMigrationAnalysis {
                peer_conflict: vec![region_id],
                ..Default::default()
            }
        );

        // Test normal case.
        let region_route = new_region_route(region_id, Some(1), &[]);
        assert_eq!(
            classify(&region_route, 1, 3).unwrap(),
            RegionMigrationAnalysis {
                pending: vec![region_id],
                ..Default::default()
            }
        );

        // Test leader peer not set
        let region_route = new_region_route(region_id, None, &[]);
        let err = classify(&region_route, 1, 3).unwrap_err();
        assert_matches!(err, Error::Unexpected { .. });
    }

    #[tokio::test]
    async fn test_analyze_region_migration_task_invalid_task() {
        // Source and destination peers are the same.
        let task = new_task(vec![RegionId::new(1, 1)], 1, 1);
        let table_metadata_manager = new_table_metadata_manager(None).await;
        let err = analyze_region_migration_task(&task, &table_metadata_manager)
            .await
            .unwrap_err();
        assert_matches!(err, Error::InvalidArguments { .. });
    }

    #[tokio::test]
    async fn test_analyze_region_migration_table_not_found() {
        let task = new_task(vec![RegionId::new(1, 1)], 1, 2);
        // No table route is stored at all.
        let table_metadata_manager = new_table_metadata_manager(None).await;
        let result = analyze_region_migration_task(&task, &table_metadata_manager)
            .await
            .unwrap();
        assert_eq!(
            result,
            RegionMigrationAnalysis {
                table_not_found: vec![RegionId::new(1, 1)],
                ..Default::default()
            }
        );
    }

    #[tokio::test]
    async fn test_analyze_region_migration_unexpected_logical_table() {
        let logical_route = TableRouteValue::Logical(LogicalTableRouteValue::new(
            1024,
            vec![RegionId::new(1023, 1)],
        ));
        let table_metadata_manager =
            new_table_metadata_manager(Some((1024, logical_route))).await;
        let task = new_task(vec![RegionId::new(1024, 1)], 1, 2);
        let err = analyze_region_migration_task(&task, &table_metadata_manager)
            .await
            .unwrap_err();
        assert_matches!(err, Error::UnexpectedLogicalRouteTable { .. });
    }

    #[tokio::test]
    async fn test_analyze_region_migration_normal_case() {
        let physical_route = TableRouteValue::Physical(PhysicalTableRouteValue::new(vec![
            // Already migrated to the to_peer.
            new_region_route(RegionId::new(1024, 1), Some(2), &[]),
            // Leader peer changed.
            new_region_route(RegionId::new(1024, 2), Some(3), &[]),
            // Peer conflict.
            new_region_route(RegionId::new(1024, 3), Some(1), &[2]),
            // Normal case.
            new_region_route(RegionId::new(1024, 4), Some(1), &[]),
        ]));
        let table_metadata_manager =
            new_table_metadata_manager(Some((1024, physical_route))).await;

        // Region (1025, 1) belongs to a table without any stored route.
        let task = new_task(
            vec![
                RegionId::new(1024, 1),
                RegionId::new(1024, 2),
                RegionId::new(1024, 3),
                RegionId::new(1024, 4),
                RegionId::new(1025, 1),
            ],
            1,
            2,
        );
        let result = analyze_region_migration_task(&task, &table_metadata_manager)
            .await
            .unwrap();
        assert_eq!(
            result,
            RegionMigrationAnalysis {
                pending: vec![RegionId::new(1024, 4)],
                migrated: vec![RegionId::new(1024, 1)],
                leader_changed: vec![RegionId::new(1024, 2)],
                peer_conflict: vec![RegionId::new(1024, 3)],
                table_not_found: vec![RegionId::new(1025, 1)],
            }
        );
    }
}