1use api::region::RegionResponse;
18use api::v1::SemanticType;
19use common_error::ext::BoxedError;
20use common_telemetry::{error, info, warn};
21use datafusion::common::HashMap;
22use mito2::engine::MITO_ENGINE_NAME;
23use object_store::util::join_dir;
24use snafu::{OptionExt, ResultExt};
25use store_api::metric_engine_consts::{DATA_REGION_SUBDIR, METADATA_REGION_SUBDIR};
26use store_api::region_engine::{BatchResponses, RegionEngine};
27use store_api::region_request::{AffectedRows, RegionOpenRequest, RegionRequest};
28use store_api::storage::RegionId;
29
30use crate::engine::create::region_options_for_metadata_region;
31use crate::engine::options::{set_data_region_options, PhysicalRegionOptions};
32use crate::engine::MetricEngineInner;
33use crate::error::{
34 BatchOpenMitoRegionSnafu, NoOpenRegionResultSnafu, OpenMitoRegionSnafu,
35 PhysicalRegionNotFoundSnafu, Result,
36};
37use crate::metrics::{LOGICAL_REGION_COUNT, PHYSICAL_REGION_COUNT};
38use crate::utils;
39
40impl MetricEngineInner {
41 pub async fn handle_batch_open_requests(
42 &self,
43 parallelism: usize,
44 requests: Vec<(RegionId, RegionOpenRequest)>,
45 ) -> Result<BatchResponses> {
46 let mut all_requests = Vec::with_capacity(requests.len() * 2);
48 let mut physical_region_ids = HashMap::with_capacity(requests.len());
49
50 for (region_id, request) in requests {
51 if !request.is_physical_table() {
52 continue;
53 }
54 let physical_region_options = PhysicalRegionOptions::try_from(&request.options)?;
55 let metadata_region_id = utils::to_metadata_region_id(region_id);
56 let data_region_id = utils::to_data_region_id(region_id);
57 let (open_metadata_region_request, open_data_region_request) =
58 self.transform_open_physical_region_request(request);
59 all_requests.push((metadata_region_id, open_metadata_region_request));
60 all_requests.push((data_region_id, open_data_region_request));
61 physical_region_ids.insert(region_id, physical_region_options);
62 }
63
64 let mut results = self
65 .mito
66 .handle_batch_open_requests(parallelism, all_requests)
67 .await
68 .context(BatchOpenMitoRegionSnafu {})?
69 .into_iter()
70 .collect::<HashMap<_, _>>();
71
72 let mut responses = Vec::with_capacity(physical_region_ids.len());
73 for (physical_region_id, physical_region_options) in physical_region_ids {
74 let metadata_region_id = utils::to_metadata_region_id(physical_region_id);
75 let data_region_id = utils::to_data_region_id(physical_region_id);
76 let metadata_region_result = results.remove(&metadata_region_id);
77 let data_region_result = results.remove(&data_region_id);
78 let response = self
83 .open_physical_region_with_results(
84 metadata_region_result,
85 data_region_result,
86 physical_region_id,
87 physical_region_options,
88 )
89 .await
90 .map_err(BoxedError::new);
91 responses.push((physical_region_id, response));
92 }
93
94 Ok(responses)
95 }
96
97 async fn close_physical_region_on_recovery_failure(&self, physical_region_id: RegionId) {
102 info!(
103 "Closing metadata region {} and data region {} on metadata recovery failure",
104 utils::to_metadata_region_id(physical_region_id),
105 utils::to_data_region_id(physical_region_id)
106 );
107 if let Err(err) = self.close_physical_region(physical_region_id).await {
108 error!(err; "Failed to close physical region {}", physical_region_id);
109 }
110 }
111
112 async fn open_physical_region_with_results(
113 &self,
114 metadata_region_result: Option<std::result::Result<RegionResponse, BoxedError>>,
115 data_region_result: Option<std::result::Result<RegionResponse, BoxedError>>,
116 physical_region_id: RegionId,
117 physical_region_options: PhysicalRegionOptions,
118 ) -> Result<RegionResponse> {
119 let metadata_region_id = utils::to_metadata_region_id(physical_region_id);
120 let data_region_id = utils::to_data_region_id(physical_region_id);
121 let _ = metadata_region_result
122 .context(NoOpenRegionResultSnafu {
123 region_id: metadata_region_id,
124 })?
125 .context(OpenMitoRegionSnafu {
126 region_type: "metadata",
127 })?;
128
129 let data_region_response = data_region_result
130 .context(NoOpenRegionResultSnafu {
131 region_id: data_region_id,
132 })?
133 .context(OpenMitoRegionSnafu {
134 region_type: "data",
135 })?;
136
137 if let Err(err) = self
138 .recover_states(physical_region_id, physical_region_options)
139 .await
140 {
141 self.close_physical_region_on_recovery_failure(physical_region_id)
142 .await;
143 return Err(err);
144 }
145 Ok(data_region_response)
146 }
147
148 pub async fn open_region(
158 &self,
159 region_id: RegionId,
160 request: RegionOpenRequest,
161 ) -> Result<AffectedRows> {
162 if request.is_physical_table() {
163 if self
164 .state
165 .read()
166 .unwrap()
167 .physical_region_states()
168 .get(®ion_id)
169 .is_some()
170 {
171 warn!(
172 "The physical region {} is already open, ignore the open request",
173 region_id
174 );
175 return Ok(0);
176 }
177 let physical_region_options = PhysicalRegionOptions::try_from(&request.options)?;
179 self.open_physical_region(region_id, request).await?;
180 if let Err(err) = self
181 .recover_states(region_id, physical_region_options)
182 .await
183 {
184 self.close_physical_region_on_recovery_failure(region_id)
185 .await;
186 return Err(err);
187 }
188
189 Ok(0)
190 } else {
191 Ok(0)
196 }
197 }
198
199 fn transform_open_physical_region_request(
205 &self,
206 request: RegionOpenRequest,
207 ) -> (RegionOpenRequest, RegionOpenRequest) {
208 let metadata_region_dir = join_dir(&request.region_dir, METADATA_REGION_SUBDIR);
209 let data_region_dir = join_dir(&request.region_dir, DATA_REGION_SUBDIR);
210
211 let metadata_region_options = region_options_for_metadata_region(&request.options);
212 let open_metadata_region_request = RegionOpenRequest {
213 region_dir: metadata_region_dir,
214 options: metadata_region_options,
215 engine: MITO_ENGINE_NAME.to_string(),
216 skip_wal_replay: request.skip_wal_replay,
217 };
218
219 let mut data_region_options = request.options;
220 set_data_region_options(
221 &mut data_region_options,
222 self.config.experimental_sparse_primary_key_encoding,
223 );
224 let open_data_region_request = RegionOpenRequest {
225 region_dir: data_region_dir,
226 options: data_region_options,
227 engine: MITO_ENGINE_NAME.to_string(),
228 skip_wal_replay: request.skip_wal_replay,
229 };
230
231 (open_metadata_region_request, open_data_region_request)
232 }
233
234 async fn open_physical_region(
236 &self,
237 region_id: RegionId,
238 request: RegionOpenRequest,
239 ) -> Result<AffectedRows> {
240 let metadata_region_id = utils::to_metadata_region_id(region_id);
241 let data_region_id = utils::to_data_region_id(region_id);
242 let (open_metadata_region_request, open_data_region_request) =
243 self.transform_open_physical_region_request(request);
244
245 self.mito
246 .handle_request(
247 metadata_region_id,
248 RegionRequest::Open(open_metadata_region_request),
249 )
250 .await
251 .with_context(|_| OpenMitoRegionSnafu {
252 region_type: "metadata",
253 })?;
254 self.mito
255 .handle_request(
256 data_region_id,
257 RegionRequest::Open(open_data_region_request),
258 )
259 .await
260 .with_context(|_| OpenMitoRegionSnafu {
261 region_type: "data",
262 })?;
263
264 info!("Opened physical metric region {region_id}");
265 PHYSICAL_REGION_COUNT.inc();
266
267 Ok(0)
268 }
269
270 pub(crate) async fn recover_states(
279 &self,
280 physical_region_id: RegionId,
281 physical_region_options: PhysicalRegionOptions,
282 ) -> Result<Vec<RegionId>> {
283 let logical_regions = self
285 .metadata_region
286 .logical_regions(physical_region_id)
287 .await?;
288 let physical_columns = self
289 .data_region
290 .physical_columns(physical_region_id)
291 .await?;
292 let primary_key_encoding = self
293 .mito
294 .get_primary_key_encoding(physical_region_id)
295 .context(PhysicalRegionNotFoundSnafu {
296 region_id: physical_region_id,
297 })?;
298
299 {
300 let mut state = self.state.write().unwrap();
301 let time_index_unit = physical_columns
305 .iter()
306 .find_map(|col| {
307 if col.semantic_type == SemanticType::Timestamp {
308 col.column_schema
309 .data_type
310 .as_timestamp()
311 .map(|data_type| data_type.unit())
312 } else {
313 None
314 }
315 })
316 .unwrap();
317 let physical_columns = physical_columns
318 .into_iter()
319 .map(|col| (col.column_schema.name, col.column_id))
320 .collect();
321 state.add_physical_region(
322 physical_region_id,
323 physical_columns,
324 primary_key_encoding,
325 physical_region_options,
326 time_index_unit,
327 );
328 for logical_region_id in &logical_regions {
330 state.add_logical_region(physical_region_id, *logical_region_id);
331 }
332 }
333
334 let mut opened_logical_region_ids = Vec::new();
335 for logical_region_id in logical_regions {
338 if self
339 .metadata_region
340 .open_logical_region(logical_region_id)
341 .await
342 {
343 opened_logical_region_ids.push(logical_region_id);
344 }
345 }
346
347 LOGICAL_REGION_COUNT.add(opened_logical_region_ids.len() as i64);
348
349 Ok(opened_logical_region_ids)
350 }
351}
352
353