1use api::region::RegionResponse;
18use api::v1::SemanticType;
19use common_error::ext::BoxedError;
20use common_telemetry::{error, info, warn};
21use datafusion::common::HashMap;
22use mito2::engine::MITO_ENGINE_NAME;
23use snafu::{OptionExt, ResultExt};
24use store_api::region_engine::{BatchResponses, RegionEngine};
25use store_api::region_request::{AffectedRows, PathType, RegionOpenRequest, ReplayCheckpoint};
26use store_api::storage::RegionId;
27
28use crate::engine::MetricEngineInner;
29use crate::engine::create::region_options_for_metadata_region;
30use crate::engine::options::{PhysicalRegionOptions, set_data_region_options};
31use crate::error::{
32 BatchOpenMitoRegionSnafu, NoOpenRegionResultSnafu, OpenMitoRegionSnafu,
33 PhysicalRegionNotFoundSnafu, Result,
34};
35use crate::metrics::{LOGICAL_REGION_COUNT, PHYSICAL_REGION_COUNT};
36use crate::utils;
37
38impl MetricEngineInner {
39 pub async fn handle_batch_open_requests(
40 &self,
41 parallelism: usize,
42 requests: Vec<(RegionId, RegionOpenRequest)>,
43 ) -> Result<BatchResponses> {
44 let mut all_requests = Vec::with_capacity(requests.len() * 2);
46 let mut physical_region_ids = HashMap::with_capacity(requests.len());
47
48 for (region_id, request) in requests {
49 if !request.is_physical_table() {
50 warn!("Skipping non-physical table open request: {region_id}");
51 continue;
52 }
53 let physical_region_options = PhysicalRegionOptions::try_from(&request.options)?;
54 let metadata_region_id = utils::to_metadata_region_id(region_id);
55 let data_region_id = utils::to_data_region_id(region_id);
56 let (open_metadata_region_request, open_data_region_request) =
57 self.transform_open_physical_region_request(request);
58 all_requests.push((metadata_region_id, open_metadata_region_request));
59 all_requests.push((data_region_id, open_data_region_request));
60 physical_region_ids.insert(region_id, physical_region_options);
61 }
62
63 let mut results = self
64 .mito
65 .handle_batch_open_requests(parallelism, all_requests)
66 .await
67 .context(BatchOpenMitoRegionSnafu {})?
68 .into_iter()
69 .collect::<HashMap<_, _>>();
70
71 let mut responses = Vec::with_capacity(physical_region_ids.len());
72 for (physical_region_id, physical_region_options) in physical_region_ids {
73 let metadata_region_id = utils::to_metadata_region_id(physical_region_id);
74 let data_region_id = utils::to_data_region_id(physical_region_id);
75 let metadata_region_result = results.remove(&metadata_region_id);
76 let data_region_result: Option<std::result::Result<RegionResponse, BoxedError>> =
77 results.remove(&data_region_id);
78 let response = self
83 .recover_physical_region_with_results(
84 metadata_region_result,
85 data_region_result,
86 physical_region_id,
87 physical_region_options,
88 true,
89 )
90 .await
91 .map_err(BoxedError::new);
92 responses.push((physical_region_id, response));
93 }
94
95 Ok(responses)
96 }
97
98 async fn close_physical_region_on_recovery_failure(&self, physical_region_id: RegionId) {
103 info!(
104 "Closing metadata region {} and data region {} on metadata recovery failure",
105 utils::to_metadata_region_id(physical_region_id),
106 utils::to_data_region_id(physical_region_id)
107 );
108 if let Err(err) = self.close_physical_region(physical_region_id).await {
109 error!(err; "Failed to close physical region {}", physical_region_id);
110 }
111 }
112
113 pub(crate) async fn recover_physical_region_with_results(
114 &self,
115 metadata_region_result: Option<std::result::Result<RegionResponse, BoxedError>>,
116 data_region_result: Option<std::result::Result<RegionResponse, BoxedError>>,
117 physical_region_id: RegionId,
118 physical_region_options: PhysicalRegionOptions,
119 close_region_on_failure: bool,
120 ) -> Result<RegionResponse> {
121 let metadata_region_id = utils::to_metadata_region_id(physical_region_id);
122 let data_region_id = utils::to_data_region_id(physical_region_id);
123 let _ = metadata_region_result
124 .context(NoOpenRegionResultSnafu {
125 region_id: metadata_region_id,
126 })?
127 .context(OpenMitoRegionSnafu {
128 region_type: "metadata",
129 })?;
130
131 let data_region_response = data_region_result
132 .context(NoOpenRegionResultSnafu {
133 region_id: data_region_id,
134 })?
135 .context(OpenMitoRegionSnafu {
136 region_type: "data",
137 })?;
138
139 if let Err(err) = self
140 .recover_states(physical_region_id, physical_region_options)
141 .await
142 {
143 if close_region_on_failure {
144 self.close_physical_region_on_recovery_failure(physical_region_id)
145 .await;
146 }
147 return Err(err);
148 }
149 Ok(data_region_response)
150 }
151
152 pub async fn open_region(
162 &self,
163 region_id: RegionId,
164 request: RegionOpenRequest,
165 ) -> Result<AffectedRows> {
166 if request.is_physical_table() {
167 if self
168 .state
169 .read()
170 .unwrap()
171 .physical_region_states()
172 .get(®ion_id)
173 .is_some()
174 {
175 warn!(
176 "The physical region {} is already open, ignore the open request",
177 region_id
178 );
179 return Ok(0);
180 }
181 let physical_region_options = PhysicalRegionOptions::try_from(&request.options)?;
183 self.open_physical_region(region_id, request).await?;
184 if let Err(err) = self
185 .recover_states(region_id, physical_region_options)
186 .await
187 {
188 self.close_physical_region_on_recovery_failure(region_id)
189 .await;
190 return Err(err);
191 }
192
193 Ok(0)
194 } else {
195 Ok(0)
200 }
201 }
202
203 fn transform_open_physical_region_request(
209 &self,
210 request: RegionOpenRequest,
211 ) -> (RegionOpenRequest, RegionOpenRequest) {
212 let metadata_region_options = region_options_for_metadata_region(&request.options);
213 let checkpoint = request.checkpoint;
214
215 let open_metadata_region_request = RegionOpenRequest {
216 table_dir: request.table_dir.clone(),
217 path_type: PathType::Metadata,
218 options: metadata_region_options,
219 engine: MITO_ENGINE_NAME.to_string(),
220 skip_wal_replay: request.skip_wal_replay,
221 checkpoint: checkpoint.map(|checkpoint| ReplayCheckpoint {
222 entry_id: checkpoint.metadata_entry_id.unwrap_or_default(),
223 metadata_entry_id: None,
224 }),
225 };
226
227 let mut data_region_options = request.options;
228 set_data_region_options(
229 &mut data_region_options,
230 self.config.experimental_sparse_primary_key_encoding,
231 );
232 let open_data_region_request = RegionOpenRequest {
233 table_dir: request.table_dir.clone(),
234 path_type: PathType::Data,
235 options: data_region_options,
236 engine: MITO_ENGINE_NAME.to_string(),
237 skip_wal_replay: request.skip_wal_replay,
238 checkpoint: checkpoint.map(|checkpoint| ReplayCheckpoint {
239 entry_id: checkpoint.entry_id,
240 metadata_entry_id: None,
241 }),
242 };
243
244 (open_metadata_region_request, open_data_region_request)
245 }
246
247 async fn open_physical_region(
249 &self,
250 region_id: RegionId,
251 request: RegionOpenRequest,
252 ) -> Result<AffectedRows> {
253 let metadata_region_id = utils::to_metadata_region_id(region_id);
254 let data_region_id = utils::to_data_region_id(region_id);
255 let (open_metadata_region_request, open_data_region_request) =
256 self.transform_open_physical_region_request(request);
257 let _ = self
258 .mito
259 .handle_batch_open_requests(
260 2,
261 vec![
262 (metadata_region_id, open_metadata_region_request),
263 (data_region_id, open_data_region_request),
264 ],
265 )
266 .await
267 .context(BatchOpenMitoRegionSnafu {})?;
268
269 info!("Opened physical metric region {region_id}");
270 PHYSICAL_REGION_COUNT.inc();
271
272 Ok(0)
273 }
274
275 pub(crate) async fn recover_states(
284 &self,
285 physical_region_id: RegionId,
286 physical_region_options: PhysicalRegionOptions,
287 ) -> Result<Vec<RegionId>> {
288 let logical_regions = self
290 .metadata_region
291 .logical_regions(physical_region_id)
292 .await?;
293 let physical_columns = self
294 .data_region
295 .physical_columns(physical_region_id)
296 .await?;
297 let primary_key_encoding = self
298 .mito
299 .get_primary_key_encoding(physical_region_id)
300 .context(PhysicalRegionNotFoundSnafu {
301 region_id: physical_region_id,
302 })?;
303
304 {
305 let mut state = self.state.write().unwrap();
306 let time_index_unit = physical_columns
310 .iter()
311 .find_map(|col| {
312 if col.semantic_type == SemanticType::Timestamp {
313 col.column_schema
314 .data_type
315 .as_timestamp()
316 .map(|data_type| data_type.unit())
317 } else {
318 None
319 }
320 })
321 .unwrap();
322 let physical_columns = physical_columns
323 .into_iter()
324 .map(|col| (col.column_schema.name, col.column_id))
325 .collect();
326 state.add_physical_region(
327 physical_region_id,
328 physical_columns,
329 primary_key_encoding,
330 physical_region_options,
331 time_index_unit,
332 );
333 for logical_region_id in &logical_regions {
335 state.add_logical_region(physical_region_id, *logical_region_id);
336 }
337 }
338
339 let mut opened_logical_region_ids = Vec::new();
340 for logical_region_id in logical_regions {
343 if self
344 .metadata_region
345 .open_logical_region(logical_region_id)
346 .await
347 {
348 opened_logical_region_ids.push(logical_region_id);
349 }
350 }
351
352 LOGICAL_REGION_COUNT.add(opened_logical_region_ids.len() as i64);
353
354 Ok(opened_logical_region_ids)
355 }
356}
357
358