metric_engine/engine/
open.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Open a metric region.
16
17use api::region::RegionResponse;
18use api::v1::SemanticType;
19use common_error::ext::BoxedError;
20use common_telemetry::info;
21use datafusion::common::HashMap;
22use mito2::engine::MITO_ENGINE_NAME;
23use object_store::util::join_dir;
24use snafu::{OptionExt, ResultExt};
25use store_api::metric_engine_consts::{DATA_REGION_SUBDIR, METADATA_REGION_SUBDIR};
26use store_api::region_engine::{BatchResponses, RegionEngine};
27use store_api::region_request::{AffectedRows, RegionOpenRequest, RegionRequest};
28use store_api::storage::RegionId;
29
30use crate::engine::create::region_options_for_metadata_region;
31use crate::engine::options::{set_data_region_options, PhysicalRegionOptions};
32use crate::engine::MetricEngineInner;
33use crate::error::{
34    BatchOpenMitoRegionSnafu, NoOpenRegionResultSnafu, OpenMitoRegionSnafu,
35    PhysicalRegionNotFoundSnafu, Result,
36};
37use crate::metrics::{LOGICAL_REGION_COUNT, PHYSICAL_REGION_COUNT};
38use crate::utils;
39
40impl MetricEngineInner {
41    pub async fn handle_batch_open_requests(
42        &self,
43        parallelism: usize,
44        requests: Vec<(RegionId, RegionOpenRequest)>,
45    ) -> Result<BatchResponses> {
46        // We need to open metadata region and data region for each request.
47        let mut all_requests = Vec::with_capacity(requests.len() * 2);
48        let mut physical_region_ids = HashMap::with_capacity(requests.len());
49
50        for (region_id, request) in requests {
51            if !request.is_physical_table() {
52                continue;
53            }
54            let physical_region_options = PhysicalRegionOptions::try_from(&request.options)?;
55            let metadata_region_id = utils::to_metadata_region_id(region_id);
56            let data_region_id = utils::to_data_region_id(region_id);
57            let (open_metadata_region_request, open_data_region_request) =
58                self.transform_open_physical_region_request(request);
59            all_requests.push((metadata_region_id, open_metadata_region_request));
60            all_requests.push((data_region_id, open_data_region_request));
61            physical_region_ids.insert(region_id, physical_region_options);
62        }
63
64        let mut results = self
65            .mito
66            .handle_batch_open_requests(parallelism, all_requests)
67            .await
68            .context(BatchOpenMitoRegionSnafu {})?
69            .into_iter()
70            .collect::<HashMap<_, _>>();
71
72        let mut responses = Vec::with_capacity(physical_region_ids.len());
73        for (physical_region_id, physical_region_options) in physical_region_ids {
74            let metadata_region_id = utils::to_metadata_region_id(physical_region_id);
75            let data_region_id = utils::to_data_region_id(physical_region_id);
76            let metadata_region_result = results.remove(&metadata_region_id);
77            let data_region_result = results.remove(&data_region_id);
78            // Pass the optional `metadata_region_result` and `data_region_result` to
79            // `open_physical_region_with_results`. This function handles errors for each
80            // open physical region request, allowing the process to continue with the
81            // remaining regions even if some requests fail.
82            let response = self
83                .open_physical_region_with_results(
84                    metadata_region_result,
85                    data_region_result,
86                    physical_region_id,
87                    physical_region_options,
88                )
89                .await
90                .map_err(BoxedError::new);
91            responses.push((physical_region_id, response));
92        }
93
94        Ok(responses)
95    }
96
97    async fn open_physical_region_with_results(
98        &self,
99        metadata_region_result: Option<std::result::Result<RegionResponse, BoxedError>>,
100        data_region_result: Option<std::result::Result<RegionResponse, BoxedError>>,
101        physical_region_id: RegionId,
102        physical_region_options: PhysicalRegionOptions,
103    ) -> Result<RegionResponse> {
104        let metadata_region_id = utils::to_metadata_region_id(physical_region_id);
105        let data_region_id = utils::to_data_region_id(physical_region_id);
106        let _ = metadata_region_result
107            .context(NoOpenRegionResultSnafu {
108                region_id: metadata_region_id,
109            })?
110            .context(OpenMitoRegionSnafu {
111                region_type: "metadata",
112            })?;
113
114        let data_region_response = data_region_result
115            .context(NoOpenRegionResultSnafu {
116                region_id: data_region_id,
117            })?
118            .context(OpenMitoRegionSnafu {
119                region_type: "data",
120            })?;
121
122        self.recover_states(physical_region_id, physical_region_options)
123            .await?;
124        Ok(data_region_response)
125    }
126
127    /// Open a metric region.
128    ///
129    /// Only open requests to a physical region matter. Those to logical regions are
130    /// actually an empty operation -- it only check if the request is valid. Since
131    /// logical regions are multiplexed over physical regions, they are always "open".
132    ///
133    /// If trying to open a logical region whose physical region is not open, metric
134    /// engine will throw a [RegionNotFound](common_error::status_code::StatusCode::RegionNotFound)
135    /// error.
136    pub async fn open_region(
137        &self,
138        region_id: RegionId,
139        request: RegionOpenRequest,
140    ) -> Result<AffectedRows> {
141        if request.is_physical_table() {
142            // open physical region and recover states
143            let physical_region_options = PhysicalRegionOptions::try_from(&request.options)?;
144            self.open_physical_region(region_id, request).await?;
145            self.recover_states(region_id, physical_region_options)
146                .await?;
147
148            Ok(0)
149        } else {
150            // Don't check if the logical region exist. Because a logical region cannot be opened
151            // individually, it is always "open" if its physical region is open. But the engine
152            // can't tell if the logical region is not exist or the physical region is not opened
153            // yet. Thus simply return `Ok` here to ignore all those errors.
154            Ok(0)
155        }
156    }
157
158    /// Transform the open request to open metadata region and data region.
159    ///
160    /// Returns:
161    /// - The open request for metadata region.
162    /// - The open request for data region.
163    fn transform_open_physical_region_request(
164        &self,
165        request: RegionOpenRequest,
166    ) -> (RegionOpenRequest, RegionOpenRequest) {
167        let metadata_region_dir = join_dir(&request.region_dir, METADATA_REGION_SUBDIR);
168        let data_region_dir = join_dir(&request.region_dir, DATA_REGION_SUBDIR);
169
170        let metadata_region_options = region_options_for_metadata_region(&request.options);
171        let open_metadata_region_request = RegionOpenRequest {
172            region_dir: metadata_region_dir,
173            options: metadata_region_options,
174            engine: MITO_ENGINE_NAME.to_string(),
175            skip_wal_replay: request.skip_wal_replay,
176        };
177
178        let mut data_region_options = request.options;
179        set_data_region_options(
180            &mut data_region_options,
181            self.config.experimental_sparse_primary_key_encoding,
182        );
183        let open_data_region_request = RegionOpenRequest {
184            region_dir: data_region_dir,
185            options: data_region_options,
186            engine: MITO_ENGINE_NAME.to_string(),
187            skip_wal_replay: request.skip_wal_replay,
188        };
189
190        (open_metadata_region_request, open_data_region_request)
191    }
192
193    /// Invokes mito engine to open physical regions (data and metadata).
194    async fn open_physical_region(
195        &self,
196        region_id: RegionId,
197        request: RegionOpenRequest,
198    ) -> Result<AffectedRows> {
199        let metadata_region_id = utils::to_metadata_region_id(region_id);
200        let data_region_id = utils::to_data_region_id(region_id);
201        let (open_metadata_region_request, open_data_region_request) =
202            self.transform_open_physical_region_request(request);
203
204        self.mito
205            .handle_request(
206                metadata_region_id,
207                RegionRequest::Open(open_metadata_region_request),
208            )
209            .await
210            .with_context(|_| OpenMitoRegionSnafu {
211                region_type: "metadata",
212            })?;
213        self.mito
214            .handle_request(
215                data_region_id,
216                RegionRequest::Open(open_data_region_request),
217            )
218            .await
219            .with_context(|_| OpenMitoRegionSnafu {
220                region_type: "data",
221            })?;
222
223        info!("Opened physical metric region {region_id}");
224        PHYSICAL_REGION_COUNT.inc();
225
226        Ok(0)
227    }
228
229    /// Recovers [MetricEngineState](crate::engine::state::MetricEngineState) from
230    /// physical region (idnefied by the given region id).
231    ///
232    /// Includes:
233    /// - Record physical region's column names
234    /// - Record the mapping between logical region id and physical region id
235    ///
236    /// Returns new opened logical region ids.
237    pub(crate) async fn recover_states(
238        &self,
239        physical_region_id: RegionId,
240        physical_region_options: PhysicalRegionOptions,
241    ) -> Result<Vec<RegionId>> {
242        // load logical regions and physical column names
243        let logical_regions = self
244            .metadata_region
245            .logical_regions(physical_region_id)
246            .await?;
247        let physical_columns = self
248            .data_region
249            .physical_columns(physical_region_id)
250            .await?;
251        let primary_key_encoding = self
252            .mito
253            .get_primary_key_encoding(physical_region_id)
254            .context(PhysicalRegionNotFoundSnafu {
255                region_id: physical_region_id,
256            })?;
257
258        {
259            let mut state = self.state.write().unwrap();
260            // recover physical column names
261            // Safety: The physical columns are loaded from the data region, which always
262            // has a time index.
263            let time_index_unit = physical_columns
264                .iter()
265                .find_map(|col| {
266                    if col.semantic_type == SemanticType::Timestamp {
267                        col.column_schema
268                            .data_type
269                            .as_timestamp()
270                            .map(|data_type| data_type.unit())
271                    } else {
272                        None
273                    }
274                })
275                .unwrap();
276            let physical_columns = physical_columns
277                .into_iter()
278                .map(|col| (col.column_schema.name, col.column_id))
279                .collect();
280            state.add_physical_region(
281                physical_region_id,
282                physical_columns,
283                primary_key_encoding,
284                physical_region_options,
285                time_index_unit,
286            );
287            // recover logical regions
288            for logical_region_id in &logical_regions {
289                state.add_logical_region(physical_region_id, *logical_region_id);
290            }
291        }
292
293        let mut opened_logical_region_ids = Vec::new();
294        // The `recover_states` may be called multiple times, we only count the logical regions
295        // that are opened for the first time.
296        for logical_region_id in logical_regions {
297            if self
298                .metadata_region
299                .open_logical_region(logical_region_id)
300                .await
301            {
302                opened_logical_region_ids.push(logical_region_id);
303            }
304        }
305
306        LOGICAL_REGION_COUNT.add(opened_logical_region_ids.len() as i64);
307
308        Ok(opened_logical_region_ids)
309    }
310}
311
312// Unit tests in engine.rs