metric_engine/engine/
open.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Open a metric region.
16
17use api::region::RegionResponse;
18use api::v1::SemanticType;
19use common_error::ext::BoxedError;
20use common_telemetry::{error, info, warn};
21use datafusion::common::HashMap;
22use mito2::engine::MITO_ENGINE_NAME;
23use snafu::{OptionExt, ResultExt};
24use store_api::region_engine::{BatchResponses, RegionEngine};
25use store_api::region_request::{AffectedRows, PathType, RegionOpenRequest, ReplayCheckpoint};
26use store_api::storage::RegionId;
27
28use crate::engine::create::region_options_for_metadata_region;
29use crate::engine::options::{set_data_region_options, PhysicalRegionOptions};
30use crate::engine::MetricEngineInner;
31use crate::error::{
32    BatchOpenMitoRegionSnafu, NoOpenRegionResultSnafu, OpenMitoRegionSnafu,
33    PhysicalRegionNotFoundSnafu, Result,
34};
35use crate::metrics::{LOGICAL_REGION_COUNT, PHYSICAL_REGION_COUNT};
36use crate::utils;
37
38impl MetricEngineInner {
39    pub async fn handle_batch_open_requests(
40        &self,
41        parallelism: usize,
42        requests: Vec<(RegionId, RegionOpenRequest)>,
43    ) -> Result<BatchResponses> {
44        // We need to open metadata region and data region for each request.
45        let mut all_requests = Vec::with_capacity(requests.len() * 2);
46        let mut physical_region_ids = HashMap::with_capacity(requests.len());
47
48        for (region_id, request) in requests {
49            if !request.is_physical_table() {
50                continue;
51            }
52            let physical_region_options = PhysicalRegionOptions::try_from(&request.options)?;
53            let metadata_region_id = utils::to_metadata_region_id(region_id);
54            let data_region_id = utils::to_data_region_id(region_id);
55            let (open_metadata_region_request, open_data_region_request) =
56                self.transform_open_physical_region_request(request);
57            all_requests.push((metadata_region_id, open_metadata_region_request));
58            all_requests.push((data_region_id, open_data_region_request));
59            physical_region_ids.insert(region_id, physical_region_options);
60        }
61
62        let mut results = self
63            .mito
64            .handle_batch_open_requests(parallelism, all_requests)
65            .await
66            .context(BatchOpenMitoRegionSnafu {})?
67            .into_iter()
68            .collect::<HashMap<_, _>>();
69
70        let mut responses = Vec::with_capacity(physical_region_ids.len());
71        for (physical_region_id, physical_region_options) in physical_region_ids {
72            let metadata_region_id = utils::to_metadata_region_id(physical_region_id);
73            let data_region_id = utils::to_data_region_id(physical_region_id);
74            let metadata_region_result = results.remove(&metadata_region_id);
75            let data_region_result = results.remove(&data_region_id);
76            // Pass the optional `metadata_region_result` and `data_region_result` to
77            // `open_physical_region_with_results`. This function handles errors for each
78            // open physical region request, allowing the process to continue with the
79            // remaining regions even if some requests fail.
80            let response = self
81                .open_physical_region_with_results(
82                    metadata_region_result,
83                    data_region_result,
84                    physical_region_id,
85                    physical_region_options,
86                )
87                .await
88                .map_err(BoxedError::new);
89            responses.push((physical_region_id, response));
90        }
91
92        Ok(responses)
93    }
94
95    // If the metadata region is opened with a stale manifest,
96    // the metric engine may fail to recover logical tables from the metadata region,
97    // as the manifest could reference files that have already been deleted
98    // due to compaction operations performed by the region leader.
99    async fn close_physical_region_on_recovery_failure(&self, physical_region_id: RegionId) {
100        info!(
101            "Closing metadata region {} and data region {} on metadata recovery failure",
102            utils::to_metadata_region_id(physical_region_id),
103            utils::to_data_region_id(physical_region_id)
104        );
105        if let Err(err) = self.close_physical_region(physical_region_id).await {
106            error!(err; "Failed to close physical region {}", physical_region_id);
107        }
108    }
109
110    async fn open_physical_region_with_results(
111        &self,
112        metadata_region_result: Option<std::result::Result<RegionResponse, BoxedError>>,
113        data_region_result: Option<std::result::Result<RegionResponse, BoxedError>>,
114        physical_region_id: RegionId,
115        physical_region_options: PhysicalRegionOptions,
116    ) -> Result<RegionResponse> {
117        let metadata_region_id = utils::to_metadata_region_id(physical_region_id);
118        let data_region_id = utils::to_data_region_id(physical_region_id);
119        let _ = metadata_region_result
120            .context(NoOpenRegionResultSnafu {
121                region_id: metadata_region_id,
122            })?
123            .context(OpenMitoRegionSnafu {
124                region_type: "metadata",
125            })?;
126
127        let data_region_response = data_region_result
128            .context(NoOpenRegionResultSnafu {
129                region_id: data_region_id,
130            })?
131            .context(OpenMitoRegionSnafu {
132                region_type: "data",
133            })?;
134
135        if let Err(err) = self
136            .recover_states(physical_region_id, physical_region_options)
137            .await
138        {
139            self.close_physical_region_on_recovery_failure(physical_region_id)
140                .await;
141            return Err(err);
142        }
143        Ok(data_region_response)
144    }
145
146    /// Open a metric region.
147    ///
148    /// Only open requests to a physical region matter. Those to logical regions are
149    /// actually an empty operation -- it only check if the request is valid. Since
150    /// logical regions are multiplexed over physical regions, they are always "open".
151    ///
152    /// If trying to open a logical region whose physical region is not open, metric
153    /// engine will throw a [RegionNotFound](common_error::status_code::StatusCode::RegionNotFound)
154    /// error.
155    pub async fn open_region(
156        &self,
157        region_id: RegionId,
158        request: RegionOpenRequest,
159    ) -> Result<AffectedRows> {
160        if request.is_physical_table() {
161            if self
162                .state
163                .read()
164                .unwrap()
165                .physical_region_states()
166                .get(&region_id)
167                .is_some()
168            {
169                warn!(
170                    "The physical region {} is already open, ignore the open request",
171                    region_id
172                );
173                return Ok(0);
174            }
175            // open physical region and recover states
176            let physical_region_options = PhysicalRegionOptions::try_from(&request.options)?;
177            self.open_physical_region(region_id, request).await?;
178            if let Err(err) = self
179                .recover_states(region_id, physical_region_options)
180                .await
181            {
182                self.close_physical_region_on_recovery_failure(region_id)
183                    .await;
184                return Err(err);
185            }
186
187            Ok(0)
188        } else {
189            // Don't check if the logical region exist. Because a logical region cannot be opened
190            // individually, it is always "open" if its physical region is open. But the engine
191            // can't tell if the logical region is not exist or the physical region is not opened
192            // yet. Thus simply return `Ok` here to ignore all those errors.
193            Ok(0)
194        }
195    }
196
197    /// Transform the open request to open metadata region and data region.
198    ///
199    /// Returns:
200    /// - The open request for metadata region.
201    /// - The open request for data region.
202    fn transform_open_physical_region_request(
203        &self,
204        request: RegionOpenRequest,
205    ) -> (RegionOpenRequest, RegionOpenRequest) {
206        let metadata_region_options = region_options_for_metadata_region(&request.options);
207        let checkpoint = request.checkpoint;
208
209        let open_metadata_region_request = RegionOpenRequest {
210            table_dir: request.table_dir.clone(),
211            path_type: PathType::Metadata,
212            options: metadata_region_options,
213            engine: MITO_ENGINE_NAME.to_string(),
214            skip_wal_replay: request.skip_wal_replay,
215            checkpoint: checkpoint.map(|checkpoint| ReplayCheckpoint {
216                entry_id: checkpoint.metadata_entry_id.unwrap_or_default(),
217                metadata_entry_id: None,
218            }),
219        };
220
221        let mut data_region_options = request.options;
222        set_data_region_options(
223            &mut data_region_options,
224            self.config.experimental_sparse_primary_key_encoding,
225        );
226        let open_data_region_request = RegionOpenRequest {
227            table_dir: request.table_dir.clone(),
228            path_type: PathType::Data,
229            options: data_region_options,
230            engine: MITO_ENGINE_NAME.to_string(),
231            skip_wal_replay: request.skip_wal_replay,
232            checkpoint: checkpoint.map(|checkpoint| ReplayCheckpoint {
233                entry_id: checkpoint.entry_id,
234                metadata_entry_id: None,
235            }),
236        };
237
238        (open_metadata_region_request, open_data_region_request)
239    }
240
241    /// Invokes mito engine to open physical regions (data and metadata).
242    async fn open_physical_region(
243        &self,
244        region_id: RegionId,
245        request: RegionOpenRequest,
246    ) -> Result<AffectedRows> {
247        let metadata_region_id = utils::to_metadata_region_id(region_id);
248        let data_region_id = utils::to_data_region_id(region_id);
249        let (open_metadata_region_request, open_data_region_request) =
250            self.transform_open_physical_region_request(request);
251        let _ = self
252            .mito
253            .handle_batch_open_requests(
254                2,
255                vec![
256                    (metadata_region_id, open_metadata_region_request),
257                    (data_region_id, open_data_region_request),
258                ],
259            )
260            .await
261            .context(BatchOpenMitoRegionSnafu {})?;
262
263        info!("Opened physical metric region {region_id}");
264        PHYSICAL_REGION_COUNT.inc();
265
266        Ok(0)
267    }
268
269    /// Recovers [MetricEngineState](crate::engine::state::MetricEngineState) from
270    /// physical region (idnefied by the given region id).
271    ///
272    /// Includes:
273    /// - Record physical region's column names
274    /// - Record the mapping between logical region id and physical region id
275    ///
276    /// Returns new opened logical region ids.
277    pub(crate) async fn recover_states(
278        &self,
279        physical_region_id: RegionId,
280        physical_region_options: PhysicalRegionOptions,
281    ) -> Result<Vec<RegionId>> {
282        // load logical regions and physical column names
283        let logical_regions = self
284            .metadata_region
285            .logical_regions(physical_region_id)
286            .await?;
287        let physical_columns = self
288            .data_region
289            .physical_columns(physical_region_id)
290            .await?;
291        let primary_key_encoding = self
292            .mito
293            .get_primary_key_encoding(physical_region_id)
294            .context(PhysicalRegionNotFoundSnafu {
295                region_id: physical_region_id,
296            })?;
297
298        {
299            let mut state = self.state.write().unwrap();
300            // recover physical column names
301            // Safety: The physical columns are loaded from the data region, which always
302            // has a time index.
303            let time_index_unit = physical_columns
304                .iter()
305                .find_map(|col| {
306                    if col.semantic_type == SemanticType::Timestamp {
307                        col.column_schema
308                            .data_type
309                            .as_timestamp()
310                            .map(|data_type| data_type.unit())
311                    } else {
312                        None
313                    }
314                })
315                .unwrap();
316            let physical_columns = physical_columns
317                .into_iter()
318                .map(|col| (col.column_schema.name, col.column_id))
319                .collect();
320            state.add_physical_region(
321                physical_region_id,
322                physical_columns,
323                primary_key_encoding,
324                physical_region_options,
325                time_index_unit,
326            );
327            // recover logical regions
328            for logical_region_id in &logical_regions {
329                state.add_logical_region(physical_region_id, *logical_region_id);
330            }
331        }
332
333        let mut opened_logical_region_ids = Vec::new();
334        // The `recover_states` may be called multiple times, we only count the logical regions
335        // that are opened for the first time.
336        for logical_region_id in logical_regions {
337            if self
338                .metadata_region
339                .open_logical_region(logical_region_id)
340                .await
341            {
342                opened_logical_region_ids.push(logical_region_id);
343            }
344        }
345
346        LOGICAL_REGION_COUNT.add(opened_logical_region_ids.len() as i64);
347
348        Ok(opened_logical_region_ids)
349    }
350}
351
352// Unit tests in engine.rs