meta_srv/procedure/region_migration/
manager.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::collections::hash_map::Entry;
16use std::collections::HashMap;
17use std::fmt::Display;
18use std::sync::{Arc, RwLock};
19use std::time::Duration;
20
21use common_meta::key::table_info::TableInfoValue;
22use common_meta::key::table_route::TableRouteValue;
23use common_meta::peer::Peer;
24use common_meta::rpc::router::RegionRoute;
25use common_procedure::{watcher, ProcedureId, ProcedureManagerRef, ProcedureWithId};
26use common_telemetry::{error, info, warn};
27use serde::{Deserialize, Serialize};
28use snafu::{ensure, OptionExt, ResultExt};
29use store_api::storage::RegionId;
30use table::table_name::TableName;
31
32use crate::error::{self, Result};
33use crate::metrics::{METRIC_META_REGION_MIGRATION_DATANODES, METRIC_META_REGION_MIGRATION_FAIL};
34use crate::procedure::region_migration::{
35    DefaultContextFactory, PersistentContext, RegionMigrationProcedure,
36};
37
38pub type RegionMigrationManagerRef = Arc<RegionMigrationManager>;
39
40/// Manager of region migration procedure.
41pub struct RegionMigrationManager {
42    procedure_manager: ProcedureManagerRef,
43    context_factory: DefaultContextFactory,
44    tracker: RegionMigrationProcedureTracker,
45}
46
47#[derive(Default, Clone)]
48pub struct RegionMigrationProcedureTracker {
49    running_procedures: Arc<RwLock<HashMap<RegionId, RegionMigrationProcedureTask>>>,
50}
51
52impl RegionMigrationProcedureTracker {
53    /// Returns the [RegionMigrationProcedureGuard] if current region isn't migrating.
54    pub(crate) fn insert_running_procedure(
55        &self,
56        task: &RegionMigrationProcedureTask,
57    ) -> Option<RegionMigrationProcedureGuard> {
58        let mut procedures = self.running_procedures.write().unwrap();
59        match procedures.entry(task.region_id) {
60            Entry::Occupied(_) => None,
61            Entry::Vacant(v) => {
62                v.insert(task.clone());
63                Some(RegionMigrationProcedureGuard {
64                    region_id: task.region_id,
65                    running_procedures: self.running_procedures.clone(),
66                })
67            }
68        }
69    }
70
71    /// Returns true if it contains the specific region(`region_id`).
72    pub(crate) fn contains(&self, region_id: RegionId) -> bool {
73        self.running_procedures
74            .read()
75            .unwrap()
76            .contains_key(&region_id)
77    }
78}
79
80/// The guard of running [RegionMigrationProcedureTask].
81pub(crate) struct RegionMigrationProcedureGuard {
82    region_id: RegionId,
83    running_procedures: Arc<RwLock<HashMap<RegionId, RegionMigrationProcedureTask>>>,
84}
85
86impl Drop for RegionMigrationProcedureGuard {
87    fn drop(&mut self) {
88        let exists = self
89            .running_procedures
90            .read()
91            .unwrap()
92            .contains_key(&self.region_id);
93        if exists {
94            self.running_procedures
95                .write()
96                .unwrap()
97                .remove(&self.region_id);
98        }
99    }
100}
101
102#[derive(Debug, Clone)]
103pub struct RegionMigrationProcedureTask {
104    pub(crate) region_id: RegionId,
105    pub(crate) from_peer: Peer,
106    pub(crate) to_peer: Peer,
107    pub(crate) timeout: Duration,
108    pub(crate) trigger_reason: RegionMigrationTriggerReason,
109}
110
111/// The reason why the region migration procedure is triggered.
112#[derive(Default, Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, strum::Display)]
113#[strum(serialize_all = "PascalCase")]
114pub enum RegionMigrationTriggerReason {
115    #[default]
116    /// The region migration procedure is triggered by unknown reason.
117    Unknown,
118    /// The region migration procedure is triggered by administrator.
119    Manual,
120    /// The region migration procedure is triggered by auto rebalance.
121    AutoRebalance,
122    /// The region migration procedure is triggered by failover.
123    Failover,
124}
125
126impl RegionMigrationProcedureTask {
127    pub fn new(
128        region_id: RegionId,
129        from_peer: Peer,
130        to_peer: Peer,
131        timeout: Duration,
132        trigger_reason: RegionMigrationTriggerReason,
133    ) -> Self {
134        Self {
135            region_id,
136            from_peer,
137            to_peer,
138            timeout,
139            trigger_reason,
140        }
141    }
142}
143
144impl Display for RegionMigrationProcedureTask {
145    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
146        write!(
147            f,
148            "region: {}, from_peer: {}, to_peer: {}, trigger_reason: {}",
149            self.region_id, self.from_peer, self.to_peer, self.trigger_reason
150        )
151    }
152}
153
154impl RegionMigrationManager {
155    /// Returns new [`RegionMigrationManager`]
156    pub(crate) fn new(
157        procedure_manager: ProcedureManagerRef,
158        context_factory: DefaultContextFactory,
159    ) -> Self {
160        Self {
161            procedure_manager,
162            context_factory,
163            tracker: RegionMigrationProcedureTracker::default(),
164        }
165    }
166
167    /// Returns the [`RegionMigrationProcedureTracker`].
168    pub fn tracker(&self) -> &RegionMigrationProcedureTracker {
169        &self.tracker
170    }
171
172    /// Registers the loader of [RegionMigrationProcedure] to the `ProcedureManager`.
173    pub(crate) fn try_start(&self) -> Result<()> {
174        let context_factory = self.context_factory.clone();
175        let tracker = self.tracker.clone();
176        self.procedure_manager
177            .register_loader(
178                RegionMigrationProcedure::TYPE_NAME,
179                Box::new(move |json| {
180                    let context_factory = context_factory.clone();
181                    let tracker = tracker.clone();
182                    RegionMigrationProcedure::from_json(json, context_factory, tracker)
183                        .map(|p| Box::new(p) as _)
184                }),
185            )
186            .context(error::RegisterProcedureLoaderSnafu {
187                type_name: RegionMigrationProcedure::TYPE_NAME,
188            })
189    }
190
191    fn insert_running_procedure(
192        &self,
193        task: &RegionMigrationProcedureTask,
194    ) -> Option<RegionMigrationProcedureGuard> {
195        self.tracker.insert_running_procedure(task)
196    }
197
198    fn verify_task(&self, task: &RegionMigrationProcedureTask) -> Result<()> {
199        if task.to_peer.id == task.from_peer.id {
200            return error::InvalidArgumentsSnafu {
201                err_msg: "The `from_peer_id` can't equal `to_peer_id`",
202            }
203            .fail();
204        }
205
206        Ok(())
207    }
208
209    async fn retrieve_table_route(&self, region_id: RegionId) -> Result<TableRouteValue> {
210        let table_route = self
211            .context_factory
212            .table_metadata_manager
213            .table_route_manager()
214            .table_route_storage()
215            .get(region_id.table_id())
216            .await
217            .context(error::TableMetadataManagerSnafu)?
218            .context(error::TableRouteNotFoundSnafu {
219                table_id: region_id.table_id(),
220            })?;
221
222        Ok(table_route)
223    }
224
225    async fn retrieve_table_info(&self, region_id: RegionId) -> Result<TableInfoValue> {
226        let table_route = self
227            .context_factory
228            .table_metadata_manager
229            .table_info_manager()
230            .get(region_id.table_id())
231            .await
232            .context(error::TableMetadataManagerSnafu)?
233            .context(error::TableInfoNotFoundSnafu {
234                table_id: region_id.table_id(),
235            })?
236            .into_inner();
237
238        Ok(table_route)
239    }
240
241    /// Verifies the type of region migration table route.
242    fn verify_table_route(
243        &self,
244        table_route: &TableRouteValue,
245        task: &RegionMigrationProcedureTask,
246    ) -> Result<()> {
247        if !table_route.is_physical() {
248            return error::UnexpectedSnafu {
249                violated: format!(
250                    "Trying to execute region migration on the logical table, task {task}"
251                ),
252            }
253            .fail();
254        }
255
256        Ok(())
257    }
258
259    /// Returns true if the region has been migrated.
260    fn has_migrated(
261        &self,
262        region_route: &RegionRoute,
263        task: &RegionMigrationProcedureTask,
264    ) -> Result<bool> {
265        if region_route.is_leader_downgrading() {
266            return Ok(false);
267        }
268
269        let leader_peer = region_route
270            .leader_peer
271            .as_ref()
272            .context(error::UnexpectedSnafu {
273                violated: "Region route leader peer is not found",
274            })?;
275
276        Ok(leader_peer.id == task.to_peer.id)
277    }
278
279    /// Throws an error if `leader_peer` is not the `from_peer`.
280    ///
281    /// If `from_peer` is unknown, use the leader peer as the `from_peer`.
282    fn verify_region_leader_peer(
283        &self,
284        region_route: &RegionRoute,
285        task: &mut RegionMigrationProcedureTask,
286    ) -> Result<()> {
287        let leader_peer = region_route
288            .leader_peer
289            .as_ref()
290            .context(error::UnexpectedSnafu {
291                violated: "Region route leader peer is not found",
292            })?;
293
294        ensure!(
295            leader_peer.id == task.from_peer.id,
296            error::LeaderPeerChangedSnafu {
297                msg: format!(
298                    "Region's leader peer({}) is not the `from_peer`({}), region: {}",
299                    leader_peer.id, task.from_peer.id, task.region_id
300                ),
301            }
302        );
303
304        if task.from_peer.addr.is_empty() {
305            warn!(
306                "The `from_peer` is unknown, use the leader peer({}) as the `from_peer`, region: {}",
307                leader_peer, task.region_id
308            );
309            // The peer id is the same as the leader peer id.
310            task.from_peer = leader_peer.clone();
311        }
312
313        Ok(())
314    }
315
316    /// Throws an error if `to_peer` is already has a region follower.
317    fn verify_region_follower_peers(
318        &self,
319        region_route: &RegionRoute,
320        task: &RegionMigrationProcedureTask,
321    ) -> Result<()> {
322        ensure!(
323            !region_route.follower_peers.contains(&task.to_peer),
324            error::InvalidArgumentsSnafu {
325                err_msg: format!(
326                    "The `to_peer`({}) is already has a region follower, region: {}",
327                    task.to_peer.id, task.region_id
328                ),
329            },
330        );
331
332        Ok(())
333    }
334
335    /// Submits a new region migration procedure.
336    pub async fn submit_procedure(
337        &self,
338        mut task: RegionMigrationProcedureTask,
339    ) -> Result<Option<ProcedureId>> {
340        let Some(guard) = self.insert_running_procedure(&task) else {
341            return error::MigrationRunningSnafu {
342                region_id: task.region_id,
343            }
344            .fail();
345        };
346
347        self.verify_task(&task)?;
348
349        let region_id = task.region_id;
350
351        let table_route = self.retrieve_table_route(region_id).await?;
352        self.verify_table_route(&table_route, &task)?;
353
354        // Safety: checked before.
355        let region_route = table_route
356            .region_route(region_id)
357            .context(error::UnexpectedLogicalRouteTableSnafu {
358                err_msg: format!("{table_route:?} is a non-physical TableRouteValue."),
359            })?
360            .context(error::RegionRouteNotFoundSnafu { region_id })?;
361
362        if self.has_migrated(&region_route, &task)? {
363            info!("Skipping region migration task: {task}");
364            return error::RegionMigratedSnafu {
365                region_id,
366                target_peer_id: task.to_peer.id,
367            }
368            .fail();
369        }
370
371        self.verify_region_leader_peer(&region_route, &mut task)?;
372        self.verify_region_follower_peers(&region_route, &task)?;
373        let table_info = self.retrieve_table_info(region_id).await?;
374        let TableName {
375            catalog_name,
376            schema_name,
377            ..
378        } = table_info.table_name();
379        let RegionMigrationProcedureTask {
380            region_id,
381            from_peer,
382            to_peer,
383            timeout,
384            trigger_reason,
385        } = task.clone();
386        let procedure = RegionMigrationProcedure::new(
387            PersistentContext {
388                catalog: catalog_name,
389                schema: schema_name,
390                region_id,
391                from_peer,
392                to_peer,
393                timeout,
394                trigger_reason,
395            },
396            self.context_factory.clone(),
397            Some(guard),
398        );
399        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
400        let procedure_id = procedure_with_id.id;
401        info!("Starting region migration procedure {procedure_id} for {task}");
402        let procedure_manager = self.procedure_manager.clone();
403        common_runtime::spawn_global(async move {
404            let watcher = &mut match procedure_manager.submit(procedure_with_id).await {
405                Ok(watcher) => watcher,
406                Err(e) => {
407                    error!(e; "Failed to submit region migration procedure {procedure_id} for {task}");
408                    return;
409                }
410            };
411            METRIC_META_REGION_MIGRATION_DATANODES
412                .with_label_values(&["src", &task.from_peer.id.to_string()])
413                .inc();
414            METRIC_META_REGION_MIGRATION_DATANODES
415                .with_label_values(&["desc", &task.to_peer.id.to_string()])
416                .inc();
417
418            if let Err(e) = watcher::wait(watcher).await {
419                error!(e; "Failed to wait region migration procedure {procedure_id} for {task}");
420                METRIC_META_REGION_MIGRATION_FAIL.inc();
421                return;
422            }
423
424            info!("Region migration procedure {procedure_id} for {task} is finished successfully!");
425        });
426
427        Ok(Some(procedure_id))
428    }
429}
430
#[cfg(test)]
mod test {
    use std::assert_matches::assert_matches;

    use common_meta::key::table_route::LogicalTableRouteValue;
    use common_meta::key::test_utils::new_test_table_info;
    use common_meta::rpc::router::Region;

    use super::*;
    use crate::procedure::region_migration::test_util::TestingEnv;

    /// Creates a manager wired to the procedure manager and context factory of `env`.
    fn new_manager(env: &TestingEnv) -> RegionMigrationManager {
        let context_factory = env.context_factory();
        RegionMigrationManager::new(env.procedure_manager().clone(), context_factory)
    }

    /// Creates a manually triggered task with a one-second timeout.
    fn new_task(
        region_id: RegionId,
        from_peer: Peer,
        to_peer: Peer,
    ) -> RegionMigrationProcedureTask {
        RegionMigrationProcedureTask::new(
            region_id,
            from_peer,
            to_peer,
            Duration::from_millis(1000),
            RegionMigrationTriggerReason::Manual,
        )
    }

    /// Stores physical table metadata for table 1024 with the given region routes.
    async fn create_table(env: &TestingEnv, region_routes: Vec<RegionRoute>) {
        let table_info = new_test_table_info(1024, vec![1]).into();
        env.create_physical_table_metadata(table_info, region_routes)
            .await;
    }

    #[tokio::test]
    async fn test_insert_running_procedure() {
        let env = TestingEnv::new();
        let manager = new_manager(&env);
        let region_id = RegionId::new(1024, 1);
        let task = new_task(region_id, Peer::empty(2), Peer::empty(1));
        // Mark the region as already migrating before submitting.
        manager
            .tracker
            .running_procedures
            .write()
            .unwrap()
            .insert(region_id, task.clone());

        let err = manager.submit_procedure(task).await.unwrap_err();
        assert_matches!(err, error::Error::MigrationRunning { .. });
    }

    #[tokio::test]
    async fn test_submit_procedure_invalid_task() {
        let env = TestingEnv::new();
        let manager = new_manager(&env);
        let task = new_task(RegionId::new(1024, 1), Peer::empty(1), Peer::empty(1));

        let err = manager.submit_procedure(task).await.unwrap_err();
        assert_matches!(err, error::Error::InvalidArguments { .. });
    }

    #[tokio::test]
    async fn test_submit_procedure_table_not_found() {
        let env = TestingEnv::new();
        let manager = new_manager(&env);
        let task = new_task(RegionId::new(1024, 1), Peer::empty(1), Peer::empty(2));

        let err = manager.submit_procedure(task).await.unwrap_err();
        assert_matches!(err, error::Error::TableRouteNotFound { .. });
    }

    #[tokio::test]
    async fn test_submit_procedure_region_route_not_found() {
        let env = TestingEnv::new();
        let manager = new_manager(&env);
        let task = new_task(RegionId::new(1024, 1), Peer::empty(1), Peer::empty(2));
        // The table only has a route for region 2, not for the migrated region 1.
        create_table(
            &env,
            vec![RegionRoute {
                region: Region::new_test(RegionId::new(1024, 2)),
                leader_peer: Some(Peer::empty(3)),
                ..Default::default()
            }],
        )
        .await;

        let err = manager.submit_procedure(task).await.unwrap_err();
        assert_matches!(err, error::Error::RegionRouteNotFound { .. });
    }

    #[tokio::test]
    async fn test_submit_procedure_incorrect_from_peer() {
        let env = TestingEnv::new();
        let manager = new_manager(&env);
        let region_id = RegionId::new(1024, 1);
        let task = new_task(region_id, Peer::empty(1), Peer::empty(2));
        // The actual leader (3) differs from the requested `from_peer` (1).
        create_table(
            &env,
            vec![RegionRoute {
                region: Region::new_test(region_id),
                leader_peer: Some(Peer::empty(3)),
                ..Default::default()
            }],
        )
        .await;

        let err = manager.submit_procedure(task).await.unwrap_err();
        assert_matches!(err, error::Error::LeaderPeerChanged { .. });
        assert_eq!(
            err.to_string(),
            "Region's leader peer changed: Region's leader peer(3) is not the `from_peer`(1), region: 4398046511105(1024, 1)"
        );
    }

    #[tokio::test]
    async fn test_submit_procedure_region_follower_on_to_peer() {
        let env = TestingEnv::new();
        let manager = new_manager(&env);
        let region_id = RegionId::new(1024, 1);
        let task = new_task(region_id, Peer::empty(3), Peer::empty(2));
        // `to_peer` (2) already hosts a follower of the region.
        create_table(
            &env,
            vec![RegionRoute {
                region: Region::new_test(region_id),
                leader_peer: Some(Peer::empty(3)),
                follower_peers: vec![Peer::empty(2)],
                ..Default::default()
            }],
        )
        .await;

        let err = manager.submit_procedure(task).await.unwrap_err();
        assert_matches!(err, error::Error::InvalidArguments { .. });
        assert_eq!(
            err.to_string(),
            "Invalid arguments: The `to_peer`(2) is already has a region follower, region: 4398046511105(1024, 1)"
        );
    }

    #[tokio::test]
    async fn test_submit_procedure_has_migrated() {
        common_telemetry::init_default_ut_logging();
        let env = TestingEnv::new();
        let manager = new_manager(&env);
        let region_id = RegionId::new(1024, 1);
        let task = new_task(region_id, Peer::empty(1), Peer::empty(2));
        // The leader is already on `to_peer` (2).
        create_table(
            &env,
            vec![RegionRoute {
                region: Region::new_test(region_id),
                leader_peer: Some(Peer::empty(2)),
                ..Default::default()
            }],
        )
        .await;

        let err = manager.submit_procedure(task).await.unwrap_err();
        assert_matches!(err, error::Error::RegionMigrated { .. });
    }

    #[tokio::test]
    async fn test_verify_table_route_error() {
        let env = TestingEnv::new();
        let manager = new_manager(&env);
        let task = new_task(RegionId::new(1024, 1), Peer::empty(1), Peer::empty(2));
        let logical_route = TableRouteValue::Logical(LogicalTableRouteValue::new(0, vec![]));

        let err = manager
            .verify_table_route(&logical_route, &task)
            .unwrap_err();

        assert_matches!(err, error::Error::Unexpected { .. });
    }
}