metric_engine/engine/
open.rs1use api::region::RegionResponse;
18use api::v1::SemanticType;
19use common_error::ext::BoxedError;
20use common_telemetry::info;
21use datafusion::common::HashMap;
22use mito2::engine::MITO_ENGINE_NAME;
23use object_store::util::join_dir;
24use snafu::{OptionExt, ResultExt};
25use store_api::metric_engine_consts::{DATA_REGION_SUBDIR, METADATA_REGION_SUBDIR};
26use store_api::region_engine::{BatchResponses, RegionEngine};
27use store_api::region_request::{AffectedRows, RegionOpenRequest, RegionRequest};
28use store_api::storage::RegionId;
29
30use crate::engine::create::region_options_for_metadata_region;
31use crate::engine::options::{set_data_region_options, PhysicalRegionOptions};
32use crate::engine::MetricEngineInner;
33use crate::error::{
34 BatchOpenMitoRegionSnafu, NoOpenRegionResultSnafu, OpenMitoRegionSnafu,
35 PhysicalRegionNotFoundSnafu, Result,
36};
37use crate::metrics::{LOGICAL_REGION_COUNT, PHYSICAL_REGION_COUNT};
38use crate::utils;
39
40impl MetricEngineInner {
41 pub async fn handle_batch_open_requests(
42 &self,
43 parallelism: usize,
44 requests: Vec<(RegionId, RegionOpenRequest)>,
45 ) -> Result<BatchResponses> {
46 let mut all_requests = Vec::with_capacity(requests.len() * 2);
48 let mut physical_region_ids = HashMap::with_capacity(requests.len());
49
50 for (region_id, request) in requests {
51 if !request.is_physical_table() {
52 continue;
53 }
54 let physical_region_options = PhysicalRegionOptions::try_from(&request.options)?;
55 let metadata_region_id = utils::to_metadata_region_id(region_id);
56 let data_region_id = utils::to_data_region_id(region_id);
57 let (open_metadata_region_request, open_data_region_request) =
58 self.transform_open_physical_region_request(request);
59 all_requests.push((metadata_region_id, open_metadata_region_request));
60 all_requests.push((data_region_id, open_data_region_request));
61 physical_region_ids.insert(region_id, physical_region_options);
62 }
63
64 let mut results = self
65 .mito
66 .handle_batch_open_requests(parallelism, all_requests)
67 .await
68 .context(BatchOpenMitoRegionSnafu {})?
69 .into_iter()
70 .collect::<HashMap<_, _>>();
71
72 let mut responses = Vec::with_capacity(physical_region_ids.len());
73 for (physical_region_id, physical_region_options) in physical_region_ids {
74 let metadata_region_id = utils::to_metadata_region_id(physical_region_id);
75 let data_region_id = utils::to_data_region_id(physical_region_id);
76 let metadata_region_result = results.remove(&metadata_region_id);
77 let data_region_result = results.remove(&data_region_id);
78 let response = self
83 .open_physical_region_with_results(
84 metadata_region_result,
85 data_region_result,
86 physical_region_id,
87 physical_region_options,
88 )
89 .await
90 .map_err(BoxedError::new);
91 responses.push((physical_region_id, response));
92 }
93
94 Ok(responses)
95 }
96
97 async fn open_physical_region_with_results(
98 &self,
99 metadata_region_result: Option<std::result::Result<RegionResponse, BoxedError>>,
100 data_region_result: Option<std::result::Result<RegionResponse, BoxedError>>,
101 physical_region_id: RegionId,
102 physical_region_options: PhysicalRegionOptions,
103 ) -> Result<RegionResponse> {
104 let metadata_region_id = utils::to_metadata_region_id(physical_region_id);
105 let data_region_id = utils::to_data_region_id(physical_region_id);
106 let _ = metadata_region_result
107 .context(NoOpenRegionResultSnafu {
108 region_id: metadata_region_id,
109 })?
110 .context(OpenMitoRegionSnafu {
111 region_type: "metadata",
112 })?;
113
114 let data_region_response = data_region_result
115 .context(NoOpenRegionResultSnafu {
116 region_id: data_region_id,
117 })?
118 .context(OpenMitoRegionSnafu {
119 region_type: "data",
120 })?;
121
122 self.recover_states(physical_region_id, physical_region_options)
123 .await?;
124 Ok(data_region_response)
125 }
126
127 pub async fn open_region(
137 &self,
138 region_id: RegionId,
139 request: RegionOpenRequest,
140 ) -> Result<AffectedRows> {
141 if request.is_physical_table() {
142 let physical_region_options = PhysicalRegionOptions::try_from(&request.options)?;
144 self.open_physical_region(region_id, request).await?;
145 self.recover_states(region_id, physical_region_options)
146 .await?;
147
148 Ok(0)
149 } else {
150 Ok(0)
155 }
156 }
157
158 fn transform_open_physical_region_request(
164 &self,
165 request: RegionOpenRequest,
166 ) -> (RegionOpenRequest, RegionOpenRequest) {
167 let metadata_region_dir = join_dir(&request.region_dir, METADATA_REGION_SUBDIR);
168 let data_region_dir = join_dir(&request.region_dir, DATA_REGION_SUBDIR);
169
170 let metadata_region_options = region_options_for_metadata_region(&request.options);
171 let open_metadata_region_request = RegionOpenRequest {
172 region_dir: metadata_region_dir,
173 options: metadata_region_options,
174 engine: MITO_ENGINE_NAME.to_string(),
175 skip_wal_replay: request.skip_wal_replay,
176 };
177
178 let mut data_region_options = request.options;
179 set_data_region_options(
180 &mut data_region_options,
181 self.config.experimental_sparse_primary_key_encoding,
182 );
183 let open_data_region_request = RegionOpenRequest {
184 region_dir: data_region_dir,
185 options: data_region_options,
186 engine: MITO_ENGINE_NAME.to_string(),
187 skip_wal_replay: request.skip_wal_replay,
188 };
189
190 (open_metadata_region_request, open_data_region_request)
191 }
192
193 async fn open_physical_region(
195 &self,
196 region_id: RegionId,
197 request: RegionOpenRequest,
198 ) -> Result<AffectedRows> {
199 let metadata_region_id = utils::to_metadata_region_id(region_id);
200 let data_region_id = utils::to_data_region_id(region_id);
201 let (open_metadata_region_request, open_data_region_request) =
202 self.transform_open_physical_region_request(request);
203
204 self.mito
205 .handle_request(
206 metadata_region_id,
207 RegionRequest::Open(open_metadata_region_request),
208 )
209 .await
210 .with_context(|_| OpenMitoRegionSnafu {
211 region_type: "metadata",
212 })?;
213 self.mito
214 .handle_request(
215 data_region_id,
216 RegionRequest::Open(open_data_region_request),
217 )
218 .await
219 .with_context(|_| OpenMitoRegionSnafu {
220 region_type: "data",
221 })?;
222
223 info!("Opened physical metric region {region_id}");
224 PHYSICAL_REGION_COUNT.inc();
225
226 Ok(0)
227 }
228
229 pub(crate) async fn recover_states(
238 &self,
239 physical_region_id: RegionId,
240 physical_region_options: PhysicalRegionOptions,
241 ) -> Result<Vec<RegionId>> {
242 let logical_regions = self
244 .metadata_region
245 .logical_regions(physical_region_id)
246 .await?;
247 let physical_columns = self
248 .data_region
249 .physical_columns(physical_region_id)
250 .await?;
251 let primary_key_encoding = self
252 .mito
253 .get_primary_key_encoding(physical_region_id)
254 .context(PhysicalRegionNotFoundSnafu {
255 region_id: physical_region_id,
256 })?;
257
258 {
259 let mut state = self.state.write().unwrap();
260 let time_index_unit = physical_columns
264 .iter()
265 .find_map(|col| {
266 if col.semantic_type == SemanticType::Timestamp {
267 col.column_schema
268 .data_type
269 .as_timestamp()
270 .map(|data_type| data_type.unit())
271 } else {
272 None
273 }
274 })
275 .unwrap();
276 let physical_columns = physical_columns
277 .into_iter()
278 .map(|col| (col.column_schema.name, col.column_id))
279 .collect();
280 state.add_physical_region(
281 physical_region_id,
282 physical_columns,
283 primary_key_encoding,
284 physical_region_options,
285 time_index_unit,
286 );
287 for logical_region_id in &logical_regions {
289 state.add_logical_region(physical_region_id, *logical_region_id);
290 }
291 }
292
293 let mut opened_logical_region_ids = Vec::new();
294 for logical_region_id in logical_regions {
297 if self
298 .metadata_region
299 .open_logical_region(logical_region_id)
300 .await
301 {
302 opened_logical_region_ids.push(logical_region_id);
303 }
304 }
305
306 LOGICAL_REGION_COUNT.add(opened_logical_region_ids.len() as i64);
307
308 Ok(opened_logical_region_ids)
309 }
310}
311
312