metric_engine/engine/
open.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Open a metric region.
16
17use std::collections::HashSet;
18
19use common_telemetry::info;
20use mito2::engine::MITO_ENGINE_NAME;
21use object_store::util::join_dir;
22use snafu::{OptionExt, ResultExt};
23use store_api::codec::PrimaryKeyEncoding;
24use store_api::metric_engine_consts::{DATA_REGION_SUBDIR, METADATA_REGION_SUBDIR};
25use store_api::region_engine::{BatchResponses, RegionEngine};
26use store_api::region_request::{AffectedRows, RegionOpenRequest, RegionRequest};
27use store_api::storage::RegionId;
28
29use crate::engine::create::region_options_for_metadata_region;
30use crate::engine::options::{set_data_region_options, PhysicalRegionOptions};
31use crate::engine::MetricEngineInner;
32use crate::error::{
33    BatchOpenMitoRegionSnafu, OpenMitoRegionSnafu, PhysicalRegionNotFoundSnafu, Result,
34};
35use crate::metrics::{LOGICAL_REGION_COUNT, PHYSICAL_REGION_COUNT};
36use crate::utils;
37
38impl MetricEngineInner {
39    pub async fn handle_batch_open_requests(
40        &self,
41        parallelism: usize,
42        requests: Vec<(RegionId, RegionOpenRequest)>,
43    ) -> Result<BatchResponses> {
44        // We need to open metadata region and data region for each request.
45        let mut all_requests = Vec::with_capacity(requests.len() * 2);
46        let mut physical_region_ids = Vec::with_capacity(requests.len());
47        let mut data_region_ids = HashSet::with_capacity(requests.len());
48
49        for (region_id, request) in requests {
50            if !request.is_physical_table() {
51                continue;
52            }
53            let physical_region_options = PhysicalRegionOptions::try_from(&request.options)?;
54            let metadata_region_id = utils::to_metadata_region_id(region_id);
55            let data_region_id = utils::to_data_region_id(region_id);
56            let (open_metadata_region_request, open_data_region_request) =
57                self.transform_open_physical_region_request(request);
58            all_requests.push((metadata_region_id, open_metadata_region_request));
59            all_requests.push((data_region_id, open_data_region_request));
60            physical_region_ids.push((region_id, physical_region_options));
61            data_region_ids.insert(data_region_id);
62        }
63
64        let results = self
65            .mito
66            .handle_batch_open_requests(parallelism, all_requests)
67            .await
68            .context(BatchOpenMitoRegionSnafu {})?
69            .into_iter()
70            .filter(|(region_id, _)| data_region_ids.contains(region_id))
71            .collect::<Vec<_>>();
72
73        for (physical_region_id, physical_region_options) in physical_region_ids {
74            let primary_key_encoding = self
75                .mito
76                .get_primary_key_encoding(physical_region_id)
77                .context(PhysicalRegionNotFoundSnafu {
78                    region_id: physical_region_id,
79                })?;
80            self.recover_states(
81                physical_region_id,
82                primary_key_encoding,
83                physical_region_options,
84            )
85            .await?;
86        }
87
88        Ok(results)
89    }
90
91    /// Open a metric region.
92    ///
93    /// Only open requests to a physical region matter. Those to logical regions are
94    /// actually an empty operation -- it only check if the request is valid. Since
95    /// logical regions are multiplexed over physical regions, they are always "open".
96    ///
97    /// If trying to open a logical region whose physical region is not open, metric
98    /// engine will throw a [RegionNotFound](common_error::status_code::StatusCode::RegionNotFound)
99    /// error.
100    pub async fn open_region(
101        &self,
102        region_id: RegionId,
103        request: RegionOpenRequest,
104    ) -> Result<AffectedRows> {
105        if request.is_physical_table() {
106            // open physical region and recover states
107            let physical_region_options = PhysicalRegionOptions::try_from(&request.options)?;
108            self.open_physical_region(region_id, request).await?;
109            let data_region_id = utils::to_data_region_id(region_id);
110            let primary_key_encoding = self.mito.get_primary_key_encoding(data_region_id).context(
111                PhysicalRegionNotFoundSnafu {
112                    region_id: data_region_id,
113                },
114            )?;
115            self.recover_states(region_id, primary_key_encoding, physical_region_options)
116                .await?;
117
118            Ok(0)
119        } else {
120            // Don't check if the logical region exist. Because a logical region cannot be opened
121            // individually, it is always "open" if its physical region is open. But the engine
122            // can't tell if the logical region is not exist or the physical region is not opened
123            // yet. Thus simply return `Ok` here to ignore all those errors.
124            Ok(0)
125        }
126    }
127
128    /// Transform the open request to open metadata region and data region.
129    ///
130    /// Returns:
131    /// - The open request for metadata region.
132    /// - The open request for data region.
133    fn transform_open_physical_region_request(
134        &self,
135        request: RegionOpenRequest,
136    ) -> (RegionOpenRequest, RegionOpenRequest) {
137        let metadata_region_dir = join_dir(&request.region_dir, METADATA_REGION_SUBDIR);
138        let data_region_dir = join_dir(&request.region_dir, DATA_REGION_SUBDIR);
139
140        let metadata_region_options = region_options_for_metadata_region(request.options.clone());
141        let open_metadata_region_request = RegionOpenRequest {
142            region_dir: metadata_region_dir,
143            options: metadata_region_options,
144            engine: MITO_ENGINE_NAME.to_string(),
145            skip_wal_replay: request.skip_wal_replay,
146        };
147
148        let mut data_region_options = request.options;
149        set_data_region_options(
150            &mut data_region_options,
151            self.config.experimental_sparse_primary_key_encoding,
152        );
153        let open_data_region_request = RegionOpenRequest {
154            region_dir: data_region_dir,
155            options: data_region_options,
156            engine: MITO_ENGINE_NAME.to_string(),
157            skip_wal_replay: request.skip_wal_replay,
158        };
159
160        (open_metadata_region_request, open_data_region_request)
161    }
162
163    /// Invokes mito engine to open physical regions (data and metadata).
164    async fn open_physical_region(
165        &self,
166        region_id: RegionId,
167        request: RegionOpenRequest,
168    ) -> Result<AffectedRows> {
169        let metadata_region_id = utils::to_metadata_region_id(region_id);
170        let data_region_id = utils::to_data_region_id(region_id);
171        let (open_metadata_region_request, open_data_region_request) =
172            self.transform_open_physical_region_request(request);
173
174        self.mito
175            .handle_request(
176                metadata_region_id,
177                RegionRequest::Open(open_metadata_region_request),
178            )
179            .await
180            .with_context(|_| OpenMitoRegionSnafu {
181                region_type: "metadata",
182            })?;
183        self.mito
184            .handle_request(
185                data_region_id,
186                RegionRequest::Open(open_data_region_request),
187            )
188            .await
189            .with_context(|_| OpenMitoRegionSnafu {
190                region_type: "data",
191            })?;
192
193        info!("Opened physical metric region {region_id}");
194        PHYSICAL_REGION_COUNT.inc();
195
196        Ok(0)
197    }
198
199    /// Recovers [MetricEngineState](crate::engine::state::MetricEngineState) from
200    /// physical region (idnefied by the given region id).
201    ///
202    /// Includes:
203    /// - Record physical region's column names
204    /// - Record the mapping between logical region id and physical region id
205    ///
206    /// Returns new opened logical region ids.
207    pub(crate) async fn recover_states(
208        &self,
209        physical_region_id: RegionId,
210        primary_key_encoding: PrimaryKeyEncoding,
211        physical_region_options: PhysicalRegionOptions,
212    ) -> Result<Vec<RegionId>> {
213        // load logical regions and physical column names
214        let logical_regions = self
215            .metadata_region
216            .logical_regions(physical_region_id)
217            .await?;
218        let physical_columns = self
219            .data_region
220            .physical_columns(physical_region_id)
221            .await?;
222
223        {
224            let mut state = self.state.write().unwrap();
225            // recover physical column names
226            let physical_columns = physical_columns
227                .into_iter()
228                .map(|col| (col.column_schema.name, col.column_id))
229                .collect();
230            state.add_physical_region(
231                physical_region_id,
232                physical_columns,
233                primary_key_encoding,
234                physical_region_options,
235            );
236            // recover logical regions
237            for logical_region_id in &logical_regions {
238                state.add_logical_region(physical_region_id, *logical_region_id);
239            }
240        }
241
242        let mut opened_logical_region_ids = Vec::new();
243        // The `recover_states` may be called multiple times, we only count the logical regions
244        // that are opened for the first time.
245        for logical_region_id in logical_regions {
246            if self
247                .metadata_region
248                .open_logical_region(logical_region_id)
249                .await
250            {
251                opened_logical_region_ids.push(logical_region_id);
252            }
253        }
254
255        LOGICAL_REGION_COUNT.add(opened_logical_region_ids.len() as i64);
256
257        Ok(opened_logical_region_ids)
258    }
259}
260
261// Unit tests in engine.rs