metric_engine/engine/
catchup.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use common_telemetry::debug;
16use snafu::{OptionExt, ResultExt};
17use store_api::region_engine::RegionEngine;
18use store_api::region_request::{AffectedRows, RegionCatchupRequest, RegionRequest};
19use store_api::storage::RegionId;
20
21use crate::engine::MetricEngineInner;
22use crate::error::{
23    MitoCatchupOperationSnafu, PhysicalRegionNotFoundSnafu, Result, UnsupportedRegionRequestSnafu,
24};
25use crate::utils;
26
27impl MetricEngineInner {
28    pub async fn catchup_region(
29        &self,
30        region_id: RegionId,
31        req: RegionCatchupRequest,
32    ) -> Result<AffectedRows> {
33        if !self.is_physical_region(region_id) {
34            return UnsupportedRegionRequestSnafu {
35                request: RegionRequest::Catchup(req),
36            }
37            .fail();
38        }
39        let data_region_id = utils::to_data_region_id(region_id);
40        let physical_region_options = *self
41            .state
42            .read()
43            .unwrap()
44            .physical_region_states()
45            .get(&data_region_id)
46            .context(PhysicalRegionNotFoundSnafu {
47                region_id: data_region_id,
48            })?
49            .options();
50
51        let metadata_region_id = utils::to_metadata_region_id(region_id);
52        // TODO(weny): improve the catchup, we can read the wal entries only once.
53        debug!("Catchup metadata region {metadata_region_id}");
54        self.mito
55            .handle_request(
56                metadata_region_id,
57                RegionRequest::Catchup(RegionCatchupRequest {
58                    set_writable: req.set_writable,
59                    entry_id: req.metadata_entry_id,
60                    metadata_entry_id: None,
61                    location_id: req.location_id,
62                }),
63            )
64            .await
65            .context(MitoCatchupOperationSnafu)?;
66
67        debug!("Catchup data region {data_region_id}");
68        self.mito
69            .handle_request(
70                data_region_id,
71                RegionRequest::Catchup(RegionCatchupRequest {
72                    set_writable: req.set_writable,
73                    entry_id: req.entry_id,
74                    metadata_entry_id: None,
75                    location_id: req.location_id,
76                }),
77            )
78            .await
79            .context(MitoCatchupOperationSnafu)
80            .map(|response| response.affected_rows)?;
81
82        let primary_key_encoding = self.mito.get_primary_key_encoding(data_region_id).context(
83            PhysicalRegionNotFoundSnafu {
84                region_id: data_region_id,
85            },
86        )?;
87        self.recover_states(region_id, primary_key_encoding, physical_region_options)
88            .await?;
89        Ok(0)
90    }
91}