metric_engine/engine/
open.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Open a metric region.
16
17use api::region::RegionResponse;
18use api::v1::SemanticType;
19use common_error::ext::BoxedError;
20use common_telemetry::{error, info, warn};
21use datafusion::common::HashMap;
22use mito2::engine::MITO_ENGINE_NAME;
23use snafu::{OptionExt, ResultExt};
24use store_api::region_engine::{BatchResponses, RegionEngine};
25use store_api::region_request::{AffectedRows, PathType, RegionOpenRequest, ReplayCheckpoint};
26use store_api::storage::RegionId;
27
28use crate::engine::MetricEngineInner;
29use crate::engine::create::region_options_for_metadata_region;
30use crate::engine::options::{PhysicalRegionOptions, set_data_region_options};
31use crate::error::{
32    BatchOpenMitoRegionSnafu, NoOpenRegionResultSnafu, OpenMitoRegionSnafu,
33    PhysicalRegionNotFoundSnafu, Result,
34};
35use crate::metrics::{LOGICAL_REGION_COUNT, PHYSICAL_REGION_COUNT};
36use crate::utils;
37
38impl MetricEngineInner {
39    pub async fn handle_batch_open_requests(
40        &self,
41        parallelism: usize,
42        requests: Vec<(RegionId, RegionOpenRequest)>,
43    ) -> Result<BatchResponses> {
44        // We need to open metadata region and data region for each request.
45        let mut all_requests = Vec::with_capacity(requests.len() * 2);
46        let mut physical_region_ids = HashMap::with_capacity(requests.len());
47
48        for (region_id, request) in requests {
49            if !request.is_physical_table() {
50                warn!("Skipping non-physical table open request: {region_id}");
51                continue;
52            }
53            let physical_region_options = PhysicalRegionOptions::try_from(&request.options)?;
54            let metadata_region_id = utils::to_metadata_region_id(region_id);
55            let data_region_id = utils::to_data_region_id(region_id);
56            let (open_metadata_region_request, open_data_region_request) =
57                self.transform_open_physical_region_request(request);
58            all_requests.push((metadata_region_id, open_metadata_region_request));
59            all_requests.push((data_region_id, open_data_region_request));
60            physical_region_ids.insert(region_id, physical_region_options);
61        }
62
63        let mut results = self
64            .mito
65            .handle_batch_open_requests(parallelism, all_requests)
66            .await
67            .context(BatchOpenMitoRegionSnafu {})?
68            .into_iter()
69            .collect::<HashMap<_, _>>();
70
71        let mut responses = Vec::with_capacity(physical_region_ids.len());
72        for (physical_region_id, physical_region_options) in physical_region_ids {
73            let metadata_region_id = utils::to_metadata_region_id(physical_region_id);
74            let data_region_id = utils::to_data_region_id(physical_region_id);
75            let metadata_region_result = results.remove(&metadata_region_id);
76            let data_region_result: Option<std::result::Result<RegionResponse, BoxedError>> =
77                results.remove(&data_region_id);
78            // Pass the optional `metadata_region_result` and `data_region_result` to
79            // `recover_physical_region_with_results`. This function handles errors for each
80            // open physical region request, allowing the process to continue with the
81            // remaining regions even if some requests fail.
82            let response = self
83                .recover_physical_region_with_results(
84                    metadata_region_result,
85                    data_region_result,
86                    physical_region_id,
87                    physical_region_options,
88                    true,
89                )
90                .await
91                .map_err(BoxedError::new);
92            responses.push((physical_region_id, response));
93        }
94
95        Ok(responses)
96    }
97
98    // If the metadata region is opened with a stale manifest,
99    // the metric engine may fail to recover logical tables from the metadata region,
100    // as the manifest could reference files that have already been deleted
101    // due to compaction operations performed by the region leader.
102    async fn close_physical_region_on_recovery_failure(&self, physical_region_id: RegionId) {
103        info!(
104            "Closing metadata region {} and data region {} on metadata recovery failure",
105            utils::to_metadata_region_id(physical_region_id),
106            utils::to_data_region_id(physical_region_id)
107        );
108        if let Err(err) = self.close_physical_region(physical_region_id).await {
109            error!(err; "Failed to close physical region {}", physical_region_id);
110        }
111    }
112
113    pub(crate) async fn recover_physical_region_with_results(
114        &self,
115        metadata_region_result: Option<std::result::Result<RegionResponse, BoxedError>>,
116        data_region_result: Option<std::result::Result<RegionResponse, BoxedError>>,
117        physical_region_id: RegionId,
118        physical_region_options: PhysicalRegionOptions,
119        close_region_on_failure: bool,
120    ) -> Result<RegionResponse> {
121        let metadata_region_id = utils::to_metadata_region_id(physical_region_id);
122        let data_region_id = utils::to_data_region_id(physical_region_id);
123        let _ = metadata_region_result
124            .context(NoOpenRegionResultSnafu {
125                region_id: metadata_region_id,
126            })?
127            .context(OpenMitoRegionSnafu {
128                region_type: "metadata",
129            })?;
130
131        let data_region_response = data_region_result
132            .context(NoOpenRegionResultSnafu {
133                region_id: data_region_id,
134            })?
135            .context(OpenMitoRegionSnafu {
136                region_type: "data",
137            })?;
138
139        if let Err(err) = self
140            .recover_states(physical_region_id, physical_region_options)
141            .await
142        {
143            if close_region_on_failure {
144                self.close_physical_region_on_recovery_failure(physical_region_id)
145                    .await;
146            }
147            return Err(err);
148        }
149        Ok(data_region_response)
150    }
151
152    /// Open a metric region.
153    ///
154    /// Only open requests to a physical region matter. Those to logical regions are
155    /// actually an empty operation -- it only check if the request is valid. Since
156    /// logical regions are multiplexed over physical regions, they are always "open".
157    ///
158    /// If trying to open a logical region whose physical region is not open, metric
159    /// engine will throw a [RegionNotFound](common_error::status_code::StatusCode::RegionNotFound)
160    /// error.
161    pub async fn open_region(
162        &self,
163        region_id: RegionId,
164        request: RegionOpenRequest,
165    ) -> Result<AffectedRows> {
166        if request.is_physical_table() {
167            if self
168                .state
169                .read()
170                .unwrap()
171                .physical_region_states()
172                .get(&region_id)
173                .is_some()
174            {
175                warn!(
176                    "The physical region {} is already open, ignore the open request",
177                    region_id
178                );
179                return Ok(0);
180            }
181            // open physical region and recover states
182            let physical_region_options = PhysicalRegionOptions::try_from(&request.options)?;
183            self.open_physical_region(region_id, request).await?;
184            if let Err(err) = self
185                .recover_states(region_id, physical_region_options)
186                .await
187            {
188                self.close_physical_region_on_recovery_failure(region_id)
189                    .await;
190                return Err(err);
191            }
192
193            Ok(0)
194        } else {
195            // Don't check if the logical region exist. Because a logical region cannot be opened
196            // individually, it is always "open" if its physical region is open. But the engine
197            // can't tell if the logical region is not exist or the physical region is not opened
198            // yet. Thus simply return `Ok` here to ignore all those errors.
199            Ok(0)
200        }
201    }
202
203    /// Transform the open request to open metadata region and data region.
204    ///
205    /// Returns:
206    /// - The open request for metadata region.
207    /// - The open request for data region.
208    fn transform_open_physical_region_request(
209        &self,
210        request: RegionOpenRequest,
211    ) -> (RegionOpenRequest, RegionOpenRequest) {
212        let metadata_region_options = region_options_for_metadata_region(&request.options);
213        let checkpoint = request.checkpoint;
214
215        let open_metadata_region_request = RegionOpenRequest {
216            table_dir: request.table_dir.clone(),
217            path_type: PathType::Metadata,
218            options: metadata_region_options,
219            engine: MITO_ENGINE_NAME.to_string(),
220            skip_wal_replay: request.skip_wal_replay,
221            checkpoint: checkpoint.map(|checkpoint| ReplayCheckpoint {
222                entry_id: checkpoint.metadata_entry_id.unwrap_or_default(),
223                metadata_entry_id: None,
224            }),
225        };
226
227        let mut data_region_options = request.options;
228        set_data_region_options(
229            &mut data_region_options,
230            self.config.experimental_sparse_primary_key_encoding,
231        );
232        let open_data_region_request = RegionOpenRequest {
233            table_dir: request.table_dir.clone(),
234            path_type: PathType::Data,
235            options: data_region_options,
236            engine: MITO_ENGINE_NAME.to_string(),
237            skip_wal_replay: request.skip_wal_replay,
238            checkpoint: checkpoint.map(|checkpoint| ReplayCheckpoint {
239                entry_id: checkpoint.entry_id,
240                metadata_entry_id: None,
241            }),
242        };
243
244        (open_metadata_region_request, open_data_region_request)
245    }
246
247    /// Invokes mito engine to open physical regions (data and metadata).
248    async fn open_physical_region(
249        &self,
250        region_id: RegionId,
251        request: RegionOpenRequest,
252    ) -> Result<AffectedRows> {
253        let metadata_region_id = utils::to_metadata_region_id(region_id);
254        let data_region_id = utils::to_data_region_id(region_id);
255        let (open_metadata_region_request, open_data_region_request) =
256            self.transform_open_physical_region_request(request);
257        let _ = self
258            .mito
259            .handle_batch_open_requests(
260                2,
261                vec![
262                    (metadata_region_id, open_metadata_region_request),
263                    (data_region_id, open_data_region_request),
264                ],
265            )
266            .await
267            .context(BatchOpenMitoRegionSnafu {})?;
268
269        info!("Opened physical metric region {region_id}");
270        PHYSICAL_REGION_COUNT.inc();
271
272        Ok(0)
273    }
274
275    /// Recovers [MetricEngineState](crate::engine::state::MetricEngineState) from
276    /// physical region (idnefied by the given region id).
277    ///
278    /// Includes:
279    /// - Record physical region's column names
280    /// - Record the mapping between logical region id and physical region id
281    ///
282    /// Returns new opened logical region ids.
283    pub(crate) async fn recover_states(
284        &self,
285        physical_region_id: RegionId,
286        physical_region_options: PhysicalRegionOptions,
287    ) -> Result<Vec<RegionId>> {
288        // load logical regions and physical column names
289        let logical_regions = self
290            .metadata_region
291            .logical_regions(physical_region_id)
292            .await?;
293        let physical_columns = self
294            .data_region
295            .physical_columns(physical_region_id)
296            .await?;
297        let primary_key_encoding = self
298            .mito
299            .get_primary_key_encoding(physical_region_id)
300            .context(PhysicalRegionNotFoundSnafu {
301                region_id: physical_region_id,
302            })?;
303
304        {
305            let mut state = self.state.write().unwrap();
306            // recover physical column names
307            // Safety: The physical columns are loaded from the data region, which always
308            // has a time index.
309            let time_index_unit = physical_columns
310                .iter()
311                .find_map(|col| {
312                    if col.semantic_type == SemanticType::Timestamp {
313                        col.column_schema
314                            .data_type
315                            .as_timestamp()
316                            .map(|data_type| data_type.unit())
317                    } else {
318                        None
319                    }
320                })
321                .unwrap();
322            let physical_columns = physical_columns
323                .into_iter()
324                .map(|col| (col.column_schema.name, col.column_id))
325                .collect();
326            state.add_physical_region(
327                physical_region_id,
328                physical_columns,
329                primary_key_encoding,
330                physical_region_options,
331                time_index_unit,
332            );
333            // recover logical regions
334            for logical_region_id in &logical_regions {
335                state.add_logical_region(physical_region_id, *logical_region_id);
336            }
337        }
338
339        let mut opened_logical_region_ids = Vec::new();
340        // The `recover_states` may be called multiple times, we only count the logical regions
341        // that are opened for the first time.
342        for logical_region_id in logical_regions {
343            if self
344                .metadata_region
345                .open_logical_region(logical_region_id)
346                .await
347            {
348                opened_logical_region_ids.push(logical_region_id);
349            }
350        }
351
352        LOGICAL_REGION_COUNT.add(opened_logical_region_ids.len() as i64);
353
354        Ok(opened_logical_region_ids)
355    }
356}
357
358// Unit tests in engine.rs