metric_engine/engine/
read.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use api::v1::SemanticType;
18use common_telemetry::{error, info, tracing};
19use datafusion::logical_expr::{self, Expr};
20use snafu::{OptionExt, ResultExt};
21use store_api::metadata::{RegionMetadataBuilder, RegionMetadataRef};
22use store_api::metric_engine_consts::DATA_SCHEMA_TABLE_ID_COLUMN_NAME;
23use store_api::region_engine::{RegionEngine, RegionScannerRef};
24use store_api::storage::{RegionId, ScanRequest, SequenceNumber};
25
26use crate::engine::MetricEngineInner;
27use crate::error::{
28    InvalidMetadataSnafu, LogicalRegionNotFoundSnafu, MitoReadOperationSnafu, Result,
29};
30use crate::metrics::MITO_OPERATION_ELAPSED;
31use crate::utils;
32
33impl MetricEngineInner {
34    #[tracing::instrument(skip_all)]
35    pub async fn read_region(
36        &self,
37        region_id: RegionId,
38        request: ScanRequest,
39    ) -> Result<RegionScannerRef> {
40        let is_reading_physical_region = self.is_physical_region(region_id);
41
42        if is_reading_physical_region {
43            info!(
44                "Metric region received read request {request:?} on physical region {region_id:?}"
45            );
46            self.read_physical_region(region_id, request).await
47        } else {
48            self.read_logical_region(region_id, request).await
49        }
50    }
51
52    /// Proxy the read request to underlying physical region (mito engine).
53    async fn read_physical_region(
54        &self,
55        region_id: RegionId,
56        request: ScanRequest,
57    ) -> Result<RegionScannerRef> {
58        let _timer = MITO_OPERATION_ELAPSED
59            .with_label_values(&["read_physical"])
60            .start_timer();
61
62        self.mito
63            .handle_query(region_id, request)
64            .await
65            .context(MitoReadOperationSnafu)
66    }
67
68    async fn read_logical_region(
69        &self,
70        logical_region_id: RegionId,
71        request: ScanRequest,
72    ) -> Result<RegionScannerRef> {
73        let _timer = MITO_OPERATION_ELAPSED
74            .with_label_values(&["read"])
75            .start_timer();
76
77        let physical_region_id = self.get_physical_region_id(logical_region_id).await?;
78        let data_region_id = utils::to_data_region_id(physical_region_id);
79        let request = self
80            .transform_request(physical_region_id, logical_region_id, request)
81            .await?;
82        let mut scanner = self
83            .mito
84            .handle_query(data_region_id, request)
85            .await
86            .context(MitoReadOperationSnafu)?;
87        scanner.set_logical_region(true);
88
89        Ok(scanner)
90    }
91
92    pub async fn get_last_seq_num(&self, region_id: RegionId) -> Result<Option<SequenceNumber>> {
93        let region_id = if self.is_physical_region(region_id) {
94            region_id
95        } else {
96            let physical_region_id = self.get_physical_region_id(region_id).await?;
97            utils::to_data_region_id(physical_region_id)
98        };
99        self.mito
100            .get_last_seq_num(region_id)
101            .await
102            .context(MitoReadOperationSnafu)
103    }
104
105    pub async fn load_region_metadata(&self, region_id: RegionId) -> Result<RegionMetadataRef> {
106        let is_reading_physical_region =
107            self.state.read().unwrap().exist_physical_region(region_id);
108
109        if is_reading_physical_region {
110            self.mito
111                .get_metadata(region_id)
112                .await
113                .context(MitoReadOperationSnafu)
114        } else {
115            let physical_region_id = self.get_physical_region_id(region_id).await?;
116            self.logical_region_metadata(physical_region_id, region_id)
117                .await
118        }
119    }
120
121    /// Returns true if it's a physical region.
122    pub fn is_physical_region(&self, region_id: RegionId) -> bool {
123        self.state.read().unwrap().exist_physical_region(region_id)
124    }
125
126    async fn get_physical_region_id(&self, logical_region_id: RegionId) -> Result<RegionId> {
127        let state = &self.state.read().unwrap();
128        state
129            .get_physical_region_id(logical_region_id)
130            .with_context(|| {
131                error!("Trying to read an nonexistent region {logical_region_id}");
132                LogicalRegionNotFoundSnafu {
133                    region_id: logical_region_id,
134                }
135            })
136    }
137
138    /// Transform the [ScanRequest] from logical region to physical data region.
139    async fn transform_request(
140        &self,
141        physical_region_id: RegionId,
142        logical_region_id: RegionId,
143        mut request: ScanRequest,
144    ) -> Result<ScanRequest> {
145        // transform projection
146        let physical_projection = if let Some(projection) = &request.projection {
147            self.transform_projection(physical_region_id, logical_region_id, projection)
148                .await?
149        } else {
150            self.default_projection(physical_region_id, logical_region_id)
151                .await?
152        };
153
154        request.projection = Some(physical_projection);
155
156        // add table filter
157        request
158            .filters
159            .push(self.table_id_filter(logical_region_id));
160
161        Ok(request)
162    }
163
164    /// Generate a filter on the table id column.
165    fn table_id_filter(&self, logical_region_id: RegionId) -> Expr {
166        logical_expr::col(DATA_SCHEMA_TABLE_ID_COLUMN_NAME)
167            .eq(logical_expr::lit(logical_region_id.table_id()))
168    }
169
170    /// Transform the projection from logical region to physical region.
171    ///
172    /// This method will not preserve internal columns.
173    pub async fn transform_projection(
174        &self,
175        physical_region_id: RegionId,
176        logical_region_id: RegionId,
177        origin_projection: &[usize],
178    ) -> Result<Vec<usize>> {
179        // project on logical columns
180        let all_logical_columns = self
181            .load_logical_column_names(physical_region_id, logical_region_id)
182            .await?;
183        let projected_logical_names = origin_projection
184            .iter()
185            .map(|i| all_logical_columns[*i].clone())
186            .collect::<Vec<_>>();
187
188        // generate physical projection
189        let mut physical_projection = Vec::with_capacity(origin_projection.len());
190        let data_region_id = utils::to_data_region_id(physical_region_id);
191        let physical_metadata = self
192            .mito
193            .get_metadata(data_region_id)
194            .await
195            .context(MitoReadOperationSnafu)?;
196
197        for name in projected_logical_names {
198            // Safety: logical columns is a strict subset of physical columns
199            physical_projection.push(physical_metadata.column_index_by_name(&name).unwrap());
200        }
201
202        Ok(physical_projection)
203    }
204
205    /// Default projection for a logical region. Includes non-internal columns
206    pub async fn default_projection(
207        &self,
208        physical_region_id: RegionId,
209        logical_region_id: RegionId,
210    ) -> Result<Vec<usize>> {
211        let logical_columns = self
212            .load_logical_column_names(physical_region_id, logical_region_id)
213            .await?;
214        let mut projection = Vec::with_capacity(logical_columns.len());
215        let data_region_id = utils::to_data_region_id(physical_region_id);
216        let physical_metadata = self
217            .mito
218            .get_metadata(data_region_id)
219            .await
220            .context(MitoReadOperationSnafu)?;
221        for name in logical_columns {
222            // Safety: logical columns is a strict subset of physical columns
223            projection.push(physical_metadata.column_index_by_name(&name).unwrap());
224        }
225
226        Ok(projection)
227    }
228
229    pub async fn logical_region_metadata(
230        &self,
231        physical_region_id: RegionId,
232        logical_region_id: RegionId,
233    ) -> Result<RegionMetadataRef> {
234        let logical_columns = self
235            .load_logical_columns(physical_region_id, logical_region_id)
236            .await?;
237
238        let primary_keys = logical_columns
239            .iter()
240            .filter_map(|col| {
241                if col.semantic_type == SemanticType::Tag {
242                    Some(col.column_id)
243                } else {
244                    None
245                }
246            })
247            .collect::<Vec<_>>();
248
249        let mut logical_metadata_builder = RegionMetadataBuilder::new(logical_region_id);
250        for col in logical_columns {
251            logical_metadata_builder.push_column_metadata(col);
252        }
253        logical_metadata_builder.primary_key(primary_keys);
254        let logical_metadata = logical_metadata_builder
255            .build()
256            .context(InvalidMetadataSnafu)?;
257
258        Ok(Arc::new(logical_metadata))
259    }
260}
261
#[cfg(test)]
impl MetricEngineInner {
    /// Test-only helper that scans a region straight into a record batch stream.
    ///
    /// A physical region is scanned as-is. For a logical region the request is rewritten by
    /// `transform_request` and the scan is issued against the data region of its physical
    /// region, mirroring `read_logical_region`. The rewritten projection is computed from
    /// that data region's metadata, so the scan must target the same region.
    ///
    /// # Errors
    ///
    /// Any failure (unknown logical region, request transformation, mito scan) is returned
    /// wrapped in a `BoxedError`.
    pub async fn scan_to_stream(
        &self,
        region_id: RegionId,
        request: ScanRequest,
    ) -> Result<common_recordbatch::SendableRecordBatchStream, common_error::ext::BoxedError> {
        use common_error::ext::BoxedError;

        if self.is_physical_region(region_id) {
            return self
                .mito
                .scan_to_stream(region_id, request)
                .await
                .map_err(BoxedError::new);
        }

        let physical_region_id = self
            .get_physical_region_id(region_id)
            .await
            .map_err(BoxedError::new)?;
        let data_region_id = utils::to_data_region_id(physical_region_id);
        let request = self
            .transform_request(physical_region_id, region_id, request)
            .await
            .map_err(BoxedError::new)?;
        self.mito
            .scan_to_stream(data_region_id, request)
            .await
            .map_err(BoxedError::new)
    }
}
292
#[cfg(test)]
mod test {
    use store_api::region_request::RegionRequest;

    use super::*;
    use crate::test_util::{
        alter_logical_region_add_tag_columns, create_logical_region_request, TestEnv,
    };

    /// Checks that `transform_request` rewrites the projection to physical column indices
    /// (for both an explicit and a defaulted projection) and appends the table id filter.
    #[tokio::test]
    async fn test_transform_scan_req() {
        let env = TestEnv::new().await;
        env.init_metric_region().await;

        let logical_region_id = env.default_logical_region_id();
        let physical_region_id = env.default_physical_region_id();

        // Register a second logical region on the same physical region.
        let second_logical_region_id = RegionId::new(1112345678, 999);
        env.metric()
            .handle_request(
                second_logical_region_id,
                RegionRequest::Create(create_logical_region_request(
                    &["123", "456", "789"],
                    physical_region_id,
                    "blabla",
                )),
            )
            .await
            .unwrap();

        // Extend the first logical region with additional tag columns.
        env.metric()
            .handle_request(
                logical_region_id,
                RegionRequest::Alter(alter_logical_region_add_tag_columns(
                    123456,
                    &["987", "798", "654", "321"],
                )),
            )
            .await
            .unwrap();

        let expected_projection: Vec<usize> = vec![11, 10, 9, 8, 0, 1, 4];
        let expected_filter = logical_expr::col(DATA_SCHEMA_TABLE_ID_COLUMN_NAME)
            .eq(logical_expr::lit(logical_region_id.table_id()));

        // Explicit projection over the first seven logical columns.
        let explicit_request = ScanRequest {
            projection: Some(vec![0, 1, 2, 3, 4, 5, 6]),
            filters: vec![],
            ..Default::default()
        };
        let transformed = env
            .metric()
            .inner
            .transform_request(physical_region_id, logical_region_id, explicit_request)
            .await
            .unwrap();

        assert_eq!(
            transformed.projection.as_deref(),
            Some(expected_projection.as_slice())
        );
        // Exactly one filter must be present: the table id filter.
        assert_eq!(transformed.filters, vec![expected_filter]);

        // A request without projection falls back to the default projection.
        let transformed = env
            .metric()
            .inner
            .transform_request(physical_region_id, logical_region_id, ScanRequest::default())
            .await
            .unwrap();
        assert_eq!(
            transformed.projection.as_deref(),
            Some(expected_projection.as_slice())
        );
    }
}