1pub(crate) mod file_stream;
16
17use std::collections::HashSet;
18use std::pin::Pin;
19use std::sync::Arc;
20use std::task::{Context, Poll};
21
22use common_datasource::object_store::build_backend;
23use common_recordbatch::adapter::RecordBatchMetrics;
24use common_recordbatch::error::{self as recordbatch_error, Result as RecordBatchResult};
25use common_recordbatch::{
26 DfSendableRecordBatchStream, OrderOption, RecordBatch, RecordBatchStream,
27 SendableRecordBatchStream,
28};
29use datafusion::logical_expr::utils as df_logical_expr_utils;
30use datafusion_expr::expr::Expr;
31use datatypes::arrow::compute as arrow_compute;
32use datatypes::data_type::DataType;
33use datatypes::schema::{Schema, SchemaRef};
34use datatypes::vectors::Helper;
35use futures::Stream;
36use snafu::{GenerateImplicitData, ResultExt, ensure};
37use store_api::storage::ScanRequest;
38
39use self::file_stream::ScanPlanConfig;
40use crate::error::{
41 BuildBackendSnafu, ExtractColumnFromFilterSnafu, ProjectSchemaSnafu,
42 ProjectionOutOfBoundsSnafu, Result,
43};
44use crate::region::FileRegion;
45
46impl FileRegion {
47 pub fn query(&self, request: ScanRequest) -> Result<SendableRecordBatchStream> {
48 let store = build_backend(&self.url, &self.options).context(BuildBackendSnafu)?;
49
50 let file_projection = self.projection_pushdown_to_file(&request.projection)?;
51 let file_filters = self.filters_pushdown_to_file(&request.filters)?;
52 let file_schema = Arc::new(Schema::new(self.file_options.file_column_schemas.clone()));
53
54 let projected_file_schema = if let Some(projection) = &file_projection {
55 Arc::new(
56 file_schema
57 .try_project(projection)
58 .context(ProjectSchemaSnafu)?,
59 )
60 } else {
61 file_schema.clone()
62 };
63
64 let file_stream = file_stream::create_stream(
65 &self.format,
66 &ScanPlanConfig {
67 file_schema,
68 files: &self.file_options.files,
69 projection: file_projection.as_ref(),
70 filters: &file_filters,
71 limit: request.limit,
72 store,
73 },
74 )?;
75
76 let scan_schema = self.scan_schema(&request.projection)?;
77
78 Ok(Box::pin(FileToScanRegionStream::new(
79 scan_schema,
80 projected_file_schema,
81 file_stream,
82 )))
83 }
84
85 fn projection_pushdown_to_file(
86 &self,
87 req_projection: &Option<Vec<usize>>,
88 ) -> Result<Option<Vec<usize>>> {
89 let Some(scan_projection) = req_projection.as_ref() else {
90 return Ok(None);
91 };
92
93 let file_column_schemas = &self.file_options.file_column_schemas;
94 let mut file_projection = Vec::with_capacity(scan_projection.len());
95 for column_index in scan_projection {
96 ensure!(
97 *column_index < self.metadata.schema.num_columns(),
98 ProjectionOutOfBoundsSnafu {
99 column_index: *column_index,
100 bounds: self.metadata.schema.num_columns()
101 }
102 );
103
104 let column_name = self.metadata.schema.column_name_by_index(*column_index);
105 let file_column_index = file_column_schemas
106 .iter()
107 .position(|c| c.name == column_name);
108 if let Some(file_column_index) = file_column_index {
109 file_projection.push(file_column_index);
110 }
111 }
112 Ok(Some(file_projection))
113 }
114
115 fn filters_pushdown_to_file(&self, scan_filters: &[Expr]) -> Result<Vec<Expr>> {
118 let mut file_filters = Vec::with_capacity(scan_filters.len());
119
120 let file_column_names = self
121 .file_options
122 .file_column_schemas
123 .iter()
124 .map(|c| &c.name)
125 .collect::<HashSet<_>>();
126
127 let mut aux_column_set = HashSet::new();
128 for scan_filter in scan_filters {
129 df_logical_expr_utils::expr_to_columns(scan_filter, &mut aux_column_set)
130 .context(ExtractColumnFromFilterSnafu)?;
131
132 let all_file_columns = aux_column_set
133 .iter()
134 .all(|column_in_expr| file_column_names.contains(&column_in_expr.name));
135 if all_file_columns {
136 file_filters.push(scan_filter.clone());
137 }
138 aux_column_set.clear();
139 }
140 Ok(file_filters)
141 }
142
143 fn scan_schema(&self, req_projection: &Option<Vec<usize>>) -> Result<SchemaRef> {
144 let schema = if let Some(indices) = req_projection {
145 Arc::new(
146 self.metadata
147 .schema
148 .try_project(indices)
149 .context(ProjectSchemaSnafu)?,
150 )
151 } else {
152 self.metadata.schema.clone()
153 };
154
155 Ok(schema)
156 }
157}
158
/// Adapts a stream of record batches read from files into batches matching
/// the scan schema: columns present in the files are type-cast as needed,
/// and columns missing from the files are filled with their default values
/// (see the `Stream` impl below).
struct FileToScanRegionStream {
    /// Schema of the batches this stream yields to the caller.
    scan_schema: SchemaRef,
    /// Underlying stream of batches read from the files.
    file_stream: DfSendableRecordBatchStream,
    /// For each scan-schema column (by position), its column index in the
    /// file batches, or `None` when the column does not exist in the files.
    scan_to_file_projection: Vec<Option<usize>>,
}
166
167impl RecordBatchStream for FileToScanRegionStream {
168 fn schema(&self) -> SchemaRef {
169 self.scan_schema.clone()
170 }
171
172 fn output_ordering(&self) -> Option<&[OrderOption]> {
173 None
174 }
175
176 fn metrics(&self) -> Option<RecordBatchMetrics> {
177 None
178 }
179}
180
181impl Stream for FileToScanRegionStream {
182 type Item = RecordBatchResult<RecordBatch>;
183
184 fn poll_next(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
185 match Pin::new(&mut self.file_stream).poll_next(ctx) {
186 Poll::Pending => Poll::Pending,
187 Poll::Ready(Some(Ok(file_record_batch))) => {
188 let num_rows = file_record_batch.num_rows();
189 let mut columns = Vec::with_capacity(self.scan_schema.num_columns());
190
191 for (idx, column_schema) in self.scan_schema.column_schemas().iter().enumerate() {
192 if let Some(file_idx) = self.scan_to_file_projection[idx] {
193 let expected_arrow_type = column_schema.data_type.as_arrow_type();
194 let mut array = file_record_batch.column(file_idx).clone();
195
196 if array.data_type() != &expected_arrow_type {
197 array = arrow_compute::cast(array.as_ref(), &expected_arrow_type)
198 .context(recordbatch_error::ArrowComputeSnafu)?;
199 }
200
201 let vector = Helper::try_into_vector(array)
202 .context(recordbatch_error::DataTypesSnafu)?;
203 columns.push(vector);
204 } else {
205 let vector = column_schema
206 .create_default_vector(num_rows)
207 .context(recordbatch_error::DataTypesSnafu)?
208 .ok_or_else(|| {
209 recordbatch_error::CreateRecordBatchesSnafu {
210 reason: format!(
211 "column {} is missing from file source and has no default",
212 column_schema.name
213 ),
214 }
215 .build()
216 })?;
217 columns.push(vector);
218 }
219 }
220
221 let record_batch = RecordBatch::new(self.scan_schema.clone(), columns)?;
222
223 Poll::Ready(Some(Ok(record_batch)))
224 }
225 Poll::Ready(Some(Err(error))) => {
226 Poll::Ready(Some(Err(recordbatch_error::Error::PollStream {
227 error,
228 location: snafu::Location::generate(),
229 })))
230 }
231 Poll::Ready(None) => Poll::Ready(None),
232 }
233 }
234}
235
236impl FileToScanRegionStream {
237 fn new(
238 scan_schema: SchemaRef,
239 file_schema: SchemaRef,
240 file_stream: DfSendableRecordBatchStream,
241 ) -> Self {
242 let scan_to_file_projection = scan_schema
243 .column_schemas()
244 .iter()
245 .map(|column| file_schema.column_index_by_name(&column.name))
246 .collect();
247
248 Self {
249 scan_schema,
250 file_stream,
251 scan_to_file_projection,
252 }
253 }
254}