1use std::sync::Arc;
16
17use api::v1::SemanticType;
18use common_telemetry::{debug, error, tracing};
19use datafusion::logical_expr::{self, Expr};
20use snafu::{OptionExt, ResultExt};
21use store_api::metadata::{RegionMetadataBuilder, RegionMetadataRef};
22use store_api::metric_engine_consts::DATA_SCHEMA_TABLE_ID_COLUMN_NAME;
23use store_api::region_engine::{RegionEngine, RegionScannerRef};
24use store_api::storage::{RegionId, ScanRequest, SequenceNumber};
25
26use crate::engine::MetricEngineInner;
27use crate::error::{
28 InvalidMetadataSnafu, LogicalRegionNotFoundSnafu, MitoReadOperationSnafu, Result,
29};
30use crate::metrics::MITO_OPERATION_ELAPSED;
31use crate::utils;
32
33impl MetricEngineInner {
34 #[tracing::instrument(skip_all)]
35 pub async fn read_region(
36 &self,
37 region_id: RegionId,
38 request: ScanRequest,
39 ) -> Result<RegionScannerRef> {
40 let is_reading_physical_region = self.is_physical_region(region_id);
41
42 if is_reading_physical_region {
43 debug!(
44 "Metric region received read request {request:?} on physical region {region_id:?}"
45 );
46 self.read_physical_region(region_id, request).await
47 } else {
48 self.read_logical_region(region_id, request).await
49 }
50 }
51
52 async fn read_physical_region(
54 &self,
55 region_id: RegionId,
56 request: ScanRequest,
57 ) -> Result<RegionScannerRef> {
58 let _timer = MITO_OPERATION_ELAPSED
59 .with_label_values(&["read_physical"])
60 .start_timer();
61
62 self.mito
63 .handle_query(region_id, request)
64 .await
65 .context(MitoReadOperationSnafu)
66 }
67
68 async fn read_logical_region(
69 &self,
70 logical_region_id: RegionId,
71 request: ScanRequest,
72 ) -> Result<RegionScannerRef> {
73 let _timer = MITO_OPERATION_ELAPSED
74 .with_label_values(&["read"])
75 .start_timer();
76
77 let physical_region_id = self.get_physical_region_id(logical_region_id).await?;
78 let data_region_id = utils::to_data_region_id(physical_region_id);
79 let request = self
80 .transform_request(physical_region_id, logical_region_id, request)
81 .await?;
82 let mut scanner = self
83 .mito
84 .handle_query(data_region_id, request)
85 .await
86 .context(MitoReadOperationSnafu)?;
87 scanner.set_logical_region(true);
88
89 Ok(scanner)
90 }
91
92 pub async fn get_last_seq_num(&self, region_id: RegionId) -> Result<SequenceNumber> {
93 let region_id = if self.is_physical_region(region_id) {
94 region_id
95 } else {
96 let physical_region_id = self.get_physical_region_id(region_id).await?;
97 utils::to_data_region_id(physical_region_id)
98 };
99 self.mito
100 .get_committed_sequence(region_id)
101 .await
102 .context(MitoReadOperationSnafu)
103 }
104
105 pub async fn load_region_metadata(&self, region_id: RegionId) -> Result<RegionMetadataRef> {
106 let is_reading_physical_region =
107 self.state.read().unwrap().exist_physical_region(region_id);
108
109 if is_reading_physical_region {
110 self.mito
111 .get_metadata(region_id)
112 .await
113 .context(MitoReadOperationSnafu)
114 } else {
115 let physical_region_id = self.get_physical_region_id(region_id).await?;
116 self.logical_region_metadata(physical_region_id, region_id)
117 .await
118 }
119 }
120
121 pub fn is_physical_region(&self, region_id: RegionId) -> bool {
123 self.state.read().unwrap().exist_physical_region(region_id)
124 }
125
126 async fn get_physical_region_id(&self, logical_region_id: RegionId) -> Result<RegionId> {
127 let state = &self.state.read().unwrap();
128 state
129 .get_physical_region_id(logical_region_id)
130 .with_context(|| {
131 error!("Trying to read an nonexistent region {logical_region_id}");
132 LogicalRegionNotFoundSnafu {
133 region_id: logical_region_id,
134 }
135 })
136 }
137
138 async fn transform_request(
140 &self,
141 physical_region_id: RegionId,
142 logical_region_id: RegionId,
143 mut request: ScanRequest,
144 ) -> Result<ScanRequest> {
145 let physical_projection = match request.projection_input.as_ref() {
147 Some(projection_input) => {
148 self.transform_projection(
149 physical_region_id,
150 logical_region_id,
151 &projection_input.projection,
152 )
153 .await?
154 }
155 None => {
156 self.default_projection(physical_region_id, logical_region_id)
157 .await?
158 }
159 };
160
161 request.projection_input.get_or_insert_default().projection = physical_projection;
165
166 request
167 .filters
168 .push(self.table_id_filter(logical_region_id));
169
170 Ok(request)
171 }
172
173 fn table_id_filter(&self, logical_region_id: RegionId) -> Expr {
175 logical_expr::col(DATA_SCHEMA_TABLE_ID_COLUMN_NAME)
176 .eq(logical_expr::lit(logical_region_id.table_id()))
177 }
178
179 pub async fn transform_projection(
183 &self,
184 physical_region_id: RegionId,
185 logical_region_id: RegionId,
186 origin_projection: &[usize],
187 ) -> Result<Vec<usize>> {
188 let all_logical_columns = self
190 .load_logical_column_names(physical_region_id, logical_region_id)
191 .await?;
192 let projected_logical_names = origin_projection
193 .iter()
194 .map(|i| all_logical_columns[*i].clone())
195 .collect::<Vec<_>>();
196
197 let mut physical_projection = Vec::with_capacity(origin_projection.len());
199 let data_region_id = utils::to_data_region_id(physical_region_id);
200 let physical_metadata = self
201 .mito
202 .get_metadata(data_region_id)
203 .await
204 .context(MitoReadOperationSnafu)?;
205
206 for name in projected_logical_names {
207 physical_projection.push(physical_metadata.column_index_by_name(&name).unwrap());
209 }
210
211 Ok(physical_projection)
212 }
213
214 pub async fn default_projection(
216 &self,
217 physical_region_id: RegionId,
218 logical_region_id: RegionId,
219 ) -> Result<Vec<usize>> {
220 let logical_columns = self
221 .load_logical_column_names(physical_region_id, logical_region_id)
222 .await?;
223 let mut projection = Vec::with_capacity(logical_columns.len());
224 let data_region_id = utils::to_data_region_id(physical_region_id);
225 let physical_metadata = self
226 .mito
227 .get_metadata(data_region_id)
228 .await
229 .context(MitoReadOperationSnafu)?;
230 for name in logical_columns {
231 projection.push(physical_metadata.column_index_by_name(&name).unwrap());
233 }
234
235 Ok(projection)
236 }
237
238 pub async fn logical_region_metadata(
239 &self,
240 physical_region_id: RegionId,
241 logical_region_id: RegionId,
242 ) -> Result<RegionMetadataRef> {
243 let logical_columns = self
244 .load_logical_columns(physical_region_id, logical_region_id)
245 .await?;
246
247 let primary_keys = logical_columns
248 .iter()
249 .filter_map(|col| {
250 if col.semantic_type == SemanticType::Tag {
251 Some(col.column_id)
252 } else {
253 None
254 }
255 })
256 .collect::<Vec<_>>();
257
258 let mut logical_metadata_builder = RegionMetadataBuilder::new(logical_region_id);
259 for col in logical_columns {
260 logical_metadata_builder.push_column_metadata(col);
261 }
262 logical_metadata_builder.primary_key(primary_keys);
263 let logical_metadata = logical_metadata_builder
264 .build()
265 .context(InvalidMetadataSnafu)?;
266
267 Ok(Arc::new(logical_metadata))
268 }
269}
270
#[cfg(test)]
impl MetricEngineInner {
    /// Test-only helper that scans a region and returns the record batch stream.
    ///
    /// A physical region is scanned as-is. For a logical region the request is
    /// rewritten with `transform_request` and the scan is issued against the
    /// data region of its physical region. This now uses the same
    /// `utils::to_data_region_id` mapping as `read_logical_region` and
    /// `get_last_seq_num`, so the helper exercises the production routing.
    ///
    /// # Errors
    ///
    /// Every failure (lookup, request transformation, mito scan) is boxed into
    /// a `BoxedError`.
    pub async fn scan_to_stream(
        &self,
        region_id: RegionId,
        request: ScanRequest,
    ) -> Result<common_recordbatch::SendableRecordBatchStream, common_error::ext::BoxedError> {
        use common_error::ext::BoxedError;

        if self.is_physical_region(region_id) {
            return self
                .mito
                .scan_to_stream(region_id, request)
                .await
                .map_err(BoxedError::new);
        }

        let physical_region_id = self
            .get_physical_region_id(region_id)
            .await
            .map_err(BoxedError::new)?;
        let request = self
            .transform_request(physical_region_id, region_id, request)
            .await
            .map_err(BoxedError::new)?;

        // Target the data region, as the non-test logical read path does.
        let data_region_id = utils::to_data_region_id(physical_region_id);
        self.mito
            .scan_to_stream(data_region_id, request)
            .await
            .map_err(BoxedError::new)
    }
}
301
#[cfg(test)]
mod test {
    use store_api::region_request::RegionRequest;

    use super::*;
    use crate::test_util::{
        TestEnv, alter_logical_region_add_tag_columns, create_logical_region_request,
    };

    /// Checks that `transform_request` maps a logical projection (explicit or
    /// absent) to the expected physical column indices and appends exactly one
    /// table-id filter.
    #[tokio::test]
    async fn test_transform_scan_req() {
        let env = TestEnv::new().await;
        env.init_metric_region().await;

        let logical_region_id = env.default_logical_region_id();
        let physical_region_id = env.default_physical_region_id();

        // Create a second logical region on the same physical region.
        let another_logical_region_id = RegionId::new(1112345678, 999);
        let create_request =
            create_logical_region_request(&["123", "456", "789"], physical_region_id, "blabla");
        env.metric()
            .handle_request(
                another_logical_region_id,
                RegionRequest::Create(create_request),
            )
            .await
            .unwrap();

        // Alter the default logical region with four more tag columns.
        let alter_request =
            alter_logical_region_add_tag_columns(123456, &["987", "798", "654", "321"]);
        env.metric()
            .handle_request(logical_region_id, RegionRequest::Alter(alter_request))
            .await
            .unwrap();

        let expected_projection = [11, 10, 9, 8, 0, 1, 4];
        let expected_filter = logical_expr::col(DATA_SCHEMA_TABLE_ID_COLUMN_NAME)
            .eq(logical_expr::lit(logical_region_id.table_id()));

        // Case 1: an explicit projection over the first seven logical columns.
        let explicit_request = ScanRequest {
            projection_input: Some(vec![0, 1, 2, 3, 4, 5, 6].into()),
            filters: vec![],
            ..Default::default()
        };
        let transformed = env
            .metric()
            .inner
            .transform_request(physical_region_id, logical_region_id, explicit_request)
            .await
            .unwrap();

        assert_eq!(
            transformed.projection_indices().unwrap(),
            &expected_projection
        );
        assert_eq!(transformed.filters.len(), 1);
        assert_eq!(transformed.filters[0], expected_filter);

        // Case 2: no projection given, so the default projection is applied.
        let transformed = env
            .metric()
            .inner
            .transform_request(physical_region_id, logical_region_id, ScanRequest::default())
            .await
            .unwrap();
        assert_eq!(
            transformed.projection_indices().unwrap(),
            &expected_projection
        );
    }
}