1use std::sync::Arc;
16
17use api::v1::SemanticType;
18use common_telemetry::{error, info, tracing};
19use datafusion::logical_expr::{self, Expr};
20use snafu::{OptionExt, ResultExt};
21use store_api::metadata::{RegionMetadataBuilder, RegionMetadataRef};
22use store_api::metric_engine_consts::DATA_SCHEMA_TABLE_ID_COLUMN_NAME;
23use store_api::region_engine::{RegionEngine, RegionScannerRef};
24use store_api::storage::{RegionId, ScanRequest, SequenceNumber};
25
26use crate::engine::MetricEngineInner;
27use crate::error::{
28 InvalidMetadataSnafu, LogicalRegionNotFoundSnafu, MitoReadOperationSnafu, Result,
29};
30use crate::metrics::MITO_OPERATION_ELAPSED;
31use crate::utils;
32
33impl MetricEngineInner {
34 #[tracing::instrument(skip_all)]
35 pub async fn read_region(
36 &self,
37 region_id: RegionId,
38 request: ScanRequest,
39 ) -> Result<RegionScannerRef> {
40 let is_reading_physical_region = self.is_physical_region(region_id);
41
42 if is_reading_physical_region {
43 info!(
44 "Metric region received read request {request:?} on physical region {region_id:?}"
45 );
46 self.read_physical_region(region_id, request).await
47 } else {
48 self.read_logical_region(region_id, request).await
49 }
50 }
51
52 async fn read_physical_region(
54 &self,
55 region_id: RegionId,
56 request: ScanRequest,
57 ) -> Result<RegionScannerRef> {
58 let _timer = MITO_OPERATION_ELAPSED
59 .with_label_values(&["read_physical"])
60 .start_timer();
61
62 self.mito
63 .handle_query(region_id, request)
64 .await
65 .context(MitoReadOperationSnafu)
66 }
67
68 async fn read_logical_region(
69 &self,
70 logical_region_id: RegionId,
71 request: ScanRequest,
72 ) -> Result<RegionScannerRef> {
73 let _timer = MITO_OPERATION_ELAPSED
74 .with_label_values(&["read"])
75 .start_timer();
76
77 let physical_region_id = self.get_physical_region_id(logical_region_id).await?;
78 let data_region_id = utils::to_data_region_id(physical_region_id);
79 let request = self
80 .transform_request(physical_region_id, logical_region_id, request)
81 .await?;
82 let mut scanner = self
83 .mito
84 .handle_query(data_region_id, request)
85 .await
86 .context(MitoReadOperationSnafu)?;
87 scanner.set_logical_region(true);
88
89 Ok(scanner)
90 }
91
92 pub async fn get_last_seq_num(&self, region_id: RegionId) -> Result<Option<SequenceNumber>> {
93 let region_id = if self.is_physical_region(region_id) {
94 region_id
95 } else {
96 let physical_region_id = self.get_physical_region_id(region_id).await?;
97 utils::to_data_region_id(physical_region_id)
98 };
99 self.mito
100 .get_last_seq_num(region_id)
101 .await
102 .context(MitoReadOperationSnafu)
103 }
104
105 pub async fn load_region_metadata(&self, region_id: RegionId) -> Result<RegionMetadataRef> {
106 let is_reading_physical_region =
107 self.state.read().unwrap().exist_physical_region(region_id);
108
109 if is_reading_physical_region {
110 self.mito
111 .get_metadata(region_id)
112 .await
113 .context(MitoReadOperationSnafu)
114 } else {
115 let physical_region_id = self.get_physical_region_id(region_id).await?;
116 self.logical_region_metadata(physical_region_id, region_id)
117 .await
118 }
119 }
120
121 pub fn is_physical_region(&self, region_id: RegionId) -> bool {
123 self.state.read().unwrap().exist_physical_region(region_id)
124 }
125
126 async fn get_physical_region_id(&self, logical_region_id: RegionId) -> Result<RegionId> {
127 let state = &self.state.read().unwrap();
128 state
129 .get_physical_region_id(logical_region_id)
130 .with_context(|| {
131 error!("Trying to read an nonexistent region {logical_region_id}");
132 LogicalRegionNotFoundSnafu {
133 region_id: logical_region_id,
134 }
135 })
136 }
137
138 async fn transform_request(
140 &self,
141 physical_region_id: RegionId,
142 logical_region_id: RegionId,
143 mut request: ScanRequest,
144 ) -> Result<ScanRequest> {
145 let physical_projection = if let Some(projection) = &request.projection {
147 self.transform_projection(physical_region_id, logical_region_id, projection)
148 .await?
149 } else {
150 self.default_projection(physical_region_id, logical_region_id)
151 .await?
152 };
153
154 request.projection = Some(physical_projection);
155
156 request
158 .filters
159 .push(self.table_id_filter(logical_region_id));
160
161 Ok(request)
162 }
163
164 fn table_id_filter(&self, logical_region_id: RegionId) -> Expr {
166 logical_expr::col(DATA_SCHEMA_TABLE_ID_COLUMN_NAME)
167 .eq(logical_expr::lit(logical_region_id.table_id()))
168 }
169
170 pub async fn transform_projection(
174 &self,
175 physical_region_id: RegionId,
176 logical_region_id: RegionId,
177 origin_projection: &[usize],
178 ) -> Result<Vec<usize>> {
179 let all_logical_columns = self
181 .load_logical_column_names(physical_region_id, logical_region_id)
182 .await?;
183 let projected_logical_names = origin_projection
184 .iter()
185 .map(|i| all_logical_columns[*i].clone())
186 .collect::<Vec<_>>();
187
188 let mut physical_projection = Vec::with_capacity(origin_projection.len());
190 let data_region_id = utils::to_data_region_id(physical_region_id);
191 let physical_metadata = self
192 .mito
193 .get_metadata(data_region_id)
194 .await
195 .context(MitoReadOperationSnafu)?;
196
197 for name in projected_logical_names {
198 physical_projection.push(physical_metadata.column_index_by_name(&name).unwrap());
200 }
201
202 Ok(physical_projection)
203 }
204
205 pub async fn default_projection(
207 &self,
208 physical_region_id: RegionId,
209 logical_region_id: RegionId,
210 ) -> Result<Vec<usize>> {
211 let logical_columns = self
212 .load_logical_column_names(physical_region_id, logical_region_id)
213 .await?;
214 let mut projection = Vec::with_capacity(logical_columns.len());
215 let data_region_id = utils::to_data_region_id(physical_region_id);
216 let physical_metadata = self
217 .mito
218 .get_metadata(data_region_id)
219 .await
220 .context(MitoReadOperationSnafu)?;
221 for name in logical_columns {
222 projection.push(physical_metadata.column_index_by_name(&name).unwrap());
224 }
225
226 Ok(projection)
227 }
228
229 pub async fn logical_region_metadata(
230 &self,
231 physical_region_id: RegionId,
232 logical_region_id: RegionId,
233 ) -> Result<RegionMetadataRef> {
234 let logical_columns = self
235 .load_logical_columns(physical_region_id, logical_region_id)
236 .await?;
237
238 let primary_keys = logical_columns
239 .iter()
240 .filter_map(|col| {
241 if col.semantic_type == SemanticType::Tag {
242 Some(col.column_id)
243 } else {
244 None
245 }
246 })
247 .collect::<Vec<_>>();
248
249 let mut logical_metadata_builder = RegionMetadataBuilder::new(logical_region_id);
250 for col in logical_columns {
251 logical_metadata_builder.push_column_metadata(col);
252 }
253 logical_metadata_builder.primary_key(primary_keys);
254 let logical_metadata = logical_metadata_builder
255 .build()
256 .context(InvalidMetadataSnafu)?;
257
258 Ok(Arc::new(logical_metadata))
259 }
260}
261
#[cfg(test)]
impl MetricEngineInner {
    /// Test-only helper that scans `region_id` and returns the record batch stream produced
    /// by the `mito` engine.
    ///
    /// A physical region is scanned with the request as given. For any other region the
    /// request is first rewritten with `transform_request` and then run against the resolved
    /// physical region. Every error is boxed into `BoxedError`.
    pub async fn scan_to_stream(
        &self,
        region_id: RegionId,
        request: ScanRequest,
    ) -> Result<common_recordbatch::SendableRecordBatchStream, common_error::ext::BoxedError> {
        if self.is_physical_region(region_id) {
            return self
                .mito
                .scan_to_stream(region_id, request)
                .await
                .map_err(common_error::ext::BoxedError::new);
        }

        let owner_region_id = self
            .get_physical_region_id(region_id)
            .await
            .map_err(common_error::ext::BoxedError::new)?;
        let physical_request = self
            .transform_request(owner_region_id, region_id, request)
            .await
            .map_err(common_error::ext::BoxedError::new)?;

        // NOTE(review): this scans the physical region id directly, whereas
        // `read_logical_region` goes through `utils::to_data_region_id` -- confirm the two
        // ids are interchangeable here.
        self.mito
            .scan_to_stream(owner_region_id, physical_request)
            .await
            .map_err(common_error::ext::BoxedError::new)
    }
}
292
#[cfg(test)]
mod test {
    use store_api::region_request::RegionRequest;

    use super::*;
    use crate::test_util::{
        alter_logical_region_add_tag_columns, create_logical_region_request, TestEnv,
    };

    /// Checks that `transform_request` rewrites the projection into physical column indices
    /// and appends exactly one table-id filter, both for an explicit projection and for a
    /// request without one.
    #[tokio::test]
    async fn test_transform_scan_req() {
        let env = TestEnv::new().await;
        env.init_metric_region().await;

        let logical_region_id = env.default_logical_region_id();
        let physical_region_id = env.default_physical_region_id();

        // Register a second logical region on the same physical region.
        let other_logical_region_id = RegionId::new(1112345678, 999);
        let create_other =
            create_logical_region_request(&["123", "456", "789"], physical_region_id, "blabla");
        env.metric()
            .handle_request(other_logical_region_id, RegionRequest::Create(create_other))
            .await
            .unwrap();

        // Alter the default logical region via `alter_logical_region_add_tag_columns`.
        let add_tags = alter_logical_region_add_tag_columns(123456, &["987", "798", "654", "321"]);
        env.metric()
            .handle_request(logical_region_id, RegionRequest::Alter(add_tags))
            .await
            .unwrap();

        let expected_projection = vec![11, 10, 9, 8, 0, 1, 4];
        let expected_filter = logical_expr::col(DATA_SCHEMA_TABLE_ID_COLUMN_NAME)
            .eq(logical_expr::lit(logical_region_id.table_id()));

        // Case 1: explicit projection over the first seven logical columns.
        let explicit_request = ScanRequest {
            projection: Some(vec![0, 1, 2, 3, 4, 5, 6]),
            filters: vec![],
            ..Default::default()
        };
        let transformed = env
            .metric()
            .inner
            .transform_request(physical_region_id, logical_region_id, explicit_request)
            .await
            .unwrap();

        assert_eq!(transformed.projection.unwrap(), expected_projection);
        assert_eq!(transformed.filters.len(), 1);
        assert_eq!(transformed.filters[0], expected_filter);

        // Case 2: no projection given, so the default projection is used.
        let transformed = env
            .metric()
            .inner
            .transform_request(physical_region_id, logical_region_id, ScanRequest::default())
            .await
            .unwrap();
        assert_eq!(transformed.projection.unwrap(), expected_projection);
    }
}