metric_engine/engine/
open.rs1use std::collections::HashSet;
18
19use api::v1::SemanticType;
20use common_telemetry::info;
21use mito2::engine::MITO_ENGINE_NAME;
22use object_store::util::join_dir;
23use snafu::{OptionExt, ResultExt};
24use store_api::codec::PrimaryKeyEncoding;
25use store_api::metric_engine_consts::{DATA_REGION_SUBDIR, METADATA_REGION_SUBDIR};
26use store_api::region_engine::{BatchResponses, RegionEngine};
27use store_api::region_request::{AffectedRows, RegionOpenRequest, RegionRequest};
28use store_api::storage::RegionId;
29
30use crate::engine::create::region_options_for_metadata_region;
31use crate::engine::options::{set_data_region_options, PhysicalRegionOptions};
32use crate::engine::MetricEngineInner;
33use crate::error::{
34 BatchOpenMitoRegionSnafu, OpenMitoRegionSnafu, PhysicalRegionNotFoundSnafu, Result,
35};
36use crate::metrics::{LOGICAL_REGION_COUNT, PHYSICAL_REGION_COUNT};
37use crate::utils;
38
39impl MetricEngineInner {
40 pub async fn handle_batch_open_requests(
41 &self,
42 parallelism: usize,
43 requests: Vec<(RegionId, RegionOpenRequest)>,
44 ) -> Result<BatchResponses> {
45 let mut all_requests = Vec::with_capacity(requests.len() * 2);
47 let mut physical_region_ids = Vec::with_capacity(requests.len());
48 let mut data_region_ids = HashSet::with_capacity(requests.len());
49
50 for (region_id, request) in requests {
51 if !request.is_physical_table() {
52 continue;
53 }
54 let physical_region_options = PhysicalRegionOptions::try_from(&request.options)?;
55 let metadata_region_id = utils::to_metadata_region_id(region_id);
56 let data_region_id = utils::to_data_region_id(region_id);
57 let (open_metadata_region_request, open_data_region_request) =
58 self.transform_open_physical_region_request(request);
59 all_requests.push((metadata_region_id, open_metadata_region_request));
60 all_requests.push((data_region_id, open_data_region_request));
61 physical_region_ids.push((region_id, physical_region_options));
62 data_region_ids.insert(data_region_id);
63 }
64
65 let results = self
66 .mito
67 .handle_batch_open_requests(parallelism, all_requests)
68 .await
69 .context(BatchOpenMitoRegionSnafu {})?
70 .into_iter()
71 .filter(|(region_id, _)| data_region_ids.contains(region_id))
72 .collect::<Vec<_>>();
73
74 for (physical_region_id, physical_region_options) in physical_region_ids {
75 let primary_key_encoding = self
76 .mito
77 .get_primary_key_encoding(physical_region_id)
78 .context(PhysicalRegionNotFoundSnafu {
79 region_id: physical_region_id,
80 })?;
81 self.recover_states(
82 physical_region_id,
83 primary_key_encoding,
84 physical_region_options,
85 )
86 .await?;
87 }
88
89 Ok(results)
90 }
91
92 pub async fn open_region(
102 &self,
103 region_id: RegionId,
104 request: RegionOpenRequest,
105 ) -> Result<AffectedRows> {
106 if request.is_physical_table() {
107 let physical_region_options = PhysicalRegionOptions::try_from(&request.options)?;
109 self.open_physical_region(region_id, request).await?;
110 let data_region_id = utils::to_data_region_id(region_id);
111 let primary_key_encoding = self.mito.get_primary_key_encoding(data_region_id).context(
112 PhysicalRegionNotFoundSnafu {
113 region_id: data_region_id,
114 },
115 )?;
116 self.recover_states(region_id, primary_key_encoding, physical_region_options)
117 .await?;
118
119 Ok(0)
120 } else {
121 Ok(0)
126 }
127 }
128
129 fn transform_open_physical_region_request(
135 &self,
136 request: RegionOpenRequest,
137 ) -> (RegionOpenRequest, RegionOpenRequest) {
138 let metadata_region_dir = join_dir(&request.region_dir, METADATA_REGION_SUBDIR);
139 let data_region_dir = join_dir(&request.region_dir, DATA_REGION_SUBDIR);
140
141 let metadata_region_options = region_options_for_metadata_region(request.options.clone());
142 let open_metadata_region_request = RegionOpenRequest {
143 region_dir: metadata_region_dir,
144 options: metadata_region_options,
145 engine: MITO_ENGINE_NAME.to_string(),
146 skip_wal_replay: request.skip_wal_replay,
147 };
148
149 let mut data_region_options = request.options;
150 set_data_region_options(
151 &mut data_region_options,
152 self.config.experimental_sparse_primary_key_encoding,
153 );
154 let open_data_region_request = RegionOpenRequest {
155 region_dir: data_region_dir,
156 options: data_region_options,
157 engine: MITO_ENGINE_NAME.to_string(),
158 skip_wal_replay: request.skip_wal_replay,
159 };
160
161 (open_metadata_region_request, open_data_region_request)
162 }
163
164 async fn open_physical_region(
166 &self,
167 region_id: RegionId,
168 request: RegionOpenRequest,
169 ) -> Result<AffectedRows> {
170 let metadata_region_id = utils::to_metadata_region_id(region_id);
171 let data_region_id = utils::to_data_region_id(region_id);
172 let (open_metadata_region_request, open_data_region_request) =
173 self.transform_open_physical_region_request(request);
174
175 self.mito
176 .handle_request(
177 metadata_region_id,
178 RegionRequest::Open(open_metadata_region_request),
179 )
180 .await
181 .with_context(|_| OpenMitoRegionSnafu {
182 region_type: "metadata",
183 })?;
184 self.mito
185 .handle_request(
186 data_region_id,
187 RegionRequest::Open(open_data_region_request),
188 )
189 .await
190 .with_context(|_| OpenMitoRegionSnafu {
191 region_type: "data",
192 })?;
193
194 info!("Opened physical metric region {region_id}");
195 PHYSICAL_REGION_COUNT.inc();
196
197 Ok(0)
198 }
199
200 pub(crate) async fn recover_states(
209 &self,
210 physical_region_id: RegionId,
211 primary_key_encoding: PrimaryKeyEncoding,
212 physical_region_options: PhysicalRegionOptions,
213 ) -> Result<Vec<RegionId>> {
214 let logical_regions = self
216 .metadata_region
217 .logical_regions(physical_region_id)
218 .await?;
219 let physical_columns = self
220 .data_region
221 .physical_columns(physical_region_id)
222 .await?;
223
224 {
225 let mut state = self.state.write().unwrap();
226 let time_index_unit = physical_columns
230 .iter()
231 .find_map(|col| {
232 if col.semantic_type == SemanticType::Timestamp {
233 col.column_schema
234 .data_type
235 .as_timestamp()
236 .map(|data_type| data_type.unit())
237 } else {
238 None
239 }
240 })
241 .unwrap();
242 let physical_columns = physical_columns
243 .into_iter()
244 .map(|col| (col.column_schema.name, col.column_id))
245 .collect();
246 state.add_physical_region(
247 physical_region_id,
248 physical_columns,
249 primary_key_encoding,
250 physical_region_options,
251 time_index_unit,
252 );
253 for logical_region_id in &logical_regions {
255 state.add_logical_region(physical_region_id, *logical_region_id);
256 }
257 }
258
259 let mut opened_logical_region_ids = Vec::new();
260 for logical_region_id in logical_regions {
263 if self
264 .metadata_region
265 .open_logical_region(logical_region_id)
266 .await
267 {
268 opened_logical_region_ids.push(logical_region_id);
269 }
270 }
271
272 LOGICAL_REGION_COUNT.add(opened_logical_region_ids.len() as i64);
273
274 Ok(opened_logical_region_ids)
275 }
276}
277
278