Skip to main content

metric_engine/engine/
read.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use api::v1::SemanticType;
18use common_telemetry::{debug, error, tracing};
19use datafusion::logical_expr::{self, Expr};
20use snafu::{OptionExt, ResultExt};
21use store_api::metadata::{RegionMetadataBuilder, RegionMetadataRef};
22use store_api::metric_engine_consts::DATA_SCHEMA_TABLE_ID_COLUMN_NAME;
23use store_api::region_engine::{RegionEngine, RegionScannerRef};
24use store_api::storage::{RegionId, ScanRequest, SequenceNumber};
25
26use crate::engine::MetricEngineInner;
27use crate::error::{
28    InvalidMetadataSnafu, LogicalRegionNotFoundSnafu, MitoReadOperationSnafu, Result,
29};
30use crate::metrics::MITO_OPERATION_ELAPSED;
31use crate::utils;
32
33impl MetricEngineInner {
34    #[tracing::instrument(skip_all)]
35    pub async fn read_region(
36        &self,
37        region_id: RegionId,
38        request: ScanRequest,
39    ) -> Result<RegionScannerRef> {
40        let is_reading_physical_region = self.is_physical_region(region_id);
41
42        if is_reading_physical_region {
43            debug!(
44                "Metric region received read request {request:?} on physical region {region_id:?}"
45            );
46            self.read_physical_region(region_id, request).await
47        } else {
48            self.read_logical_region(region_id, request).await
49        }
50    }
51
52    /// Proxy the read request to underlying physical region (mito engine).
53    async fn read_physical_region(
54        &self,
55        region_id: RegionId,
56        request: ScanRequest,
57    ) -> Result<RegionScannerRef> {
58        let _timer = MITO_OPERATION_ELAPSED
59            .with_label_values(&["read_physical"])
60            .start_timer();
61
62        self.mito
63            .handle_query(region_id, request)
64            .await
65            .context(MitoReadOperationSnafu)
66    }
67
68    async fn read_logical_region(
69        &self,
70        logical_region_id: RegionId,
71        request: ScanRequest,
72    ) -> Result<RegionScannerRef> {
73        let _timer = MITO_OPERATION_ELAPSED
74            .with_label_values(&["read"])
75            .start_timer();
76
77        let physical_region_id = self.get_physical_region_id(logical_region_id).await?;
78        let data_region_id = utils::to_data_region_id(physical_region_id);
79        let request = self
80            .transform_request(physical_region_id, logical_region_id, request)
81            .await?;
82        let mut scanner = self
83            .mito
84            .handle_query(data_region_id, request)
85            .await
86            .context(MitoReadOperationSnafu)?;
87        scanner.set_logical_region(true);
88
89        Ok(scanner)
90    }
91
92    pub async fn get_last_seq_num(&self, region_id: RegionId) -> Result<SequenceNumber> {
93        let region_id = if self.is_physical_region(region_id) {
94            region_id
95        } else {
96            let physical_region_id = self.get_physical_region_id(region_id).await?;
97            utils::to_data_region_id(physical_region_id)
98        };
99        self.mito
100            .get_committed_sequence(region_id)
101            .await
102            .context(MitoReadOperationSnafu)
103    }
104
105    pub async fn load_region_metadata(&self, region_id: RegionId) -> Result<RegionMetadataRef> {
106        let is_reading_physical_region =
107            self.state.read().unwrap().exist_physical_region(region_id);
108
109        if is_reading_physical_region {
110            self.mito
111                .get_metadata(region_id)
112                .await
113                .context(MitoReadOperationSnafu)
114        } else {
115            let physical_region_id = self.get_physical_region_id(region_id).await?;
116            self.logical_region_metadata(physical_region_id, region_id)
117                .await
118        }
119    }
120
121    /// Returns true if it's a physical region.
122    pub fn is_physical_region(&self, region_id: RegionId) -> bool {
123        self.state.read().unwrap().exist_physical_region(region_id)
124    }
125
126    async fn get_physical_region_id(&self, logical_region_id: RegionId) -> Result<RegionId> {
127        let state = &self.state.read().unwrap();
128        state
129            .get_physical_region_id(logical_region_id)
130            .with_context(|| {
131                error!("Trying to read an nonexistent region {logical_region_id}");
132                LogicalRegionNotFoundSnafu {
133                    region_id: logical_region_id,
134                }
135            })
136    }
137
138    /// Transform the [ScanRequest] from logical region to physical data region.
139    async fn transform_request(
140        &self,
141        physical_region_id: RegionId,
142        logical_region_id: RegionId,
143        mut request: ScanRequest,
144    ) -> Result<ScanRequest> {
145        // transform projection
146        let physical_projection = match request.projection_input.as_ref() {
147            Some(projection_input) => {
148                self.transform_projection(
149                    physical_region_id,
150                    logical_region_id,
151                    &projection_input.projection,
152                )
153                .await?
154            }
155            None => {
156                self.default_projection(physical_region_id, logical_region_id)
157                    .await?
158            }
159        };
160
161        // Rewrite the top-level projection from logical-region schema indices to
162        // physical-region schema indices. `nested_paths` are left unchanged because
163        // they are expressed by column name rather than schema index.
164        request.projection_input.get_or_insert_default().projection = physical_projection;
165
166        request
167            .filters
168            .push(self.table_id_filter(logical_region_id));
169
170        Ok(request)
171    }
172
173    /// Generate a filter on the table id column.
174    fn table_id_filter(&self, logical_region_id: RegionId) -> Expr {
175        logical_expr::col(DATA_SCHEMA_TABLE_ID_COLUMN_NAME)
176            .eq(logical_expr::lit(logical_region_id.table_id()))
177    }
178
179    /// Transform the projection from logical region to physical region.
180    ///
181    /// This method will not preserve internal columns.
182    pub async fn transform_projection(
183        &self,
184        physical_region_id: RegionId,
185        logical_region_id: RegionId,
186        origin_projection: &[usize],
187    ) -> Result<Vec<usize>> {
188        // project on logical columns
189        let all_logical_columns = self
190            .load_logical_column_names(physical_region_id, logical_region_id)
191            .await?;
192        let projected_logical_names = origin_projection
193            .iter()
194            .map(|i| all_logical_columns[*i].clone())
195            .collect::<Vec<_>>();
196
197        // generate physical projection
198        let mut physical_projection = Vec::with_capacity(origin_projection.len());
199        let data_region_id = utils::to_data_region_id(physical_region_id);
200        let physical_metadata = self
201            .mito
202            .get_metadata(data_region_id)
203            .await
204            .context(MitoReadOperationSnafu)?;
205
206        for name in projected_logical_names {
207            // Safety: logical columns is a strict subset of physical columns
208            physical_projection.push(physical_metadata.column_index_by_name(&name).unwrap());
209        }
210
211        Ok(physical_projection)
212    }
213
214    /// Default projection for a logical region. Includes non-internal columns
215    pub async fn default_projection(
216        &self,
217        physical_region_id: RegionId,
218        logical_region_id: RegionId,
219    ) -> Result<Vec<usize>> {
220        let logical_columns = self
221            .load_logical_column_names(physical_region_id, logical_region_id)
222            .await?;
223        let mut projection = Vec::with_capacity(logical_columns.len());
224        let data_region_id = utils::to_data_region_id(physical_region_id);
225        let physical_metadata = self
226            .mito
227            .get_metadata(data_region_id)
228            .await
229            .context(MitoReadOperationSnafu)?;
230        for name in logical_columns {
231            // Safety: logical columns is a strict subset of physical columns
232            projection.push(physical_metadata.column_index_by_name(&name).unwrap());
233        }
234
235        Ok(projection)
236    }
237
238    pub async fn logical_region_metadata(
239        &self,
240        physical_region_id: RegionId,
241        logical_region_id: RegionId,
242    ) -> Result<RegionMetadataRef> {
243        let logical_columns = self
244            .load_logical_columns(physical_region_id, logical_region_id)
245            .await?;
246
247        let primary_keys = logical_columns
248            .iter()
249            .filter_map(|col| {
250                if col.semantic_type == SemanticType::Tag {
251                    Some(col.column_id)
252                } else {
253                    None
254                }
255            })
256            .collect::<Vec<_>>();
257
258        let mut logical_metadata_builder = RegionMetadataBuilder::new(logical_region_id);
259        for col in logical_columns {
260            logical_metadata_builder.push_column_metadata(col);
261        }
262        logical_metadata_builder.primary_key(primary_keys);
263        let logical_metadata = logical_metadata_builder
264            .build()
265            .context(InvalidMetadataSnafu)?;
266
267        Ok(Arc::new(logical_metadata))
268    }
269}
270
#[cfg(test)]
impl MetricEngineInner {
    /// Test-only helper that scans `region_id` through mito's `scan_to_stream`
    /// and returns the resulting record batch stream.
    ///
    /// A physical region is scanned with the request as given. For a logical
    /// region the request is rewritten by `transform_request` and the scan is
    /// issued against the data region id of its physical region, the same target
    /// `read_logical_region` queries.
    ///
    /// # Errors
    ///
    /// Every failure (region lookup, request transformation, mito scan) is boxed
    /// into a `BoxedError`.
    pub async fn scan_to_stream(
        &self,
        region_id: RegionId,
        request: ScanRequest,
    ) -> Result<common_recordbatch::SendableRecordBatchStream, common_error::ext::BoxedError> {
        if self.is_physical_region(region_id) {
            return self
                .mito
                .scan_to_stream(region_id, request)
                .await
                .map_err(common_error::ext::BoxedError::new);
        }

        let physical_region_id = self
            .get_physical_region_id(region_id)
            .await
            .map_err(common_error::ext::BoxedError::new)?;
        let request = self
            .transform_request(physical_region_id, region_id, request)
            .await
            .map_err(common_error::ext::BoxedError::new)?;
        // Scan the data region, consistent with the non-test logical read path.
        let data_region_id = utils::to_data_region_id(physical_region_id);
        self.mito
            .scan_to_stream(data_region_id, request)
            .await
            .map_err(common_error::ext::BoxedError::new)
    }
}
301
#[cfg(test)]
mod test {
    use store_api::region_request::RegionRequest;

    use super::*;
    use crate::test_util::{
        TestEnv, alter_logical_region_add_tag_columns, create_logical_region_request,
    };

    /// Checks that `transform_request` rewrites both an explicit and a default
    /// projection to the expected physical indices and appends the table id filter.
    #[tokio::test]
    async fn test_transform_scan_req() {
        // Physical projection expected for the first logical region in both cases.
        const EXPECTED_PROJECTION: [usize; 7] = [11, 10, 9, 8, 0, 1, 4];

        let env = TestEnv::new().await;
        env.init_metric_region().await;
        let engine = env.metric();

        let logical_region_id = env.default_logical_region_id();
        let physical_region_id = env.default_physical_region_id();

        // Register a second logical region on the same physical region.
        let second_logical_region_id = RegionId::new(1112345678, 999);
        engine
            .handle_request(
                second_logical_region_id,
                RegionRequest::Create(create_logical_region_request(
                    &["123", "456", "789"],
                    physical_region_id,
                    "blabla",
                )),
            )
            .await
            .unwrap();

        // Extend the first logical region with additional tag columns.
        engine
            .handle_request(
                logical_region_id,
                RegionRequest::Alter(alter_logical_region_add_tag_columns(
                    123456,
                    &["987", "798", "654", "321"],
                )),
            )
            .await
            .unwrap();

        // Case 1: the request names its projection explicitly.
        let explicit_request = ScanRequest {
            projection_input: Some(vec![0, 1, 2, 3, 4, 5, 6].into()),
            filters: vec![],
            ..Default::default()
        };
        let transformed = engine
            .inner
            .transform_request(physical_region_id, logical_region_id, explicit_request)
            .await
            .unwrap();

        assert_eq!(
            transformed.projection_indices().unwrap(),
            &EXPECTED_PROJECTION
        );

        let expected_filter = logical_expr::col(DATA_SCHEMA_TABLE_ID_COLUMN_NAME)
            .eq(logical_expr::lit(logical_region_id.table_id()));
        assert_eq!(transformed.filters.len(), 1);
        assert_eq!(transformed.filters[0], expected_filter);

        // Case 2: the request carries no projection, so the default one is used.
        let transformed = engine
            .inner
            .transform_request(
                physical_region_id,
                logical_region_id,
                ScanRequest::default(),
            )
            .await
            .unwrap();
        assert_eq!(
            transformed.projection_indices().unwrap(),
            &EXPECTED_PROJECTION
        );
    }
}