meta_srv/procedure/region_migration/
utils.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::fmt::Display;
17use std::time::Duration;
18
19use common_meta::key::TableMetadataManagerRef;
20use common_meta::peer::Peer;
21use common_meta::rpc::router::RegionRoute;
22use itertools::Itertools;
23use snafu::{OptionExt, ResultExt};
24use store_api::storage::{RegionId, TableId};
25
26use crate::error::{self, Result};
27use crate::procedure::region_migration::{
28    DEFAULT_REGION_MIGRATION_TIMEOUT, RegionMigrationProcedureTask, RegionMigrationTriggerReason,
29};
30
/// A batch of regions that should be moved together from one peer to another.
///
/// All regions listed in `region_ids` share the same source peer, destination peer,
/// timeout and trigger reason.
#[derive(Debug, Clone)]
pub struct RegionMigrationTaskBatch {
    /// Ids of the regions covered by this batch.
    pub region_ids: Vec<RegionId>,
    /// Peer the regions are expected to be led by before the migration.
    pub from_peer: Peer,
    /// Peer the regions should be migrated to.
    pub to_peer: Peer,
    /// Timeout applied to the migration of this batch.
    pub timeout: Duration,
    /// Why this migration was triggered.
    pub trigger_reason: RegionMigrationTriggerReason,
}
45
46impl RegionMigrationTaskBatch {
47    /// Constructs a [`RegionMigrationTaskBatch`] from a vector of region migration procedure tasks.
48    ///
49    /// Aggregates region IDs, determines source and destination peers, sets an appropriate timeout,
50    /// and assigns the trigger reason for the migration batch.
51    ///
52    /// # Panic
53    /// if the `tasks` are empty.
54    pub fn from_tasks(tasks: Vec<(RegionMigrationProcedureTask, u32)>) -> Self {
55        let max_count = tasks.iter().map(|(_, count)| *count).max().unwrap_or(1);
56        let region_ids = tasks.iter().map(|(r, _)| r.region_id).collect::<Vec<_>>();
57        let from_peer = tasks[0].0.from_peer.clone();
58        let to_peer = tasks[0].0.to_peer.clone();
59        let timeout = DEFAULT_REGION_MIGRATION_TIMEOUT * max_count;
60        let trigger_reason = RegionMigrationTriggerReason::Failover;
61        Self {
62            region_ids,
63            from_peer,
64            to_peer,
65            timeout,
66            trigger_reason,
67        }
68    }
69}
70
71impl Display for RegionMigrationTaskBatch {
72    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
73        write!(
74            f,
75            "RegionMigrationTask {{ region_ids: {:?}, from_peer: {:?}, to_peer: {:?}, timeout: {:?}, trigger_reason: {:?} }}",
76            self.region_ids, self.from_peer, self.to_peer, self.timeout, self.trigger_reason
77        )
78    }
79}
80
81impl RegionMigrationTaskBatch {
82    /// Returns the table regions map.
83    ///
84    /// The key is the table id, the value is the region ids of the table.
85    pub(crate) fn table_regions(&self) -> HashMap<TableId, HashSet<RegionId>> {
86        let mut table_regions = HashMap::new();
87        for region_id in &self.region_ids {
88            table_regions
89                .entry(region_id.table_id())
90                .or_insert_with(HashSet::new)
91                .insert(*region_id);
92        }
93        table_regions
94    }
95}
96
/// Outcome of analyzing a migration batch: the batch's regions sorted into buckets
/// by what the current table routes say about them.
#[derive(Debug, Clone, Default, PartialEq)]
pub(crate) struct RegionMigrationAnalysis {
    /// Regions whose leader is already the `to_peer` (and is not downgrading).
    pub(crate) migrated: Vec<RegionId>,
    /// Regions whose current leader is no longer the `from_peer`.
    pub(crate) leader_changed: Vec<RegionId>,
    /// Regions where the `to_peer` is already one of the followers (conflict).
    pub(crate) peer_conflict: Vec<RegionId>,
    /// Regions whose table route could not be found.
    pub(crate) table_not_found: Vec<RegionId>,
    /// Regions whose table exists but whose region route is missing
    /// (e.g., removed after repartition).
    pub(crate) region_not_found: Vec<RegionId>,
    /// Regions that still have to be migrated.
    pub(crate) pending: Vec<RegionId>,
}
113
114fn leader_peer(region_route: &RegionRoute) -> Result<&Peer> {
115    region_route
116        .leader_peer
117        .as_ref()
118        .with_context(|| error::UnexpectedSnafu {
119            violated: format!(
120                "Region route leader peer is not found in region({})",
121                region_route.region.id
122            ),
123        })
124}
125
126/// Returns true if the region has already been migrated to `to_peer`.
127fn has_migrated(region_route: &RegionRoute, to_peer_id: u64) -> Result<bool> {
128    if region_route.is_leader_downgrading() {
129        return Ok(false);
130    }
131
132    let leader_peer = leader_peer(region_route)?;
133    Ok(leader_peer.id == to_peer_id)
134}
135
136/// Returns true if the leader peer of the region has changed.
137fn has_leader_changed(region_route: &RegionRoute, from_peer_id: u64) -> Result<bool> {
138    let leader_peer = leader_peer(region_route)?;
139
140    Ok(leader_peer.id != from_peer_id)
141}
142
143/// Returns true if `to_peer` is already a follower of the region (conflict).
144fn has_peer_conflict(region_route: &RegionRoute, to_peer_id: u64) -> bool {
145    region_route
146        .follower_peers
147        .iter()
148        .map(|p| p.id)
149        .contains(&to_peer_id)
150}
151
152/// Updates the verification result based on a single region route.
153fn update_result_with_region_route(
154    result: &mut RegionMigrationAnalysis,
155    region_route: &RegionRoute,
156    from_peer_id: u64,
157    to_peer_id: u64,
158) -> Result<()> {
159    if has_migrated(region_route, to_peer_id)? {
160        result.migrated.push(region_route.region.id);
161        return Ok(());
162    }
163    if has_leader_changed(region_route, from_peer_id)? {
164        result.leader_changed.push(region_route.region.id);
165        return Ok(());
166    }
167    if has_peer_conflict(region_route, to_peer_id) {
168        result.peer_conflict.push(region_route.region.id);
169        return Ok(());
170    }
171    result.pending.push(region_route.region.id);
172    Ok(())
173}
174
175/// Analyzes the migration task and categorizes regions by their current state.
176///
177/// Returns a [`RegionMigrationAnalysis`] describing the migration status.
178pub async fn analyze_region_migration_task(
179    task: &RegionMigrationTaskBatch,
180    table_metadata_manager: &TableMetadataManagerRef,
181) -> Result<RegionMigrationAnalysis> {
182    if task.to_peer.id == task.from_peer.id {
183        return error::InvalidArgumentsSnafu {
184            err_msg: format!(
185                "The `from_peer_id`({}) can't equal `to_peer_id`({})",
186                task.from_peer.id, task.to_peer.id
187            ),
188        }
189        .fail();
190    }
191    let table_regions = task.table_regions();
192    let table_ids = table_regions.keys().cloned().collect::<Vec<_>>();
193    let mut result = RegionMigrationAnalysis::default();
194
195    let table_routes = table_metadata_manager
196        .table_route_manager()
197        .table_route_storage()
198        .batch_get_with_raw_bytes(&table_ids)
199        .await
200        .context(error::TableMetadataManagerSnafu)?;
201
202    for (table_id, table_route) in table_ids.into_iter().zip(table_routes) {
203        let region_ids = table_regions.get(&table_id).unwrap();
204        let Some(table_route) = table_route else {
205            result.table_not_found.extend(region_ids);
206            continue;
207        };
208        // Throws error if the table route is not a physical table route.
209        let region_routes = table_route.region_routes().with_context(|_| {
210            error::UnexpectedLogicalRouteTableSnafu {
211                err_msg: format!("TableRoute({table_id:?}) is a non-physical TableRouteValue."),
212            }
213        })?;
214
215        let existing_region_ids = region_routes
216            .iter()
217            .map(|r| r.region.id)
218            .collect::<HashSet<_>>();
219
220        for region_route in region_routes
221            .iter()
222            .filter(|r| region_ids.contains(&r.region.id))
223        {
224            update_result_with_region_route(
225                &mut result,
226                region_route,
227                task.from_peer.id,
228                task.to_peer.id,
229            )?;
230        }
231
232        for region_id in region_ids {
233            if !existing_region_ids.contains(region_id) {
234                result.region_not_found.push(*region_id);
235            }
236        }
237    }
238
239    Ok(result)
240}
241
#[cfg(test)]
mod tests {

    use std::assert_matches::assert_matches;
    use std::sync::Arc;
    use std::time::Duration;

    use common_meta::key::TableMetadataManager;
    use common_meta::key::table_route::{
        LogicalTableRouteValue, PhysicalTableRouteValue, TableRouteValue,
    };
    use common_meta::kv_backend::TxnService;
    use common_meta::kv_backend::memory::MemoryKvBackend;
    use common_meta::peer::Peer;
    use common_meta::rpc::router::{Region, RegionRoute};
    use store_api::storage::RegionId;

    use crate::error::Error;
    use crate::procedure::region_migration::RegionMigrationTriggerReason;
    use crate::procedure::region_migration::utils::{
        RegionMigrationAnalysis, RegionMigrationTaskBatch, analyze_region_migration_task,
        update_result_with_region_route,
    };

    /// Builds a region route with the given leader and followers (peer ids only);
    /// every other field is left unset.
    fn new_region_route(
        region_id: RegionId,
        leader_id: Option<u64>,
        follower_ids: &[u64],
    ) -> RegionRoute {
        RegionRoute {
            region: Region::new_test(region_id),
            leader_peer: leader_id.map(Peer::empty),
            follower_peers: follower_ids.iter().copied().map(Peer::empty).collect(),
            leader_state: None,
            leader_down_since: None,
            write_route_policy: None,
        }
    }

    /// Builds a manually triggered migration batch with a one second timeout.
    fn new_manual_task(
        region_ids: Vec<RegionId>,
        from_peer_id: u64,
        to_peer_id: u64,
    ) -> RegionMigrationTaskBatch {
        RegionMigrationTaskBatch {
            region_ids,
            from_peer: Peer::empty(from_peer_id),
            to_peer: Peer::empty(to_peer_id),
            timeout: Duration::from_millis(1000),
            trigger_reason: RegionMigrationTriggerReason::Manual,
        }
    }

    #[test]
    fn test_update_result_with_region_route() {
        let region_id = RegionId::new(1, 1);

        // (region route, from_peer_id, to_peer_id, expected analysis)
        let cases = [
            // The region is already migrated to the to_peer.
            (
                new_region_route(region_id, Some(1), &[]),
                2,
                1,
                RegionMigrationAnalysis {
                    migrated: vec![region_id],
                    ..Default::default()
                },
            ),
            // Test region leader changed.
            (
                new_region_route(region_id, Some(1), &[]),
                2,
                3,
                RegionMigrationAnalysis {
                    leader_changed: vec![region_id],
                    ..Default::default()
                },
            ),
            // Test region peer conflict.
            (
                new_region_route(region_id, Some(1), &[2]),
                1,
                2,
                RegionMigrationAnalysis {
                    peer_conflict: vec![region_id],
                    ..Default::default()
                },
            ),
            // Test normal case.
            (
                new_region_route(region_id, Some(1), &[]),
                1,
                3,
                RegionMigrationAnalysis {
                    pending: vec![region_id],
                    ..Default::default()
                },
            ),
        ];

        for (region_route, from_peer_id, to_peer_id, expected) in cases {
            let mut result = RegionMigrationAnalysis::default();
            update_result_with_region_route(&mut result, &region_route, from_peer_id, to_peer_id)
                .unwrap();
            assert_eq!(result, expected);
        }

        // Test leader peer not set
        let mut result = RegionMigrationAnalysis::default();
        let region_route = new_region_route(region_id, None, &[]);
        let err = update_result_with_region_route(&mut result, &region_route, 1, 3).unwrap_err();
        assert_matches!(err, Error::Unexpected { .. });
    }

    #[tokio::test]
    async fn test_analyze_region_migration_task_invalid_task() {
        // Source and destination peers are the same.
        let task = new_manual_task(vec![RegionId::new(1, 1)], 1, 1);
        let kv_backend = Arc::new(MemoryKvBackend::default());
        let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone()));

        let err = analyze_region_migration_task(&task, &table_metadata_manager)
            .await
            .unwrap_err();
        assert_matches!(err, Error::InvalidArguments { .. });
    }

    #[tokio::test]
    async fn test_analyze_region_migration_table_not_found() {
        // Nothing is stored in the backend, so the table route cannot be found.
        let task = new_manual_task(vec![RegionId::new(1, 1)], 1, 2);
        let kv_backend = Arc::new(MemoryKvBackend::default());
        let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone()));

        let result = analyze_region_migration_task(&task, &table_metadata_manager)
            .await
            .unwrap();
        let expected = RegionMigrationAnalysis {
            table_not_found: vec![RegionId::new(1, 1)],
            ..Default::default()
        };
        assert_eq!(result, expected);
    }

    #[tokio::test]
    async fn test_analyze_region_migration_unexpected_logical_table() {
        let kv_backend = Arc::new(MemoryKvBackend::default());
        let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone()));

        // Store a logical table route for table 1024.
        let logical_route = TableRouteValue::Logical(LogicalTableRouteValue::new(1024));
        let (txn, _) = table_metadata_manager
            .table_route_manager()
            .table_route_storage()
            .build_create_txn(1024, &logical_route)
            .unwrap();
        kv_backend.txn(txn).await.unwrap();

        let task = new_manual_task(vec![RegionId::new(1024, 1)], 1, 2);
        let err = analyze_region_migration_task(&task, &table_metadata_manager)
            .await
            .unwrap_err();
        assert_matches!(err, Error::UnexpectedLogicalRouteTable { .. });
    }

    #[tokio::test]
    async fn test_analyze_region_migration_normal_case() {
        let kv_backend = Arc::new(MemoryKvBackend::default());
        let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone()));

        let physical_route = TableRouteValue::Physical(PhysicalTableRouteValue::new(vec![
            // Already migrated to the to_peer.
            new_region_route(RegionId::new(1024, 1), Some(2), &[]),
            // Leader peer changed.
            new_region_route(RegionId::new(1024, 2), Some(3), &[]),
            // Peer conflict.
            new_region_route(RegionId::new(1024, 3), Some(1), &[2]),
            // Normal case.
            new_region_route(RegionId::new(1024, 4), Some(1), &[]),
        ]));
        let (txn, _) = table_metadata_manager
            .table_route_manager()
            .table_route_storage()
            .build_create_txn(1024, &physical_route)
            .unwrap();
        kv_backend.txn(txn).await.unwrap();

        let task = new_manual_task(
            vec![
                RegionId::new(1024, 1),
                RegionId::new(1024, 2),
                RegionId::new(1024, 3),
                RegionId::new(1024, 4),
                // Region of existing table but route removed (e.g., after repartition).
                RegionId::new(1024, 5),
                RegionId::new(1025, 1),
            ],
            1,
            2,
        );
        let result = analyze_region_migration_task(&task, &table_metadata_manager)
            .await
            .unwrap();
        let expected = RegionMigrationAnalysis {
            pending: vec![RegionId::new(1024, 4)],
            migrated: vec![RegionId::new(1024, 1)],
            leader_changed: vec![RegionId::new(1024, 2)],
            peer_conflict: vec![RegionId::new(1024, 3)],
            region_not_found: vec![RegionId::new(1024, 5)],
            table_not_found: vec![RegionId::new(1025, 1)],
        };
        assert_eq!(result, expected);
    }
}