// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

pub(crate) mod file_stream;

use std::collections::HashSet;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

use common_datasource::object_store::build_backend;
use common_error::ext::BoxedError;
use common_recordbatch::adapter::RecordBatchMetrics;
use common_recordbatch::error::{CastVectorSnafu, ExternalSnafu, Result as RecordBatchResult};
use common_recordbatch::{OrderOption, RecordBatch, RecordBatchStream, SendableRecordBatchStream};
use datafusion::logical_expr::utils as df_logical_expr_utils;
use datafusion_expr::expr::Expr;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
use datatypes::vectors::VectorRef;
use futures::Stream;
use snafu::{ensure, OptionExt, ResultExt};
use store_api::storage::ScanRequest;

use self::file_stream::{CreateScanPlanContext, ScanPlanConfig};
use crate::error::{
    BuildBackendSnafu, CreateDefaultSnafu, ExtractColumnFromFilterSnafu,
    MissingColumnNoDefaultSnafu, ProjectSchemaSnafu, ProjectionOutOfBoundsSnafu, Result,
};
use crate::region::FileRegion;

impl FileRegion {
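    /// Scans the file region: builds the object store backend, pushes the
    /// requested projection and compatible filters down to the file scan, and
    /// adapts the resulting stream to the scan schema.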
    pub fn query(&self, request: ScanRequest) -> Result<SendableRecordBatchStream> {
        let store = build_backend(&self.url, &self.options).context(BuildBackendSnafu)?;

        let file_projection = self.projection_pushdown_to_file(&request.projection)?;
        let file_filters = self.filters_pushdown_to_file(&request.filters)?;
        let file_schema = Arc::new(Schema::new(self.file_options.file_column_schemas.clone()));

        let file_stream = file_stream::create_stream(
            &self.format,
            &CreateScanPlanContext::default(),
            &ScanPlanConfig {
                file_schema,
                files: &self.file_options.files,
                projection: file_projection.as_ref(),
                filters: &file_filters,
                limit: request.limit,
                store,
            },
        )?;

        let scan_schema = self.scan_schema(&request.projection)?;

        Ok(Box::pin(FileToScanRegionStream::new(
            scan_schema,
            file_stream,
        )))
    }

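    /// Maps the requested projection (indices into the region schema) to
    /// indices into the file schema. Columns missing from the file are
    /// skipped here and backfilled with default values when record batches
    /// are converted.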
    fn projection_pushdown_to_file(
        &self,
        req_projection: &Option<Vec<usize>>,
    ) -> Result<Option<Vec<usize>>> {
        let Some(scan_projection) = req_projection.as_ref() else {
            return Ok(None);
        };

        let file_column_schemas = &self.file_options.file_column_schemas;
        let mut file_projection = Vec::with_capacity(scan_projection.len());
        for column_index in scan_projection {
            ensure!(
                *column_index < self.metadata.schema.num_columns(),
                ProjectionOutOfBoundsSnafu {
                    column_index: *column_index,
                    bounds: self.metadata.schema.num_columns()
                }
            );

            let column_name = self.metadata.schema.column_name_by_index(*column_index);
            let file_column_index = file_column_schemas
                .iter()
                .position(|c| c.name == column_name);
            if let Some(file_column_index) = file_column_index {
                file_projection.push(file_column_index);
            }
        }
        Ok(Some(file_projection))
    }

    /// Collects the filters that can be pushed down to the file scan, i.e.
    /// those whose expressions reference only columns present in the file.
    fn filters_pushdown_to_file(&self, scan_filters: &[Expr]) -> Result<Vec<Expr>> {
        let mut file_filters = Vec::with_capacity(scan_filters.len());

        let file_column_names = self
            .file_options
            .file_column_schemas
            .iter()
            .map(|c| &c.name)
            .collect::<HashSet<_>>();

        // Reused across filters and cleared after each iteration.
        let mut aux_column_set = HashSet::new();
        for scan_filter in scan_filters {
            df_logical_expr_utils::expr_to_columns(scan_filter, &mut aux_column_set)
                .context(ExtractColumnFromFilterSnafu)?;

            let all_file_columns = aux_column_set
                .iter()
                .all(|column_in_expr| file_column_names.contains(&column_in_expr.name));
            if all_file_columns {
                file_filters.push(scan_filter.clone());
            }
            aux_column_set.clear();
        }
        Ok(file_filters)
    }

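    /// Returns the schema produced by the scan: the region schema projected
    /// by the requested indices, or the full region schema when no projection
    /// is given.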
    fn scan_schema(&self, req_projection: &Option<Vec<usize>>) -> Result<SchemaRef> {
        let schema = if let Some(indices) = req_projection {
            Arc::new(
                self.metadata
                    .schema
                    .try_project(indices)
                    .context(ProjectSchemaSnafu)?,
            )
        } else {
            self.metadata.schema.clone()
        };

        Ok(schema)
    }
}

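/// Adapts record batches produced by the file stream to the scan schema by
/// projecting, casting, and backfilling columns as needed.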
struct FileToScanRegionStream {
    scan_schema: SchemaRef,
    file_stream: SendableRecordBatchStream,
}

impl RecordBatchStream for FileToScanRegionStream {
    fn schema(&self) -> SchemaRef {
        self.scan_schema.clone()
    }

    fn output_ordering(&self) -> Option<&[OrderOption]> {
        None
    }

    fn metrics(&self) -> Option<RecordBatchMetrics> {
        None
    }
}

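// Polls the inner file stream and converts each ready batch to the scan
// schema, unless the batch already matches it.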
impl Stream for FileToScanRegionStream {
    type Item = RecordBatchResult<RecordBatch>;

    fn poll_next(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        match Pin::new(&mut self.file_stream).poll_next(ctx) {
            Poll::Pending => Poll::Pending,
            Poll::Ready(Some(file_record_batch)) => {
                let file_record_batch = file_record_batch?;
                let scan_record_batch = if self.schema_eq(&file_record_batch) {
                    Ok(file_record_batch)
                } else {
                    self.convert_record_batch(&file_record_batch)
                };

                Poll::Ready(Some(scan_record_batch))
            }
            Poll::Ready(None) => Poll::Ready(None),
        }
    }
}

impl FileToScanRegionStream {
    fn new(scan_schema: SchemaRef, file_stream: SendableRecordBatchStream) -> Self {
        Self {
            scan_schema,
            file_stream,
        }
    }

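    /// Returns true when conversion can be skipped: every column of the scan
    /// schema is present in the file record batch with an identical data type.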
    fn schema_eq(&self, file_record_batch: &RecordBatch) -> bool {
        self.scan_schema
            .column_schemas()
            .iter()
            .all(|scan_column_schema| {
                file_record_batch
                    .column_by_name(&scan_column_schema.name)
                    .map(|rb| rb.data_type() == scan_column_schema.data_type)
                    .unwrap_or_default()
            })
    }

    /// Converts a RecordBatch from the file schema to the scan schema.
    ///
    /// This function performs the following operations:
    /// - Projection: only columns present in the scan schema are retained.
    /// - Cast: columns present in both schemas but with different types are
    ///   cast to the type in the scan schema.
    /// - Backfill: columns present in the scan schema but not in the file
    ///   schema are backfilled with default values.
    fn convert_record_batch(
        &self,
        file_record_batch: &RecordBatch,
    ) -> RecordBatchResult<RecordBatch> {
        let file_row_count = file_record_batch.num_rows();
        let columns = self
            .scan_schema
            .column_schemas()
            .iter()
            .map(|scan_column_schema| {
                let file_column = file_record_batch.column_by_name(&scan_column_schema.name);
                if let Some(file_column) = file_column {
                    Self::cast_column_type(file_column, &scan_column_schema.data_type)
                } else {
                    Self::backfill_column(scan_column_schema, file_row_count)
                }
            })
            .collect::<RecordBatchResult<Vec<_>>>()?;

        RecordBatch::new(self.scan_schema.clone(), columns)
    }

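    /// Casts a column to the target type, returning it unchanged when the
    /// types already match.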
    fn cast_column_type(
        source_column: &VectorRef,
        target_data_type: &ConcreteDataType,
    ) -> RecordBatchResult<VectorRef> {
        if &source_column.data_type() == target_data_type {
            Ok(source_column.clone())
        } else {
            source_column
                .cast(target_data_type)
                .context(CastVectorSnafu {
                    from_type: source_column.data_type(),
                    to_type: target_data_type.clone(),
                })
        }
    }

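    /// Fills a column that is missing from the file with `num_rows` default
    /// values, boxing any failure as an external record batch error.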
    fn backfill_column(
        column_schema: &ColumnSchema,
        num_rows: usize,
    ) -> RecordBatchResult<VectorRef> {
        Self::create_default_vector(column_schema, num_rows)
            .map_err(BoxedError::new)
            .context(ExternalSnafu)
    }

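    /// Creates a vector of `num_rows` default values for the column, erroring
    /// if the default cannot be created or the column has no default value.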
    fn create_default_vector(column_schema: &ColumnSchema, num_rows: usize) -> Result<VectorRef> {
        column_schema
            .create_default_vector(num_rows)
            .with_context(|_| CreateDefaultSnafu {
                column: column_schema.name.clone(),
            })?
            .with_context(|| MissingColumnNoDefaultSnafu {
                column: column_schema.name.clone(),
            })
    }
}