pub(crate) mod file_stream;

use std::collections::HashSet;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

use common_datasource::object_store::build_backend;
use common_error::ext::BoxedError;
use common_recordbatch::adapter::RecordBatchMetrics;
use common_recordbatch::error::{CastVectorSnafu, ExternalSnafu, Result as RecordBatchResult};
use common_recordbatch::{OrderOption, RecordBatch, RecordBatchStream, SendableRecordBatchStream};
use datafusion::logical_expr::utils as df_logical_expr_utils;
use datafusion_expr::expr::Expr;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
use datatypes::vectors::VectorRef;
use futures::Stream;
use snafu::{ensure, OptionExt, ResultExt};
use store_api::storage::ScanRequest;

use self::file_stream::{CreateScanPlanContext, ScanPlanConfig};
use crate::error::{
    BuildBackendSnafu, CreateDefaultSnafu, ExtractColumnFromFilterSnafu,
    MissingColumnNoDefaultSnafu, ProjectSchemaSnafu, ProjectionOutOfBoundsSnafu, Result,
};
use crate::region::FileRegion;

impl FileRegion {
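    /// Scans the file region, returning a stream of record batches.
    ///
    /// Builds an object store backend from the region's URL and options,
    /// pushes the request's projection and filters down to the underlying
    /// file scan, and adapts the resulting stream to the region's scan
    /// schema (casting column types and backfilling absent columns with
    /// default values).
    ///
    /// A minimal usage sketch (hypothetical setup; assumes an initialized
    /// `FileRegion` and a `Default` scan request):
    ///
    /// ```ignore
    /// let stream = region.query(ScanRequest::default())?;
    /// ```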
    pub fn query(&self, request: ScanRequest) -> Result<SendableRecordBatchStream> {
        let store = build_backend(&self.url, &self.options).context(BuildBackendSnafu)?;

        let file_projection = self.projection_pushdown_to_file(&request.projection)?;
        let file_filters = self.filters_pushdown_to_file(&request.filters)?;
        let file_schema = Arc::new(Schema::new(self.file_options.file_column_schemas.clone()));

        let file_stream = file_stream::create_stream(
            &self.format,
            &CreateScanPlanContext::default(),
            &ScanPlanConfig {
                file_schema,
                files: &self.file_options.files,
                projection: file_projection.as_ref(),
                filters: &file_filters,
                limit: request.limit,
                store,
            },
        )?;

        let scan_schema = self.scan_schema(&request.projection)?;

        Ok(Box::pin(FileToScanRegionStream::new(
            scan_schema,
            file_stream,
        )))
    }

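    /// Maps projected column indices of the region schema to indices in the
    /// file schema. Indices are validated against the region schema; columns
    /// that do not exist in the file are skipped here and backfilled with
    /// default values by [`FileToScanRegionStream`]. Returns `None` when the
    /// request carries no projection.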
    fn projection_pushdown_to_file(
        &self,
        req_projection: &Option<Vec<usize>>,
    ) -> Result<Option<Vec<usize>>> {
        let Some(scan_projection) = req_projection.as_ref() else {
            return Ok(None);
        };

        let file_column_schemas = &self.file_options.file_column_schemas;
        let mut file_projection = Vec::with_capacity(scan_projection.len());
        for column_index in scan_projection {
            ensure!(
                *column_index < self.metadata.schema.num_columns(),
                ProjectionOutOfBoundsSnafu {
                    column_index: *column_index,
                    bounds: self.metadata.schema.num_columns()
                }
            );

            let column_name = self.metadata.schema.column_name_by_index(*column_index);
            let file_column_index = file_column_schemas
                .iter()
                .position(|c| c.name == column_name);
            if let Some(file_column_index) = file_column_index {
                file_projection.push(file_column_index);
            }
        }
        Ok(Some(file_projection))
    }

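    /// Selects the scan filters that can be pushed down to the file scan:
    /// exactly those whose referenced columns all exist in the file schema.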
    fn filters_pushdown_to_file(&self, scan_filters: &[Expr]) -> Result<Vec<Expr>> {
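        // `aux_column_set` below is reused across iterations to collect the
        // columns referenced by each filter expression.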
        let mut file_filters = Vec::with_capacity(scan_filters.len());

        let file_column_names = self
            .file_options
            .file_column_schemas
            .iter()
            .map(|c| &c.name)
            .collect::<HashSet<_>>();

        let mut aux_column_set = HashSet::new();
        for scan_filter in scan_filters {
            df_logical_expr_utils::expr_to_columns(scan_filter, &mut aux_column_set)
                .context(ExtractColumnFromFilterSnafu)?;

            let all_file_columns = aux_column_set
                .iter()
                .all(|column_in_expr| file_column_names.contains(&column_in_expr.name));
            if all_file_columns {
                file_filters.push(scan_filter.clone());
            }
            aux_column_set.clear();
        }
        Ok(file_filters)
    }

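    /// Returns the schema of the scan output: the region schema projected by
    /// the requested indices, or the full region schema when the request has
    /// no projection.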
    fn scan_schema(&self, req_projection: &Option<Vec<usize>>) -> Result<SchemaRef> {
        let schema = if let Some(indices) = req_projection {
            Arc::new(
                self.metadata
                    .schema
                    .try_project(indices)
                    .context(ProjectSchemaSnafu)?,
            )
        } else {
            self.metadata.schema.clone()
        };

        Ok(schema)
    }
}

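/// Adapts record batches produced by the file scan to the scan schema:
/// passes batches through when schemas already match, otherwise casts
/// mismatched column types and backfills columns absent from the file with
/// their default values.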
struct FileToScanRegionStream {
    scan_schema: SchemaRef,
    file_stream: SendableRecordBatchStream,
}

impl RecordBatchStream for FileToScanRegionStream {
    fn schema(&self) -> SchemaRef {
        self.scan_schema.clone()
    }

    fn output_ordering(&self) -> Option<&[OrderOption]> {
        None
    }

    fn metrics(&self) -> Option<RecordBatchMetrics> {
        None
    }
}

impl Stream for FileToScanRegionStream {
    type Item = RecordBatchResult<RecordBatch>;

    fn poll_next(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        match Pin::new(&mut self.file_stream).poll_next(ctx) {
            Poll::Pending => Poll::Pending,
            Poll::Ready(Some(file_record_batch)) => {
                let file_record_batch = file_record_batch?;
                // Forward the batch as-is when it already matches the scan
                // schema; otherwise cast and backfill its columns.
                let scan_record_batch = if self.schema_eq(&file_record_batch) {
                    Ok(file_record_batch)
                } else {
                    self.convert_record_batch(&file_record_batch)
                };

                Poll::Ready(Some(scan_record_batch))
            }
            Poll::Ready(None) => Poll::Ready(None),
        }
    }
}

impl FileToScanRegionStream {
    fn new(scan_schema: SchemaRef, file_stream: SendableRecordBatchStream) -> Self {
        Self {
            scan_schema,
            file_stream,
        }
    }

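    /// Returns true if every column of the scan schema is present in the file
    /// record batch with an identical data type, in which case the batch can
    /// be forwarded without conversion.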
    fn schema_eq(&self, file_record_batch: &RecordBatch) -> bool {
        self.scan_schema
            .column_schemas()
            .iter()
            .all(|scan_column_schema| {
                file_record_batch
                    .column_by_name(&scan_column_schema.name)
                    .is_some_and(|rb| rb.data_type() == scan_column_schema.data_type)
            })
    }

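    /// Converts a file record batch to the scan schema.
    ///
    /// Column by column: a file column with the same name is cast to the scan
    /// column's data type, while a column missing from the file is backfilled
    /// with default values for every row of the batch.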
    fn convert_record_batch(
        &self,
        file_record_batch: &RecordBatch,
    ) -> RecordBatchResult<RecordBatch> {
        let file_row_count = file_record_batch.num_rows();
        let columns = self
            .scan_schema
            .column_schemas()
            .iter()
            .map(|scan_column_schema| {
                let file_column = file_record_batch.column_by_name(&scan_column_schema.name);
                if let Some(file_column) = file_column {
                    Self::cast_column_type(file_column, &scan_column_schema.data_type)
                } else {
                    Self::backfill_column(scan_column_schema, file_row_count)
                }
            })
            .collect::<RecordBatchResult<Vec<_>>>()?;

        RecordBatch::new(self.scan_schema.clone(), columns)
    }

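    /// Casts `source_column` to `target_data_type`, returning the vector
    /// unchanged when the types already match.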
    fn cast_column_type(
        source_column: &VectorRef,
        target_data_type: &ConcreteDataType,
    ) -> RecordBatchResult<VectorRef> {
        if &source_column.data_type() == target_data_type {
            Ok(source_column.clone())
        } else {
            source_column
                .cast(target_data_type)
                .context(CastVectorSnafu {
                    from_type: source_column.data_type(),
                    to_type: target_data_type.clone(),
                })
        }
    }

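    /// Fills a column that is missing from the file with default values,
    /// boxing the engine error into the record batch error type.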
    fn backfill_column(
        column_schema: &ColumnSchema,
        num_rows: usize,
    ) -> RecordBatchResult<VectorRef> {
        Self::create_default_vector(column_schema, num_rows)
            .map_err(BoxedError::new)
            .context(ExternalSnafu)
    }

    /// Creates a vector of `num_rows` default values for the column. Fails if
    /// building the default value errors, or if the column has no default at
    /// all (`MissingColumnNoDefaultSnafu`).
    fn create_default_vector(column_schema: &ColumnSchema, num_rows: usize) -> Result<VectorRef> {
        column_schema
            .create_default_vector(num_rows)
            .with_context(|_| CreateDefaultSnafu {
                column: column_schema.name.clone(),
            })?
            .with_context(|| MissingColumnNoDefaultSnafu {
                column: column_schema.name.clone(),
            })
    }
}
269}