metric_engine/engine/
catchup.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use common_telemetry::debug;
16use snafu::{OptionExt, ResultExt};
17use store_api::region_engine::RegionEngine;
18use store_api::region_request::{
19    AffectedRows, RegionCatchupRequest, RegionRequest, ReplayCheckpoint,
20};
21use store_api::storage::RegionId;
22
23use crate::engine::MetricEngineInner;
24use crate::error::{
25    MitoCatchupOperationSnafu, PhysicalRegionNotFoundSnafu, Result, UnsupportedRegionRequestSnafu,
26};
27use crate::utils;
28
29impl MetricEngineInner {
30    pub async fn catchup_region(
31        &self,
32        region_id: RegionId,
33        req: RegionCatchupRequest,
34    ) -> Result<AffectedRows> {
35        if !self.is_physical_region(region_id) {
36            return UnsupportedRegionRequestSnafu {
37                request: RegionRequest::Catchup(req),
38            }
39            .fail();
40        }
41        let data_region_id = utils::to_data_region_id(region_id);
42        let physical_region_options = *self
43            .state
44            .read()
45            .unwrap()
46            .physical_region_states()
47            .get(&data_region_id)
48            .context(PhysicalRegionNotFoundSnafu {
49                region_id: data_region_id,
50            })?
51            .options();
52
53        let metadata_region_id = utils::to_metadata_region_id(region_id);
54        // TODO(weny): improve the catchup, we can read the wal entries only once.
55        debug!("Catchup metadata region {metadata_region_id}");
56        self.mito
57            .handle_request(
58                metadata_region_id,
59                RegionRequest::Catchup(RegionCatchupRequest {
60                    set_writable: req.set_writable,
61                    entry_id: req.metadata_entry_id,
62                    metadata_entry_id: None,
63                    location_id: req.location_id,
64                    checkpoint: req.checkpoint.map(|c| ReplayCheckpoint {
65                        entry_id: c.metadata_entry_id.unwrap_or_default(),
66                        metadata_entry_id: None,
67                    }),
68                }),
69            )
70            .await
71            .context(MitoCatchupOperationSnafu)?;
72
73        debug!("Catchup data region {data_region_id}");
74        self.mito
75            .handle_request(
76                data_region_id,
77                RegionRequest::Catchup(RegionCatchupRequest {
78                    set_writable: req.set_writable,
79                    entry_id: req.entry_id,
80                    metadata_entry_id: None,
81                    location_id: req.location_id,
82                    checkpoint: req.checkpoint.map(|c| ReplayCheckpoint {
83                        entry_id: c.entry_id,
84                        metadata_entry_id: None,
85                    }),
86                }),
87            )
88            .await
89            .context(MitoCatchupOperationSnafu)
90            .map(|response| response.affected_rows)?;
91
92        self.recover_states(region_id, physical_region_options)
93            .await?;
94        Ok(0)
95    }
96}