metric_engine/engine/
catchup.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16
17use common_error::ext::BoxedError;
18use snafu::{OptionExt, ResultExt};
19use store_api::region_engine::{BatchResponses, RegionEngine};
20use store_api::region_request::{RegionCatchupRequest, ReplayCheckpoint};
21use store_api::storage::RegionId;
22
23use crate::engine::MetricEngineInner;
24use crate::error::{BatchCatchupMitoRegionSnafu, PhysicalRegionNotFoundSnafu, Result};
25use crate::utils;
26
27impl MetricEngineInner {
28    pub async fn handle_batch_catchup_requests(
29        &self,
30        parallelism: usize,
31        requests: Vec<(RegionId, RegionCatchupRequest)>,
32    ) -> Result<BatchResponses> {
33        let mut all_requests = Vec::with_capacity(requests.len() * 2);
34        let mut physical_region_options_list = Vec::with_capacity(requests.len());
35
36        for (region_id, req) in requests {
37            let metadata_region_id = utils::to_metadata_region_id(region_id);
38            let data_region_id = utils::to_data_region_id(region_id);
39
40            let physical_region_options = *self
41                .state
42                .read()
43                .unwrap()
44                .physical_region_states()
45                .get(&data_region_id)
46                .context(PhysicalRegionNotFoundSnafu {
47                    region_id: data_region_id,
48                })?
49                .options();
50            physical_region_options_list.push((data_region_id, physical_region_options));
51            all_requests.push((
52                metadata_region_id,
53                RegionCatchupRequest {
54                    set_writable: req.set_writable,
55                    entry_id: req.metadata_entry_id,
56                    metadata_entry_id: None,
57                    location_id: req.location_id,
58                    checkpoint: req.checkpoint.map(|c| ReplayCheckpoint {
59                        entry_id: c.metadata_entry_id.unwrap_or_default(),
60                        metadata_entry_id: None,
61                    }),
62                },
63            ));
64            all_requests.push((
65                data_region_id,
66                RegionCatchupRequest {
67                    set_writable: req.set_writable,
68                    entry_id: req.entry_id,
69                    metadata_entry_id: None,
70                    location_id: req.location_id,
71                    checkpoint: req.checkpoint.map(|c| ReplayCheckpoint {
72                        entry_id: c.entry_id,
73                        metadata_entry_id: None,
74                    }),
75                },
76            ));
77        }
78
79        let mut results = self
80            .mito
81            .handle_batch_catchup_requests(parallelism, all_requests)
82            .await
83            .context(BatchCatchupMitoRegionSnafu {})?
84            .into_iter()
85            .collect::<HashMap<_, _>>();
86
87        let mut responses = Vec::with_capacity(physical_region_options_list.len());
88        for (physical_region_id, physical_region_options) in physical_region_options_list {
89            let metadata_region_id = utils::to_metadata_region_id(physical_region_id);
90            let data_region_id = utils::to_data_region_id(physical_region_id);
91            let metadata_region_result = results.remove(&metadata_region_id);
92            let data_region_result = results.remove(&data_region_id);
93
94            // Pass the optional `metadata_region_result` and `data_region_result` to
95            // `recover_physical_region_with_results`. This function handles errors for each
96            // catchup physical region request, allowing the process to continue with the
97            // remaining regions even if some requests fail.
98            let response = self
99                .recover_physical_region_with_results(
100                    metadata_region_result,
101                    data_region_result,
102                    physical_region_id,
103                    physical_region_options,
104                    // Note: We intentionally don’t close the region if recovery fails.
105                    // Closing it here might confuse the region server since it links RegionIds to Engines.
106                    // If recovery didn’t succeed, the region should stay open.
107                    false,
108                )
109                .await
110                .map_err(BoxedError::new);
111            responses.push((physical_region_id, response));
112        }
113
114        Ok(responses)
115    }
116}