metric_engine/engine/
open.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Open a metric region.
16
17use std::collections::HashSet;
18
19use api::v1::SemanticType;
20use common_telemetry::info;
21use mito2::engine::MITO_ENGINE_NAME;
22use object_store::util::join_dir;
23use snafu::{OptionExt, ResultExt};
24use store_api::codec::PrimaryKeyEncoding;
25use store_api::metric_engine_consts::{DATA_REGION_SUBDIR, METADATA_REGION_SUBDIR};
26use store_api::region_engine::{BatchResponses, RegionEngine};
27use store_api::region_request::{AffectedRows, RegionOpenRequest, RegionRequest};
28use store_api::storage::RegionId;
29
30use crate::engine::create::region_options_for_metadata_region;
31use crate::engine::options::{set_data_region_options, PhysicalRegionOptions};
32use crate::engine::MetricEngineInner;
33use crate::error::{
34    BatchOpenMitoRegionSnafu, OpenMitoRegionSnafu, PhysicalRegionNotFoundSnafu, Result,
35};
36use crate::metrics::{LOGICAL_REGION_COUNT, PHYSICAL_REGION_COUNT};
37use crate::utils;
38
39impl MetricEngineInner {
40    pub async fn handle_batch_open_requests(
41        &self,
42        parallelism: usize,
43        requests: Vec<(RegionId, RegionOpenRequest)>,
44    ) -> Result<BatchResponses> {
45        // We need to open metadata region and data region for each request.
46        let mut all_requests = Vec::with_capacity(requests.len() * 2);
47        let mut physical_region_ids = Vec::with_capacity(requests.len());
48        let mut data_region_ids = HashSet::with_capacity(requests.len());
49
50        for (region_id, request) in requests {
51            if !request.is_physical_table() {
52                continue;
53            }
54            let physical_region_options = PhysicalRegionOptions::try_from(&request.options)?;
55            let metadata_region_id = utils::to_metadata_region_id(region_id);
56            let data_region_id = utils::to_data_region_id(region_id);
57            let (open_metadata_region_request, open_data_region_request) =
58                self.transform_open_physical_region_request(request);
59            all_requests.push((metadata_region_id, open_metadata_region_request));
60            all_requests.push((data_region_id, open_data_region_request));
61            physical_region_ids.push((region_id, physical_region_options));
62            data_region_ids.insert(data_region_id);
63        }
64
65        let results = self
66            .mito
67            .handle_batch_open_requests(parallelism, all_requests)
68            .await
69            .context(BatchOpenMitoRegionSnafu {})?
70            .into_iter()
71            .filter(|(region_id, _)| data_region_ids.contains(region_id))
72            .collect::<Vec<_>>();
73
74        for (physical_region_id, physical_region_options) in physical_region_ids {
75            let primary_key_encoding = self
76                .mito
77                .get_primary_key_encoding(physical_region_id)
78                .context(PhysicalRegionNotFoundSnafu {
79                    region_id: physical_region_id,
80                })?;
81            self.recover_states(
82                physical_region_id,
83                primary_key_encoding,
84                physical_region_options,
85            )
86            .await?;
87        }
88
89        Ok(results)
90    }
91
92    /// Open a metric region.
93    ///
94    /// Only open requests to a physical region matter. Those to logical regions are
95    /// actually an empty operation -- it only check if the request is valid. Since
96    /// logical regions are multiplexed over physical regions, they are always "open".
97    ///
98    /// If trying to open a logical region whose physical region is not open, metric
99    /// engine will throw a [RegionNotFound](common_error::status_code::StatusCode::RegionNotFound)
100    /// error.
101    pub async fn open_region(
102        &self,
103        region_id: RegionId,
104        request: RegionOpenRequest,
105    ) -> Result<AffectedRows> {
106        if request.is_physical_table() {
107            // open physical region and recover states
108            let physical_region_options = PhysicalRegionOptions::try_from(&request.options)?;
109            self.open_physical_region(region_id, request).await?;
110            let data_region_id = utils::to_data_region_id(region_id);
111            let primary_key_encoding = self.mito.get_primary_key_encoding(data_region_id).context(
112                PhysicalRegionNotFoundSnafu {
113                    region_id: data_region_id,
114                },
115            )?;
116            self.recover_states(region_id, primary_key_encoding, physical_region_options)
117                .await?;
118
119            Ok(0)
120        } else {
121            // Don't check if the logical region exist. Because a logical region cannot be opened
122            // individually, it is always "open" if its physical region is open. But the engine
123            // can't tell if the logical region is not exist or the physical region is not opened
124            // yet. Thus simply return `Ok` here to ignore all those errors.
125            Ok(0)
126        }
127    }
128
129    /// Transform the open request to open metadata region and data region.
130    ///
131    /// Returns:
132    /// - The open request for metadata region.
133    /// - The open request for data region.
134    fn transform_open_physical_region_request(
135        &self,
136        request: RegionOpenRequest,
137    ) -> (RegionOpenRequest, RegionOpenRequest) {
138        let metadata_region_dir = join_dir(&request.region_dir, METADATA_REGION_SUBDIR);
139        let data_region_dir = join_dir(&request.region_dir, DATA_REGION_SUBDIR);
140
141        let metadata_region_options = region_options_for_metadata_region(request.options.clone());
142        let open_metadata_region_request = RegionOpenRequest {
143            region_dir: metadata_region_dir,
144            options: metadata_region_options,
145            engine: MITO_ENGINE_NAME.to_string(),
146            skip_wal_replay: request.skip_wal_replay,
147        };
148
149        let mut data_region_options = request.options;
150        set_data_region_options(
151            &mut data_region_options,
152            self.config.experimental_sparse_primary_key_encoding,
153        );
154        let open_data_region_request = RegionOpenRequest {
155            region_dir: data_region_dir,
156            options: data_region_options,
157            engine: MITO_ENGINE_NAME.to_string(),
158            skip_wal_replay: request.skip_wal_replay,
159        };
160
161        (open_metadata_region_request, open_data_region_request)
162    }
163
164    /// Invokes mito engine to open physical regions (data and metadata).
165    async fn open_physical_region(
166        &self,
167        region_id: RegionId,
168        request: RegionOpenRequest,
169    ) -> Result<AffectedRows> {
170        let metadata_region_id = utils::to_metadata_region_id(region_id);
171        let data_region_id = utils::to_data_region_id(region_id);
172        let (open_metadata_region_request, open_data_region_request) =
173            self.transform_open_physical_region_request(request);
174
175        self.mito
176            .handle_request(
177                metadata_region_id,
178                RegionRequest::Open(open_metadata_region_request),
179            )
180            .await
181            .with_context(|_| OpenMitoRegionSnafu {
182                region_type: "metadata",
183            })?;
184        self.mito
185            .handle_request(
186                data_region_id,
187                RegionRequest::Open(open_data_region_request),
188            )
189            .await
190            .with_context(|_| OpenMitoRegionSnafu {
191                region_type: "data",
192            })?;
193
194        info!("Opened physical metric region {region_id}");
195        PHYSICAL_REGION_COUNT.inc();
196
197        Ok(0)
198    }
199
200    /// Recovers [MetricEngineState](crate::engine::state::MetricEngineState) from
201    /// physical region (idnefied by the given region id).
202    ///
203    /// Includes:
204    /// - Record physical region's column names
205    /// - Record the mapping between logical region id and physical region id
206    ///
207    /// Returns new opened logical region ids.
208    pub(crate) async fn recover_states(
209        &self,
210        physical_region_id: RegionId,
211        primary_key_encoding: PrimaryKeyEncoding,
212        physical_region_options: PhysicalRegionOptions,
213    ) -> Result<Vec<RegionId>> {
214        // load logical regions and physical column names
215        let logical_regions = self
216            .metadata_region
217            .logical_regions(physical_region_id)
218            .await?;
219        let physical_columns = self
220            .data_region
221            .physical_columns(physical_region_id)
222            .await?;
223
224        {
225            let mut state = self.state.write().unwrap();
226            // recover physical column names
227            // Safety: The physical columns are loaded from the data region, which always
228            // has a time index.
229            let time_index_unit = physical_columns
230                .iter()
231                .find_map(|col| {
232                    if col.semantic_type == SemanticType::Timestamp {
233                        col.column_schema
234                            .data_type
235                            .as_timestamp()
236                            .map(|data_type| data_type.unit())
237                    } else {
238                        None
239                    }
240                })
241                .unwrap();
242            let physical_columns = physical_columns
243                .into_iter()
244                .map(|col| (col.column_schema.name, col.column_id))
245                .collect();
246            state.add_physical_region(
247                physical_region_id,
248                physical_columns,
249                primary_key_encoding,
250                physical_region_options,
251                time_index_unit,
252            );
253            // recover logical regions
254            for logical_region_id in &logical_regions {
255                state.add_logical_region(physical_region_id, *logical_region_id);
256            }
257        }
258
259        let mut opened_logical_region_ids = Vec::new();
260        // The `recover_states` may be called multiple times, we only count the logical regions
261        // that are opened for the first time.
262        for logical_region_id in logical_regions {
263            if self
264                .metadata_region
265                .open_logical_region(logical_region_id)
266                .await
267            {
268                opened_logical_region_ids.push(logical_region_id);
269            }
270        }
271
272        LOGICAL_REGION_COUNT.add(opened_logical_region_ids.len() as i64);
273
274        Ok(opened_logical_region_ids)
275    }
276}
277
278// Unit tests in engine.rs