metric_engine/engine/
open.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Open a metric region.
16
17use api::region::RegionResponse;
18use api::v1::SemanticType;
19use common_error::ext::BoxedError;
20use common_telemetry::{error, info, warn};
21use datafusion::common::HashMap;
22use mito2::engine::MITO_ENGINE_NAME;
23use object_store::util::join_dir;
24use snafu::{OptionExt, ResultExt};
25use store_api::metric_engine_consts::{DATA_REGION_SUBDIR, METADATA_REGION_SUBDIR};
26use store_api::region_engine::{BatchResponses, RegionEngine};
27use store_api::region_request::{AffectedRows, RegionOpenRequest, RegionRequest};
28use store_api::storage::RegionId;
29
30use crate::engine::create::region_options_for_metadata_region;
31use crate::engine::options::{set_data_region_options, PhysicalRegionOptions};
32use crate::engine::MetricEngineInner;
33use crate::error::{
34    BatchOpenMitoRegionSnafu, NoOpenRegionResultSnafu, OpenMitoRegionSnafu,
35    PhysicalRegionNotFoundSnafu, Result,
36};
37use crate::metrics::{LOGICAL_REGION_COUNT, PHYSICAL_REGION_COUNT};
38use crate::utils;
39
40impl MetricEngineInner {
41    pub async fn handle_batch_open_requests(
42        &self,
43        parallelism: usize,
44        requests: Vec<(RegionId, RegionOpenRequest)>,
45    ) -> Result<BatchResponses> {
46        // We need to open metadata region and data region for each request.
47        let mut all_requests = Vec::with_capacity(requests.len() * 2);
48        let mut physical_region_ids = HashMap::with_capacity(requests.len());
49
50        for (region_id, request) in requests {
51            if !request.is_physical_table() {
52                continue;
53            }
54            let physical_region_options = PhysicalRegionOptions::try_from(&request.options)?;
55            let metadata_region_id = utils::to_metadata_region_id(region_id);
56            let data_region_id = utils::to_data_region_id(region_id);
57            let (open_metadata_region_request, open_data_region_request) =
58                self.transform_open_physical_region_request(request);
59            all_requests.push((metadata_region_id, open_metadata_region_request));
60            all_requests.push((data_region_id, open_data_region_request));
61            physical_region_ids.insert(region_id, physical_region_options);
62        }
63
64        let mut results = self
65            .mito
66            .handle_batch_open_requests(parallelism, all_requests)
67            .await
68            .context(BatchOpenMitoRegionSnafu {})?
69            .into_iter()
70            .collect::<HashMap<_, _>>();
71
72        let mut responses = Vec::with_capacity(physical_region_ids.len());
73        for (physical_region_id, physical_region_options) in physical_region_ids {
74            let metadata_region_id = utils::to_metadata_region_id(physical_region_id);
75            let data_region_id = utils::to_data_region_id(physical_region_id);
76            let metadata_region_result = results.remove(&metadata_region_id);
77            let data_region_result = results.remove(&data_region_id);
78            // Pass the optional `metadata_region_result` and `data_region_result` to
79            // `open_physical_region_with_results`. This function handles errors for each
80            // open physical region request, allowing the process to continue with the
81            // remaining regions even if some requests fail.
82            let response = self
83                .open_physical_region_with_results(
84                    metadata_region_result,
85                    data_region_result,
86                    physical_region_id,
87                    physical_region_options,
88                )
89                .await
90                .map_err(BoxedError::new);
91            responses.push((physical_region_id, response));
92        }
93
94        Ok(responses)
95    }
96
97    // If the metadata region is opened with a stale manifest,
98    // the metric engine may fail to recover logical tables from the metadata region,
99    // as the manifest could reference files that have already been deleted
100    // due to compaction operations performed by the region leader.
101    async fn close_physical_region_on_recovery_failure(&self, physical_region_id: RegionId) {
102        info!(
103            "Closing metadata region {} and data region {} on metadata recovery failure",
104            utils::to_metadata_region_id(physical_region_id),
105            utils::to_data_region_id(physical_region_id)
106        );
107        if let Err(err) = self.close_physical_region(physical_region_id).await {
108            error!(err; "Failed to close physical region {}", physical_region_id);
109        }
110    }
111
112    async fn open_physical_region_with_results(
113        &self,
114        metadata_region_result: Option<std::result::Result<RegionResponse, BoxedError>>,
115        data_region_result: Option<std::result::Result<RegionResponse, BoxedError>>,
116        physical_region_id: RegionId,
117        physical_region_options: PhysicalRegionOptions,
118    ) -> Result<RegionResponse> {
119        let metadata_region_id = utils::to_metadata_region_id(physical_region_id);
120        let data_region_id = utils::to_data_region_id(physical_region_id);
121        let _ = metadata_region_result
122            .context(NoOpenRegionResultSnafu {
123                region_id: metadata_region_id,
124            })?
125            .context(OpenMitoRegionSnafu {
126                region_type: "metadata",
127            })?;
128
129        let data_region_response = data_region_result
130            .context(NoOpenRegionResultSnafu {
131                region_id: data_region_id,
132            })?
133            .context(OpenMitoRegionSnafu {
134                region_type: "data",
135            })?;
136
137        if let Err(err) = self
138            .recover_states(physical_region_id, physical_region_options)
139            .await
140        {
141            self.close_physical_region_on_recovery_failure(physical_region_id)
142                .await;
143            return Err(err);
144        }
145        Ok(data_region_response)
146    }
147
148    /// Open a metric region.
149    ///
150    /// Only open requests to a physical region matter. Those to logical regions are
151    /// actually an empty operation -- it only check if the request is valid. Since
152    /// logical regions are multiplexed over physical regions, they are always "open".
153    ///
154    /// If trying to open a logical region whose physical region is not open, metric
155    /// engine will throw a [RegionNotFound](common_error::status_code::StatusCode::RegionNotFound)
156    /// error.
157    pub async fn open_region(
158        &self,
159        region_id: RegionId,
160        request: RegionOpenRequest,
161    ) -> Result<AffectedRows> {
162        if request.is_physical_table() {
163            if self
164                .state
165                .read()
166                .unwrap()
167                .physical_region_states()
168                .get(&region_id)
169                .is_some()
170            {
171                warn!(
172                    "The physical region {} is already open, ignore the open request",
173                    region_id
174                );
175                return Ok(0);
176            }
177            // open physical region and recover states
178            let physical_region_options = PhysicalRegionOptions::try_from(&request.options)?;
179            self.open_physical_region(region_id, request).await?;
180            if let Err(err) = self
181                .recover_states(region_id, physical_region_options)
182                .await
183            {
184                self.close_physical_region_on_recovery_failure(region_id)
185                    .await;
186                return Err(err);
187            }
188
189            Ok(0)
190        } else {
191            // Don't check if the logical region exist. Because a logical region cannot be opened
192            // individually, it is always "open" if its physical region is open. But the engine
193            // can't tell if the logical region is not exist or the physical region is not opened
194            // yet. Thus simply return `Ok` here to ignore all those errors.
195            Ok(0)
196        }
197    }
198
199    /// Transform the open request to open metadata region and data region.
200    ///
201    /// Returns:
202    /// - The open request for metadata region.
203    /// - The open request for data region.
204    fn transform_open_physical_region_request(
205        &self,
206        request: RegionOpenRequest,
207    ) -> (RegionOpenRequest, RegionOpenRequest) {
208        let metadata_region_dir = join_dir(&request.region_dir, METADATA_REGION_SUBDIR);
209        let data_region_dir = join_dir(&request.region_dir, DATA_REGION_SUBDIR);
210
211        let metadata_region_options = region_options_for_metadata_region(&request.options);
212        let open_metadata_region_request = RegionOpenRequest {
213            region_dir: metadata_region_dir,
214            options: metadata_region_options,
215            engine: MITO_ENGINE_NAME.to_string(),
216            skip_wal_replay: request.skip_wal_replay,
217        };
218
219        let mut data_region_options = request.options;
220        set_data_region_options(
221            &mut data_region_options,
222            self.config.experimental_sparse_primary_key_encoding,
223        );
224        let open_data_region_request = RegionOpenRequest {
225            region_dir: data_region_dir,
226            options: data_region_options,
227            engine: MITO_ENGINE_NAME.to_string(),
228            skip_wal_replay: request.skip_wal_replay,
229        };
230
231        (open_metadata_region_request, open_data_region_request)
232    }
233
234    /// Invokes mito engine to open physical regions (data and metadata).
235    async fn open_physical_region(
236        &self,
237        region_id: RegionId,
238        request: RegionOpenRequest,
239    ) -> Result<AffectedRows> {
240        let metadata_region_id = utils::to_metadata_region_id(region_id);
241        let data_region_id = utils::to_data_region_id(region_id);
242        let (open_metadata_region_request, open_data_region_request) =
243            self.transform_open_physical_region_request(request);
244
245        self.mito
246            .handle_request(
247                metadata_region_id,
248                RegionRequest::Open(open_metadata_region_request),
249            )
250            .await
251            .with_context(|_| OpenMitoRegionSnafu {
252                region_type: "metadata",
253            })?;
254        self.mito
255            .handle_request(
256                data_region_id,
257                RegionRequest::Open(open_data_region_request),
258            )
259            .await
260            .with_context(|_| OpenMitoRegionSnafu {
261                region_type: "data",
262            })?;
263
264        info!("Opened physical metric region {region_id}");
265        PHYSICAL_REGION_COUNT.inc();
266
267        Ok(0)
268    }
269
270    /// Recovers [MetricEngineState](crate::engine::state::MetricEngineState) from
271    /// physical region (idnefied by the given region id).
272    ///
273    /// Includes:
274    /// - Record physical region's column names
275    /// - Record the mapping between logical region id and physical region id
276    ///
277    /// Returns new opened logical region ids.
278    pub(crate) async fn recover_states(
279        &self,
280        physical_region_id: RegionId,
281        physical_region_options: PhysicalRegionOptions,
282    ) -> Result<Vec<RegionId>> {
283        // load logical regions and physical column names
284        let logical_regions = self
285            .metadata_region
286            .logical_regions(physical_region_id)
287            .await?;
288        let physical_columns = self
289            .data_region
290            .physical_columns(physical_region_id)
291            .await?;
292        let primary_key_encoding = self
293            .mito
294            .get_primary_key_encoding(physical_region_id)
295            .context(PhysicalRegionNotFoundSnafu {
296                region_id: physical_region_id,
297            })?;
298
299        {
300            let mut state = self.state.write().unwrap();
301            // recover physical column names
302            // Safety: The physical columns are loaded from the data region, which always
303            // has a time index.
304            let time_index_unit = physical_columns
305                .iter()
306                .find_map(|col| {
307                    if col.semantic_type == SemanticType::Timestamp {
308                        col.column_schema
309                            .data_type
310                            .as_timestamp()
311                            .map(|data_type| data_type.unit())
312                    } else {
313                        None
314                    }
315                })
316                .unwrap();
317            let physical_columns = physical_columns
318                .into_iter()
319                .map(|col| (col.column_schema.name, col.column_id))
320                .collect();
321            state.add_physical_region(
322                physical_region_id,
323                physical_columns,
324                primary_key_encoding,
325                physical_region_options,
326                time_index_unit,
327            );
328            // recover logical regions
329            for logical_region_id in &logical_regions {
330                state.add_logical_region(physical_region_id, *logical_region_id);
331            }
332        }
333
334        let mut opened_logical_region_ids = Vec::new();
335        // The `recover_states` may be called multiple times, we only count the logical regions
336        // that are opened for the first time.
337        for logical_region_id in logical_regions {
338            if self
339                .metadata_region
340                .open_logical_region(logical_region_id)
341                .await
342            {
343                opened_logical_region_ids.push(logical_region_id);
344            }
345        }
346
347        LOGICAL_REGION_COUNT.add(opened_logical_region_ids.len() as i64);
348
349        Ok(opened_logical_region_ids)
350    }
351}
352
353// Unit tests in engine.rs