1use api::region::RegionResponse;
18use api::v1::SemanticType;
19use common_error::ext::BoxedError;
20use common_telemetry::{error, info, warn};
21use datafusion::common::HashMap;
22use mito2::engine::MITO_ENGINE_NAME;
23use snafu::{OptionExt, ResultExt};
24use store_api::region_engine::{BatchResponses, RegionEngine};
25use store_api::region_request::{AffectedRows, PathType, RegionOpenRequest, ReplayCheckpoint};
26use store_api::storage::RegionId;
27
28use crate::engine::create::region_options_for_metadata_region;
29use crate::engine::options::{set_data_region_options, PhysicalRegionOptions};
30use crate::engine::MetricEngineInner;
31use crate::error::{
32 BatchOpenMitoRegionSnafu, NoOpenRegionResultSnafu, OpenMitoRegionSnafu,
33 PhysicalRegionNotFoundSnafu, Result,
34};
35use crate::metrics::{LOGICAL_REGION_COUNT, PHYSICAL_REGION_COUNT};
36use crate::utils;
37
38impl MetricEngineInner {
39 pub async fn handle_batch_open_requests(
40 &self,
41 parallelism: usize,
42 requests: Vec<(RegionId, RegionOpenRequest)>,
43 ) -> Result<BatchResponses> {
44 let mut all_requests = Vec::with_capacity(requests.len() * 2);
46 let mut physical_region_ids = HashMap::with_capacity(requests.len());
47
48 for (region_id, request) in requests {
49 if !request.is_physical_table() {
50 continue;
51 }
52 let physical_region_options = PhysicalRegionOptions::try_from(&request.options)?;
53 let metadata_region_id = utils::to_metadata_region_id(region_id);
54 let data_region_id = utils::to_data_region_id(region_id);
55 let (open_metadata_region_request, open_data_region_request) =
56 self.transform_open_physical_region_request(request);
57 all_requests.push((metadata_region_id, open_metadata_region_request));
58 all_requests.push((data_region_id, open_data_region_request));
59 physical_region_ids.insert(region_id, physical_region_options);
60 }
61
62 let mut results = self
63 .mito
64 .handle_batch_open_requests(parallelism, all_requests)
65 .await
66 .context(BatchOpenMitoRegionSnafu {})?
67 .into_iter()
68 .collect::<HashMap<_, _>>();
69
70 let mut responses = Vec::with_capacity(physical_region_ids.len());
71 for (physical_region_id, physical_region_options) in physical_region_ids {
72 let metadata_region_id = utils::to_metadata_region_id(physical_region_id);
73 let data_region_id = utils::to_data_region_id(physical_region_id);
74 let metadata_region_result = results.remove(&metadata_region_id);
75 let data_region_result = results.remove(&data_region_id);
76 let response = self
81 .open_physical_region_with_results(
82 metadata_region_result,
83 data_region_result,
84 physical_region_id,
85 physical_region_options,
86 )
87 .await
88 .map_err(BoxedError::new);
89 responses.push((physical_region_id, response));
90 }
91
92 Ok(responses)
93 }
94
95 async fn close_physical_region_on_recovery_failure(&self, physical_region_id: RegionId) {
100 info!(
101 "Closing metadata region {} and data region {} on metadata recovery failure",
102 utils::to_metadata_region_id(physical_region_id),
103 utils::to_data_region_id(physical_region_id)
104 );
105 if let Err(err) = self.close_physical_region(physical_region_id).await {
106 error!(err; "Failed to close physical region {}", physical_region_id);
107 }
108 }
109
110 async fn open_physical_region_with_results(
111 &self,
112 metadata_region_result: Option<std::result::Result<RegionResponse, BoxedError>>,
113 data_region_result: Option<std::result::Result<RegionResponse, BoxedError>>,
114 physical_region_id: RegionId,
115 physical_region_options: PhysicalRegionOptions,
116 ) -> Result<RegionResponse> {
117 let metadata_region_id = utils::to_metadata_region_id(physical_region_id);
118 let data_region_id = utils::to_data_region_id(physical_region_id);
119 let _ = metadata_region_result
120 .context(NoOpenRegionResultSnafu {
121 region_id: metadata_region_id,
122 })?
123 .context(OpenMitoRegionSnafu {
124 region_type: "metadata",
125 })?;
126
127 let data_region_response = data_region_result
128 .context(NoOpenRegionResultSnafu {
129 region_id: data_region_id,
130 })?
131 .context(OpenMitoRegionSnafu {
132 region_type: "data",
133 })?;
134
135 if let Err(err) = self
136 .recover_states(physical_region_id, physical_region_options)
137 .await
138 {
139 self.close_physical_region_on_recovery_failure(physical_region_id)
140 .await;
141 return Err(err);
142 }
143 Ok(data_region_response)
144 }
145
146 pub async fn open_region(
156 &self,
157 region_id: RegionId,
158 request: RegionOpenRequest,
159 ) -> Result<AffectedRows> {
160 if request.is_physical_table() {
161 if self
162 .state
163 .read()
164 .unwrap()
165 .physical_region_states()
166 .get(®ion_id)
167 .is_some()
168 {
169 warn!(
170 "The physical region {} is already open, ignore the open request",
171 region_id
172 );
173 return Ok(0);
174 }
175 let physical_region_options = PhysicalRegionOptions::try_from(&request.options)?;
177 self.open_physical_region(region_id, request).await?;
178 if let Err(err) = self
179 .recover_states(region_id, physical_region_options)
180 .await
181 {
182 self.close_physical_region_on_recovery_failure(region_id)
183 .await;
184 return Err(err);
185 }
186
187 Ok(0)
188 } else {
189 Ok(0)
194 }
195 }
196
197 fn transform_open_physical_region_request(
203 &self,
204 request: RegionOpenRequest,
205 ) -> (RegionOpenRequest, RegionOpenRequest) {
206 let metadata_region_options = region_options_for_metadata_region(&request.options);
207 let checkpoint = request.checkpoint;
208
209 let open_metadata_region_request = RegionOpenRequest {
210 table_dir: request.table_dir.clone(),
211 path_type: PathType::Metadata,
212 options: metadata_region_options,
213 engine: MITO_ENGINE_NAME.to_string(),
214 skip_wal_replay: request.skip_wal_replay,
215 checkpoint: checkpoint.map(|checkpoint| ReplayCheckpoint {
216 entry_id: checkpoint.metadata_entry_id.unwrap_or_default(),
217 metadata_entry_id: None,
218 }),
219 };
220
221 let mut data_region_options = request.options;
222 set_data_region_options(
223 &mut data_region_options,
224 self.config.experimental_sparse_primary_key_encoding,
225 );
226 let open_data_region_request = RegionOpenRequest {
227 table_dir: request.table_dir.clone(),
228 path_type: PathType::Data,
229 options: data_region_options,
230 engine: MITO_ENGINE_NAME.to_string(),
231 skip_wal_replay: request.skip_wal_replay,
232 checkpoint: checkpoint.map(|checkpoint| ReplayCheckpoint {
233 entry_id: checkpoint.entry_id,
234 metadata_entry_id: None,
235 }),
236 };
237
238 (open_metadata_region_request, open_data_region_request)
239 }
240
241 async fn open_physical_region(
243 &self,
244 region_id: RegionId,
245 request: RegionOpenRequest,
246 ) -> Result<AffectedRows> {
247 let metadata_region_id = utils::to_metadata_region_id(region_id);
248 let data_region_id = utils::to_data_region_id(region_id);
249 let (open_metadata_region_request, open_data_region_request) =
250 self.transform_open_physical_region_request(request);
251 let _ = self
252 .mito
253 .handle_batch_open_requests(
254 2,
255 vec![
256 (metadata_region_id, open_metadata_region_request),
257 (data_region_id, open_data_region_request),
258 ],
259 )
260 .await
261 .context(BatchOpenMitoRegionSnafu {})?;
262
263 info!("Opened physical metric region {region_id}");
264 PHYSICAL_REGION_COUNT.inc();
265
266 Ok(0)
267 }
268
269 pub(crate) async fn recover_states(
278 &self,
279 physical_region_id: RegionId,
280 physical_region_options: PhysicalRegionOptions,
281 ) -> Result<Vec<RegionId>> {
282 let logical_regions = self
284 .metadata_region
285 .logical_regions(physical_region_id)
286 .await?;
287 let physical_columns = self
288 .data_region
289 .physical_columns(physical_region_id)
290 .await?;
291 let primary_key_encoding = self
292 .mito
293 .get_primary_key_encoding(physical_region_id)
294 .context(PhysicalRegionNotFoundSnafu {
295 region_id: physical_region_id,
296 })?;
297
298 {
299 let mut state = self.state.write().unwrap();
300 let time_index_unit = physical_columns
304 .iter()
305 .find_map(|col| {
306 if col.semantic_type == SemanticType::Timestamp {
307 col.column_schema
308 .data_type
309 .as_timestamp()
310 .map(|data_type| data_type.unit())
311 } else {
312 None
313 }
314 })
315 .unwrap();
316 let physical_columns = physical_columns
317 .into_iter()
318 .map(|col| (col.column_schema.name, col.column_id))
319 .collect();
320 state.add_physical_region(
321 physical_region_id,
322 physical_columns,
323 primary_key_encoding,
324 physical_region_options,
325 time_index_unit,
326 );
327 for logical_region_id in &logical_regions {
329 state.add_logical_region(physical_region_id, *logical_region_id);
330 }
331 }
332
333 let mut opened_logical_region_ids = Vec::new();
334 for logical_region_id in logical_regions {
337 if self
338 .metadata_region
339 .open_logical_region(logical_region_id)
340 .await
341 {
342 opened_logical_region_ids.push(logical_region_id);
343 }
344 }
345
346 LOGICAL_REGION_COUNT.add(opened_logical_region_ids.len() as i64);
347
348 Ok(opened_logical_region_ids)
349 }
350}
351
352