meta_srv/procedure/region_migration/
manager.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::hash_map::Entry;
16use std::collections::HashMap;
17use std::fmt::Display;
18use std::sync::{Arc, RwLock};
19use std::time::Duration;
20
21use common_meta::key::table_info::TableInfoValue;
22use common_meta::key::table_route::TableRouteValue;
23use common_meta::peer::Peer;
24use common_meta::rpc::router::RegionRoute;
25use common_procedure::{watcher, ProcedureId, ProcedureManagerRef, ProcedureWithId};
26use common_telemetry::{error, info};
27use snafu::{ensure, OptionExt, ResultExt};
28use store_api::storage::RegionId;
29use table::table_name::TableName;
30
31use crate::error::{self, Result};
32use crate::metrics::{METRIC_META_REGION_MIGRATION_DATANODES, METRIC_META_REGION_MIGRATION_FAIL};
33use crate::procedure::region_migration::{
34    DefaultContextFactory, PersistentContext, RegionMigrationProcedure,
35};
36
37pub type RegionMigrationManagerRef = Arc<RegionMigrationManager>;
38
/// Manager of region migration procedure.
///
/// Validates incoming [`RegionMigrationProcedureTask`]s, turns them into
/// [`RegionMigrationProcedure`]s and submits them to the procedure manager,
/// while tracking which regions currently have a migration registered.
pub struct RegionMigrationManager {
    /// Used to register the procedure loader and to submit migration procedures.
    procedure_manager: ProcedureManagerRef,
    /// Cloned into every created procedure; also gives access to the table metadata manager.
    context_factory: DefaultContextFactory,
    /// Registry of the regions that currently have a migration registered.
    tracker: RegionMigrationProcedureTracker,
}
45
/// Tracks the region migration tasks that are currently registered, keyed by region.
///
/// Cloning the tracker is cheap: clones share the same underlying map through the `Arc`.
#[derive(Default, Clone)]
pub struct RegionMigrationProcedureTracker {
    /// The registered tasks; at most one task per [`RegionId`].
    running_procedures: Arc<RwLock<HashMap<RegionId, RegionMigrationProcedureTask>>>,
}
50
51impl RegionMigrationProcedureTracker {
52    /// Returns the [RegionMigrationProcedureGuard] if current region isn't migrating.
53    pub(crate) fn insert_running_procedure(
54        &self,
55        task: &RegionMigrationProcedureTask,
56    ) -> Option<RegionMigrationProcedureGuard> {
57        let mut procedures = self.running_procedures.write().unwrap();
58        match procedures.entry(task.region_id) {
59            Entry::Occupied(_) => None,
60            Entry::Vacant(v) => {
61                v.insert(task.clone());
62                Some(RegionMigrationProcedureGuard {
63                    region_id: task.region_id,
64                    running_procedures: self.running_procedures.clone(),
65                })
66            }
67        }
68    }
69
70    /// Returns true if it contains the specific region(`region_id`).
71    pub(crate) fn contains(&self, region_id: RegionId) -> bool {
72        self.running_procedures
73            .read()
74            .unwrap()
75            .contains_key(&region_id)
76    }
77}
78
/// The guard of running [RegionMigrationProcedureTask].
///
/// Dropping the guard removes `region_id` from the shared `running_procedures` map.
pub(crate) struct RegionMigrationProcedureGuard {
    /// The region whose entry is removed on drop.
    region_id: RegionId,
    /// The map shared with the [RegionMigrationProcedureTracker] that created this guard.
    running_procedures: Arc<RwLock<HashMap<RegionId, RegionMigrationProcedureTask>>>,
}
84
85impl Drop for RegionMigrationProcedureGuard {
86    fn drop(&mut self) {
87        let exists = self
88            .running_procedures
89            .read()
90            .unwrap()
91            .contains_key(&self.region_id);
92        if exists {
93            self.running_procedures
94                .write()
95                .unwrap()
96                .remove(&self.region_id);
97        }
98    }
99}
100
/// Describes a request to migrate one region from one peer to another.
#[derive(Debug, Clone)]
pub struct RegionMigrationProcedureTask {
    /// The region to migrate.
    pub(crate) region_id: RegionId,
    /// The peer the region is migrated away from.
    pub(crate) from_peer: Peer,
    /// The peer the region is migrated to.
    pub(crate) to_peer: Peer,
    /// Timeout handed over to the migration procedure's persistent context.
    // NOTE(review): the exact scope of this timeout is defined by the procedure, not here.
    pub(crate) timeout: Duration,
}
108
109impl RegionMigrationProcedureTask {
110    pub fn new(region_id: RegionId, from_peer: Peer, to_peer: Peer, timeout: Duration) -> Self {
111        Self {
112            region_id,
113            from_peer,
114            to_peer,
115            timeout,
116        }
117    }
118}
119
120impl Display for RegionMigrationProcedureTask {
121    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
122        write!(
123            f,
124            "region: {}, from_peer: {}, to_peer: {}",
125            self.region_id, self.from_peer, self.to_peer
126        )
127    }
128}
129
130impl RegionMigrationManager {
131    /// Returns new [`RegionMigrationManager`]
132    pub(crate) fn new(
133        procedure_manager: ProcedureManagerRef,
134        context_factory: DefaultContextFactory,
135    ) -> Self {
136        Self {
137            procedure_manager,
138            context_factory,
139            tracker: RegionMigrationProcedureTracker::default(),
140        }
141    }
142
    /// Returns the [`RegionMigrationProcedureTracker`] holding the regions that
    /// currently have a migration registered with this manager.
    pub fn tracker(&self) -> &RegionMigrationProcedureTracker {
        &self.tracker
    }
147
148    /// Registers the loader of [RegionMigrationProcedure] to the `ProcedureManager`.
149    pub(crate) fn try_start(&self) -> Result<()> {
150        let context_factory = self.context_factory.clone();
151        let tracker = self.tracker.clone();
152        self.procedure_manager
153            .register_loader(
154                RegionMigrationProcedure::TYPE_NAME,
155                Box::new(move |json| {
156                    let context_factory = context_factory.clone();
157                    let tracker = tracker.clone();
158                    RegionMigrationProcedure::from_json(json, context_factory, tracker)
159                        .map(|p| Box::new(p) as _)
160                }),
161            )
162            .context(error::RegisterProcedureLoaderSnafu {
163                type_name: RegionMigrationProcedure::TYPE_NAME,
164            })
165    }
166
    /// Registers `task` in the tracker.
    ///
    /// Returns the guard of the registration, or `None` if the task's region is
    /// already registered.
    fn insert_running_procedure(
        &self,
        task: &RegionMigrationProcedureTask,
    ) -> Option<RegionMigrationProcedureGuard> {
        self.tracker.insert_running_procedure(task)
    }
173
174    fn verify_task(&self, task: &RegionMigrationProcedureTask) -> Result<()> {
175        if task.to_peer.id == task.from_peer.id {
176            return error::InvalidArgumentsSnafu {
177                err_msg: "The `from_peer_id` can't equal `to_peer_id`",
178            }
179            .fail();
180        }
181
182        Ok(())
183    }
184
185    async fn retrieve_table_route(&self, region_id: RegionId) -> Result<TableRouteValue> {
186        let table_route = self
187            .context_factory
188            .table_metadata_manager
189            .table_route_manager()
190            .table_route_storage()
191            .get(region_id.table_id())
192            .await
193            .context(error::TableMetadataManagerSnafu)?
194            .context(error::TableRouteNotFoundSnafu {
195                table_id: region_id.table_id(),
196            })?;
197
198        Ok(table_route)
199    }
200
201    async fn retrieve_table_info(&self, region_id: RegionId) -> Result<TableInfoValue> {
202        let table_route = self
203            .context_factory
204            .table_metadata_manager
205            .table_info_manager()
206            .get(region_id.table_id())
207            .await
208            .context(error::TableMetadataManagerSnafu)?
209            .context(error::TableInfoNotFoundSnafu {
210                table_id: region_id.table_id(),
211            })?
212            .into_inner();
213
214        Ok(table_route)
215    }
216
217    /// Verifies the type of region migration table route.
218    fn verify_table_route(
219        &self,
220        table_route: &TableRouteValue,
221        task: &RegionMigrationProcedureTask,
222    ) -> Result<()> {
223        if !table_route.is_physical() {
224            return error::UnexpectedSnafu {
225                violated: format!(
226                    "Trying to execute region migration on the logical table, task {task}"
227                ),
228            }
229            .fail();
230        }
231
232        Ok(())
233    }
234
235    /// Returns true if the region has been migrated.
236    fn has_migrated(
237        &self,
238        region_route: &RegionRoute,
239        task: &RegionMigrationProcedureTask,
240    ) -> Result<bool> {
241        if region_route.is_leader_downgrading() {
242            return Ok(false);
243        }
244
245        let leader_peer = region_route
246            .leader_peer
247            .as_ref()
248            .context(error::UnexpectedSnafu {
249                violated: "Region route leader peer is not found",
250            })?;
251
252        Ok(leader_peer.id == task.to_peer.id)
253    }
254
255    /// Throws an error if `leader_peer` is not the `from_peer`.
256    fn verify_region_leader_peer(
257        &self,
258        region_route: &RegionRoute,
259        task: &RegionMigrationProcedureTask,
260    ) -> Result<()> {
261        let leader_peer = region_route
262            .leader_peer
263            .as_ref()
264            .context(error::UnexpectedSnafu {
265                violated: "Region route leader peer is not found",
266            })?;
267
268        ensure!(
269            leader_peer.id == task.from_peer.id,
270            error::LeaderPeerChangedSnafu {
271                msg: format!(
272                    "Region's leader peer({}) is not the `from_peer`({}), region: {}",
273                    leader_peer.id, task.from_peer.id, task.region_id
274                ),
275            }
276        );
277
278        Ok(())
279    }
280
281    /// Throws an error if `to_peer` is already has a region follower.
282    fn verify_region_follower_peers(
283        &self,
284        region_route: &RegionRoute,
285        task: &RegionMigrationProcedureTask,
286    ) -> Result<()> {
287        ensure!(
288            !region_route.follower_peers.contains(&task.to_peer),
289            error::InvalidArgumentsSnafu {
290                err_msg: format!(
291                    "The `to_peer`({}) is already has a region follower, region: {}",
292                    task.to_peer.id, task.region_id
293                ),
294            },
295        );
296
297        Ok(())
298    }
299
    /// Submits a new region migration procedure.
    ///
    /// The task is first registered in the tracker, then validated against the
    /// current table route. If every check passes, a [RegionMigrationProcedure]
    /// is built and handed to a spawned background task, which submits it to the
    /// procedure manager and waits for it to finish.
    ///
    /// # Returns
    ///
    /// - `Ok(Some(procedure_id))` if a procedure was created and handed to the background task.
    /// - `Ok(None)` if the region's leader is already `to_peer`, so nothing is submitted.
    ///
    /// # Errors
    ///
    /// - `MigrationRunning` if the region is already registered in the tracker.
    /// - Any error returned by the verification and metadata retrieval helpers.
    ///
    /// Failures of the background submission or wait are only logged; they are
    /// not reported to the caller.
    pub async fn submit_procedure(
        &self,
        task: RegionMigrationProcedureTask,
    ) -> Result<Option<ProcedureId>> {
        // Register the region first. The guard unregisters the region when dropped,
        // which happens on every early return below until it is moved into the procedure.
        let Some(guard) = self.insert_running_procedure(&task) else {
            return error::MigrationRunningSnafu {
                region_id: task.region_id,
            }
            .fail();
        };

        self.verify_task(&task)?;

        let region_id = task.region_id;

        let table_route = self.retrieve_table_route(region_id).await?;
        self.verify_table_route(&table_route, &task)?;

        // The route was verified to be physical above, so the logical-route error
        // is not expected here; a missing region route is still reported.
        let region_route = table_route
            .region_route(region_id)
            .context(error::UnexpectedLogicalRouteTableSnafu {
                err_msg: format!("{table_route:?} is a non-physical TableRouteValue."),
            })?
            .context(error::RegionRouteNotFoundSnafu { region_id })?;

        // Nothing to do if the leader is already the destination peer.
        if self.has_migrated(&region_route, &task)? {
            info!("Skipping region migration task: {task}");
            return Ok(None);
        }

        self.verify_region_leader_peer(&region_route, &task)?;
        self.verify_region_follower_peers(&region_route, &task)?;
        let table_info = self.retrieve_table_info(region_id).await?;
        let TableName {
            catalog_name,
            schema_name,
            ..
        } = table_info.table_name();
        // Count the migration for both datanodes involved.
        // NOTE(review): the "desc" label looks like a typo for "dest"; it is a
        // runtime-visible metric label, so confirm with dashboard owners before renaming.
        METRIC_META_REGION_MIGRATION_DATANODES
            .with_label_values(&["src", &task.from_peer.id.to_string()])
            .inc();
        METRIC_META_REGION_MIGRATION_DATANODES
            .with_label_values(&["desc", &task.to_peer.id.to_string()])
            .inc();
        // Clone the task because the original is still needed for logging in the spawned task.
        let RegionMigrationProcedureTask {
            region_id,
            from_peer,
            to_peer,
            timeout,
        } = task.clone();
        let procedure = RegionMigrationProcedure::new(
            PersistentContext {
                catalog: catalog_name,
                schema: schema_name,
                region_id,
                from_peer,
                to_peer,
                timeout,
            },
            self.context_factory.clone(),
            // The procedure takes ownership of the guard from here on.
            Some(guard),
        );
        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
        let procedure_id = procedure_with_id.id;
        info!("Starting region migration procedure {procedure_id} for {task}");
        let procedure_manager = self.procedure_manager.clone();
        // Submit and watch the procedure in the background so the caller is not
        // blocked until the migration completes.
        common_runtime::spawn_global(async move {
            let watcher = &mut match procedure_manager.submit(procedure_with_id).await {
                Ok(watcher) => watcher,
                Err(e) => {
                    error!(e; "Failed to submit region migration procedure {procedure_id} for {task}");
                    return;
                }
            };

            if let Err(e) = watcher::wait(watcher).await {
                error!(e; "Failed to wait region migration procedure {procedure_id} for {task}");
                METRIC_META_REGION_MIGRATION_FAIL.inc();
                return;
            }

            info!("Region migration procedure {procedure_id} for {task} is finished successfully!");
        });

        Ok(Some(procedure_id))
    }
388}
389
390#[cfg(test)]
391mod test {
392    use std::assert_matches::assert_matches;
393
394    use common_meta::key::table_route::LogicalTableRouteValue;
395    use common_meta::key::test_utils::new_test_table_info;
396    use common_meta::rpc::router::Region;
397
398    use super::*;
399    use crate::procedure::region_migration::test_util::TestingEnv;
400
401    #[tokio::test]
402    async fn test_insert_running_procedure() {
403        let env = TestingEnv::new();
404        let context_factory = env.context_factory();
405        let manager = RegionMigrationManager::new(env.procedure_manager().clone(), context_factory);
406        let region_id = RegionId::new(1024, 1);
407        let task = RegionMigrationProcedureTask {
408            region_id,
409            from_peer: Peer::empty(2),
410            to_peer: Peer::empty(1),
411            timeout: Duration::from_millis(1000),
412        };
413        // Inserts one
414        manager
415            .tracker
416            .running_procedures
417            .write()
418            .unwrap()
419            .insert(region_id, task.clone());
420
421        let err = manager.submit_procedure(task).await.unwrap_err();
422        assert_matches!(err, error::Error::MigrationRunning { .. });
423    }
424
425    #[tokio::test]
426    async fn test_submit_procedure_invalid_task() {
427        let env = TestingEnv::new();
428        let context_factory = env.context_factory();
429        let manager = RegionMigrationManager::new(env.procedure_manager().clone(), context_factory);
430        let region_id = RegionId::new(1024, 1);
431        let task = RegionMigrationProcedureTask {
432            region_id,
433            from_peer: Peer::empty(1),
434            to_peer: Peer::empty(1),
435            timeout: Duration::from_millis(1000),
436        };
437
438        let err = manager.submit_procedure(task).await.unwrap_err();
439        assert_matches!(err, error::Error::InvalidArguments { .. });
440    }
441
442    #[tokio::test]
443    async fn test_submit_procedure_table_not_found() {
444        let env = TestingEnv::new();
445        let context_factory = env.context_factory();
446        let manager = RegionMigrationManager::new(env.procedure_manager().clone(), context_factory);
447        let region_id = RegionId::new(1024, 1);
448        let task = RegionMigrationProcedureTask {
449            region_id,
450            from_peer: Peer::empty(1),
451            to_peer: Peer::empty(2),
452            timeout: Duration::from_millis(1000),
453        };
454
455        let err = manager.submit_procedure(task).await.unwrap_err();
456        assert_matches!(err, error::Error::TableRouteNotFound { .. });
457    }
458
459    #[tokio::test]
460    async fn test_submit_procedure_region_route_not_found() {
461        let env = TestingEnv::new();
462        let context_factory = env.context_factory();
463        let manager = RegionMigrationManager::new(env.procedure_manager().clone(), context_factory);
464        let region_id = RegionId::new(1024, 1);
465        let task = RegionMigrationProcedureTask {
466            region_id,
467            from_peer: Peer::empty(1),
468            to_peer: Peer::empty(2),
469            timeout: Duration::from_millis(1000),
470        };
471
472        let table_info = new_test_table_info(1024, vec![1]).into();
473        let region_routes = vec![RegionRoute {
474            region: Region::new_test(RegionId::new(1024, 2)),
475            leader_peer: Some(Peer::empty(3)),
476            ..Default::default()
477        }];
478
479        env.create_physical_table_metadata(table_info, region_routes)
480            .await;
481
482        let err = manager.submit_procedure(task).await.unwrap_err();
483        assert_matches!(err, error::Error::RegionRouteNotFound { .. });
484    }
485
486    #[tokio::test]
487    async fn test_submit_procedure_incorrect_from_peer() {
488        let env = TestingEnv::new();
489        let context_factory = env.context_factory();
490        let manager = RegionMigrationManager::new(env.procedure_manager().clone(), context_factory);
491        let region_id = RegionId::new(1024, 1);
492        let task = RegionMigrationProcedureTask {
493            region_id,
494            from_peer: Peer::empty(1),
495            to_peer: Peer::empty(2),
496            timeout: Duration::from_millis(1000),
497        };
498
499        let table_info = new_test_table_info(1024, vec![1]).into();
500        let region_routes = vec![RegionRoute {
501            region: Region::new_test(RegionId::new(1024, 1)),
502            leader_peer: Some(Peer::empty(3)),
503            ..Default::default()
504        }];
505
506        env.create_physical_table_metadata(table_info, region_routes)
507            .await;
508
509        let err = manager.submit_procedure(task).await.unwrap_err();
510        assert_matches!(err, error::Error::LeaderPeerChanged { .. });
511        assert_eq!(err.to_string(), "Region's leader peer changed: Region's leader peer(3) is not the `from_peer`(1), region: 4398046511105(1024, 1)");
512    }
513
514    #[tokio::test]
515    async fn test_submit_procedure_region_follower_on_to_peer() {
516        let env = TestingEnv::new();
517        let context_factory = env.context_factory();
518        let manager = RegionMigrationManager::new(env.procedure_manager().clone(), context_factory);
519        let region_id = RegionId::new(1024, 1);
520        let task = RegionMigrationProcedureTask {
521            region_id,
522            from_peer: Peer::empty(3),
523            to_peer: Peer::empty(2),
524            timeout: Duration::from_millis(1000),
525        };
526
527        let table_info = new_test_table_info(1024, vec![1]).into();
528        let region_routes = vec![RegionRoute {
529            region: Region::new_test(region_id),
530            leader_peer: Some(Peer::empty(3)),
531            follower_peers: vec![Peer::empty(2)],
532            ..Default::default()
533        }];
534
535        env.create_physical_table_metadata(table_info, region_routes)
536            .await;
537
538        let err = manager.submit_procedure(task).await.unwrap_err();
539        assert_matches!(err, error::Error::InvalidArguments { .. });
540        assert_eq!(
541            err.to_string(),
542            "Invalid arguments: The `to_peer`(2) is already has a region follower, region: 4398046511105(1024, 1)"
543        );
544    }
545
546    #[tokio::test]
547    async fn test_submit_procedure_has_migrated() {
548        common_telemetry::init_default_ut_logging();
549        let env = TestingEnv::new();
550        let context_factory = env.context_factory();
551        let manager = RegionMigrationManager::new(env.procedure_manager().clone(), context_factory);
552        let region_id = RegionId::new(1024, 1);
553        let task = RegionMigrationProcedureTask {
554            region_id,
555            from_peer: Peer::empty(1),
556            to_peer: Peer::empty(2),
557            timeout: Duration::from_millis(1000),
558        };
559
560        let table_info = new_test_table_info(1024, vec![1]).into();
561        let region_routes = vec![RegionRoute {
562            region: Region::new_test(RegionId::new(1024, 1)),
563            leader_peer: Some(Peer::empty(2)),
564            ..Default::default()
565        }];
566
567        env.create_physical_table_metadata(table_info, region_routes)
568            .await;
569
570        manager.submit_procedure(task).await.unwrap();
571    }
572
573    #[tokio::test]
574    async fn test_verify_table_route_error() {
575        let env = TestingEnv::new();
576        let context_factory = env.context_factory();
577        let manager = RegionMigrationManager::new(env.procedure_manager().clone(), context_factory);
578        let region_id = RegionId::new(1024, 1);
579        let task = RegionMigrationProcedureTask {
580            region_id,
581            from_peer: Peer::empty(1),
582            to_peer: Peer::empty(2),
583            timeout: Duration::from_millis(1000),
584        };
585
586        let err = manager
587            .verify_table_route(
588                &TableRouteValue::Logical(LogicalTableRouteValue::new(0, vec![])),
589                &task,
590            )
591            .unwrap_err();
592
593        assert_matches!(err, error::Error::Unexpected { .. });
594    }
595}