file_engine/query.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

pub(crate) mod file_stream;

use std::collections::HashSet;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

use common_datasource::object_store::build_backend;
use common_recordbatch::adapter::RecordBatchMetrics;
use common_recordbatch::error::{self as recordbatch_error, Result as RecordBatchResult};
use common_recordbatch::{
    DfSendableRecordBatchStream, OrderOption, RecordBatch, RecordBatchStream,
    SendableRecordBatchStream,
};
use datafusion::logical_expr::utils as df_logical_expr_utils;
use datafusion_expr::expr::Expr;
use datatypes::arrow::compute as arrow_compute;
use datatypes::data_type::DataType;
use datatypes::schema::{Schema, SchemaRef};
use datatypes::vectors::Helper;
use futures::Stream;
use snafu::{ensure, GenerateImplicitData, ResultExt};
use store_api::storage::ScanRequest;

use self::file_stream::ScanPlanConfig;
use crate::error::{
    BuildBackendSnafu, ExtractColumnFromFilterSnafu, ProjectSchemaSnafu,
    ProjectionOutOfBoundsSnafu, Result,
};
use crate::region::FileRegion;

impl FileRegion {
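    /// Scans the file region according to `request`, returning a stream of
    /// record batches conforming to the region's scan schema. Projections and
    /// filters are pushed down to the file scan where possible; columns the
    /// file lacks are backfilled with their default values.
    ///
    /// A minimal usage sketch (hypothetical: assumes an already constructed
    /// `region` and an async context):
    ///
    /// ```ignore
    /// use futures::TryStreamExt;
    ///
    /// let stream = region.query(ScanRequest::default())?;
    /// let batches: Vec<_> = stream.try_collect().await?;
    /// ```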
    pub fn query(&self, request: ScanRequest) -> Result<SendableRecordBatchStream> {
        let store = build_backend(&self.url, &self.options).context(BuildBackendSnafu)?;

        let file_projection = self.projection_pushdown_to_file(&request.projection)?;
        let file_filters = self.filters_pushdown_to_file(&request.filters)?;
        let file_schema = Arc::new(Schema::new(self.file_options.file_column_schemas.clone()));

        let projected_file_schema = if let Some(projection) = &file_projection {
            Arc::new(
                file_schema
                    .try_project(projection)
                    .context(ProjectSchemaSnafu)?,
            )
        } else {
            file_schema.clone()
        };

        let file_stream = file_stream::create_stream(
            &self.format,
            &ScanPlanConfig {
                file_schema,
                files: &self.file_options.files,
                projection: file_projection.as_ref(),
                filters: &file_filters,
                limit: request.limit,
                store,
            },
        )?;

        let scan_schema = self.scan_schema(&request.projection)?;

        Ok(Box::pin(FileToScanRegionStream::new(
            scan_schema,
            projected_file_schema,
            file_stream,
        )))
    }

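    /// Translates a scan projection (indices into the region's full schema)
    /// into a projection over the file's column schemas. Columns that don't
    /// exist in the file are dropped here; `FileToScanRegionStream` later
    /// fills them with default values.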
    fn projection_pushdown_to_file(
        &self,
        req_projection: &Option<Vec<usize>>,
    ) -> Result<Option<Vec<usize>>> {
        let Some(scan_projection) = req_projection.as_ref() else {
            return Ok(None);
        };

        let file_column_schemas = &self.file_options.file_column_schemas;
        let mut file_projection = Vec::with_capacity(scan_projection.len());
        for column_index in scan_projection {
            ensure!(
                *column_index < self.metadata.schema.num_columns(),
                ProjectionOutOfBoundsSnafu {
                    column_index: *column_index,
                    bounds: self.metadata.schema.num_columns()
                }
            );

            let column_name = self.metadata.schema.column_name_by_index(*column_index);
            let file_column_index = file_column_schemas
                .iter()
                .position(|c| c.name == column_name);
            if let Some(file_column_index) = file_column_index {
                file_projection.push(file_column_index);
            }
        }
        Ok(Some(file_projection))
    }

    /// Collects the filters that can be pushed down to the file source: those
    /// whose expressions reference only columns that exist in the file.
    fn filters_pushdown_to_file(&self, scan_filters: &[Expr]) -> Result<Vec<Expr>> {
        let mut file_filters = Vec::with_capacity(scan_filters.len());

        let file_column_names = self
            .file_options
            .file_column_schemas
            .iter()
            .map(|c| &c.name)
            .collect::<HashSet<_>>();

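        // Reuse one column set across filters, clearing it between iterations,
        // so a fresh HashSet isn't allocated per expression.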
        let mut aux_column_set = HashSet::new();
        for scan_filter in scan_filters {
            df_logical_expr_utils::expr_to_columns(scan_filter, &mut aux_column_set)
                .context(ExtractColumnFromFilterSnafu)?;

            let all_file_columns = aux_column_set
                .iter()
                .all(|column_in_expr| file_column_names.contains(&column_in_expr.name));
            if all_file_columns {
                file_filters.push(scan_filter.clone());
            }
            aux_column_set.clear();
        }
        Ok(file_filters)
    }

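    /// Returns the schema the scan yields to callers: the region's full
    /// schema, or its projection when the request specifies one.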
    fn scan_schema(&self, req_projection: &Option<Vec<usize>>) -> Result<SchemaRef> {
        let schema = if let Some(indices) = req_projection {
            Arc::new(
                self.metadata
                    .schema
                    .try_project(indices)
                    .context(ProjectSchemaSnafu)?,
            )
        } else {
            self.metadata.schema.clone()
        };

        Ok(schema)
    }
}

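/// Adapts record batches produced by the file scan to the region's scan
/// schema: columns shared with the file are cast to the expected arrow type,
/// and columns missing from the file are filled with their default values.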
struct FileToScanRegionStream {
    scan_schema: SchemaRef,
    file_stream: DfSendableRecordBatchStream,
    /// Maps columns in `scan_schema` to their index in the projected file schema.
    /// `None` means the column doesn't exist in the file and should be filled with default values.
    scan_to_file_projection: Vec<Option<usize>>,
}

impl RecordBatchStream for FileToScanRegionStream {
    fn schema(&self) -> SchemaRef {
        self.scan_schema.clone()
    }

    fn output_ordering(&self) -> Option<&[OrderOption]> {
        None
    }

    fn metrics(&self) -> Option<RecordBatchMetrics> {
        None
    }
}

impl Stream for FileToScanRegionStream {
    type Item = RecordBatchResult<RecordBatch>;

    fn poll_next(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        match Pin::new(&mut self.file_stream).poll_next(ctx) {
            Poll::Pending => Poll::Pending,
            Poll::Ready(Some(Ok(file_record_batch))) => {
                let num_rows = file_record_batch.num_rows();
                let mut columns = Vec::with_capacity(self.scan_schema.num_columns());

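                // Build every output column in scan-schema order: take it from
                // the file batch when the file has it, otherwise synthesize a
                // default-valued column of the right length.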
                for (idx, column_schema) in self.scan_schema.column_schemas().iter().enumerate() {
                    if let Some(file_idx) = self.scan_to_file_projection[idx] {
                        let expected_arrow_type = column_schema.data_type.as_arrow_type();
                        let mut array = file_record_batch.column(file_idx).clone();

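                        // File readers may infer a different (but castable)
                        // arrow type than the region schema declares; cast
                        // whenever the types disagree.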
                        if array.data_type() != &expected_arrow_type {
                            array = arrow_compute::cast(array.as_ref(), &expected_arrow_type)
                                .context(recordbatch_error::ArrowComputeSnafu)?;
                        }

                        let vector = Helper::try_into_vector(array)
                            .context(recordbatch_error::DataTypesSnafu)?;
                        columns.push(vector);
                    } else {
                        let vector = column_schema
                            .create_default_vector(num_rows)
                            .context(recordbatch_error::DataTypesSnafu)?
                            .ok_or_else(|| {
                                recordbatch_error::CreateRecordBatchesSnafu {
                                    reason: format!(
                                        "column {} is missing from file source and has no default",
                                        column_schema.name
                                    ),
                                }
                                .build()
                            })?;
                        columns.push(vector);
                    }
                }

                let record_batch = RecordBatch::new(self.scan_schema.clone(), columns)?;

                Poll::Ready(Some(Ok(record_batch)))
            }
            Poll::Ready(Some(Err(error))) => {
                Poll::Ready(Some(Err(recordbatch_error::Error::PollStream {
                    error,
                    location: snafu::Location::generate(),
                })))
            }
            Poll::Ready(None) => Poll::Ready(None),
        }
    }
}

impl FileToScanRegionStream {
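    /// Builds the adapter, precomputing for each scan column its index in the
    /// projected file schema (`None` when the column is absent from the file).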
    fn new(
        scan_schema: SchemaRef,
        file_schema: SchemaRef,
        file_stream: DfSendableRecordBatchStream,
    ) -> Self {
        let scan_to_file_projection = scan_schema
            .column_schemas()
            .iter()
            .map(|column| file_schema.column_index_by_name(&column.name))
            .collect();

        Self {
            scan_schema,
            file_stream,
            scan_to_file_projection,
        }
    }
}