use std::sync::Arc;

use common_datasource::file_format::csv::CsvFormat;
use common_datasource::file_format::json::JsonFormat;
use common_datasource::file_format::orc::{OrcFormat, OrcOpener};
use common_datasource::file_format::parquet::{DefaultParquetFileReaderFactory, ParquetFormat};
use common_datasource::file_format::Format;
use common_recordbatch::adapter::RecordBatchStreamAdapter;
use common_recordbatch::SendableRecordBatchStream;
use datafusion::common::{Constraints, Statistics, ToDFSchema};
use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::object_store::ObjectStoreUrl;
use datafusion::datasource::physical_plan::{
    CsvConfig, CsvOpener, FileOpener, FileScanConfig, FileStream, JsonOpener, ParquetExec,
};
use datafusion::physical_expr::create_physical_expr;
use datafusion::physical_expr::execution_props::ExecutionProps;
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::prelude::SessionContext;
use datafusion_expr::expr::Expr;
use datafusion_expr::utils::conjunction;
use datatypes::arrow::datatypes::Schema as ArrowSchema;
use datatypes::schema::SchemaRef;
use object_store::ObjectStore;
use snafu::ResultExt;

use crate::error::{self, Result};

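/// Default number of rows in each record batch produced by the file openers.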
const DEFAULT_BATCH_SIZE: usize = 8192;

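/// Context for building a scan stream. Currently carries no state.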
#[derive(Debug, Clone, Copy, Default)]
pub struct CreateScanPlanContext {}

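/// Builds a [`CsvOpener`] that reads CSV files from the configured object store,
/// honoring the format's header, delimiter, and compression settings.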
fn build_csv_opener(
    file_schema: Arc<ArrowSchema>,
    config: &ScanPlanConfig,
    format: &CsvFormat,
) -> CsvOpener {
    let csv_config = Arc::new(CsvConfig::new(
        DEFAULT_BATCH_SIZE,
        file_schema,
        config.projection.cloned(),
        format.has_header,
        format.delimiter,
        b'"', // quote character
        None,
        Arc::new(object_store_opendal::OpendalStore::new(
            config.store.clone(),
        )),
        None,
    ));
    CsvOpener::new(csv_config, format.compression_type.into())
}

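/// Builds a [`JsonOpener`] for newline-delimited JSON files, applying the
/// optional projection to the file schema before reading.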
fn build_json_opener(
    file_schema: Arc<ArrowSchema>,
    config: &ScanPlanConfig,
    format: &JsonFormat,
) -> Result<JsonOpener> {
    let projected_schema = if let Some(projection) = config.projection {
        Arc::new(
            file_schema
                .project(projection)
                .context(error::ProjectArrowSchemaSnafu)?,
        )
    } else {
        file_schema
    };
    let store = object_store_opendal::OpendalStore::new(config.store.clone());
    Ok(JsonOpener::new(
        DEFAULT_BATCH_SIZE,
        projected_schema,
        format.compression_type.into(),
        Arc::new(store),
    ))
}

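/// Builds an [`OrcOpener`] that reads ORC files from the configured object
/// store, applying the optional projection.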
fn build_orc_opener(output_schema: Arc<ArrowSchema>, config: &ScanPlanConfig) -> Result<OrcOpener> {
    Ok(OrcOpener::new(
        config.store.clone(),
        output_schema,
        config.projection.cloned(),
    ))
}

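/// Drives a [`FileOpener`] over the given files with a DataFusion [`FileStream`]
/// and adapts the result into a [`SendableRecordBatchStream`]. All files are
/// placed in a single file group and read as one partition.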
fn build_record_batch_stream<T: FileOpener + Send + 'static>(
    opener: T,
    file_schema: Arc<ArrowSchema>,
    files: &[String],
    projection: Option<&Vec<usize>>,
    limit: Option<usize>,
) -> Result<SendableRecordBatchStream> {
    let statistics = Statistics::new_unknown(file_schema.as_ref());
    let stream = FileStream::new(
        &FileScanConfig {
            // Placeholder URL; the opener reads data through its own object store.
            object_store_url: ObjectStoreUrl::parse("empty://").unwrap(),
            file_schema,
            file_groups: vec![files
                .iter()
                .map(|filename| PartitionedFile::new(filename.to_string(), 0))
                .collect::<Vec<_>>()],
            statistics,
            projection: projection.cloned(),
            limit,
            table_partition_cols: vec![],
            output_ordering: vec![],
            constraints: Constraints::empty(),
        },
        0, // partition index
        opener,
        &ExecutionPlanMetricsSet::new(),
    )
    .context(error::BuildStreamSnafu)?;
    let adapter = RecordBatchStreamAdapter::try_new(Box::pin(stream))
        .context(error::BuildStreamAdapterSnafu)?;
    Ok(Box::pin(adapter))
}

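/// Creates a record batch stream over the CSV files described by `config`.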
fn new_csv_stream(
    _ctx: &CreateScanPlanContext,
    config: &ScanPlanConfig,
    format: &CsvFormat,
) -> Result<SendableRecordBatchStream> {
    let file_schema = config.file_schema.arrow_schema().clone();
    let opener = build_csv_opener(file_schema.clone(), config, format);
    // Push down the limit only when there are no filters; otherwise rows could
    // be cut off before the filters are applied.
    let limit = config.filters.is_empty().then_some(config.limit).flatten();
    build_record_batch_stream(opener, file_schema, config.files, config.projection, limit)
}

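/// Creates a record batch stream over the JSON files described by `config`.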
fn new_json_stream(
    _ctx: &CreateScanPlanContext,
    config: &ScanPlanConfig,
    format: &JsonFormat,
) -> Result<SendableRecordBatchStream> {
    let file_schema = config.file_schema.arrow_schema().clone();
    let opener = build_json_opener(file_schema.clone(), config, format)?;
    // Push down the limit only when there are no filters.
    let limit = config.filters.is_empty().then_some(config.limit).flatten();
    build_record_batch_stream(opener, file_schema, config.files, config.projection, limit)
}

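/// Creates a record batch stream over Parquet files by building a [`ParquetExec`]
/// plan, pushing down the projection, the limit, and the conjunction of the
/// filter expressions as a physical predicate.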
fn new_parquet_stream_with_exec_plan(
    _ctx: &CreateScanPlanContext,
    config: &ScanPlanConfig,
    _format: &ParquetFormat,
) -> Result<SendableRecordBatchStream> {
    let file_schema = config.file_schema.arrow_schema().clone();
    let ScanPlanConfig {
        files,
        projection,
        limit,
        filters,
        store,
        ..
    } = config;

    let scan_config = FileScanConfig {
        // Placeholder URL; data is read through the Parquet file reader factory below.
        object_store_url: ObjectStoreUrl::parse("empty://").unwrap(),
        file_schema: file_schema.clone(),
        file_groups: vec![files
            .iter()
            .map(|filename| PartitionedFile::new(filename.to_string(), 0))
            .collect::<Vec<_>>()],
        constraints: Constraints::empty(),
        statistics: Statistics::new_unknown(file_schema.as_ref()),
        projection: projection.cloned(),
        limit: *limit,
        table_partition_cols: vec![],
        output_ordering: vec![],
    };

    // Fold all filter expressions into a single physical predicate.
    let filters = filters.to_vec();
    let filters = if let Some(expr) = conjunction(filters) {
        let df_schema = file_schema
            .clone()
            .to_dfschema_ref()
            .context(error::ParquetScanPlanSnafu)?;

        let filters = create_physical_expr(&expr, &df_schema, &ExecutionProps::new())
            .context(error::ParquetScanPlanSnafu)?;
        Some(filters)
    } else {
        None
    };

    let task_ctx = SessionContext::default().task_ctx();

    let mut builder = ParquetExec::builder(scan_config);
    if let Some(filters) = filters {
        builder = builder.with_predicate(filters);
    }
    let parquet_exec = builder
        .with_parquet_file_reader_factory(Arc::new(DefaultParquetFileReaderFactory::new(
            store.clone(),
        )))
        .build();
    let stream = parquet_exec
        .execute(0, task_ctx)
        .context(error::ParquetScanPlanSnafu)?;

    Ok(Box::pin(
        RecordBatchStreamAdapter::try_new(stream).context(error::BuildStreamAdapterSnafu)?,
    ))
}

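/// Creates a record batch stream over the ORC files described by `config`.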
fn new_orc_stream(
    _ctx: &CreateScanPlanContext,
    config: &ScanPlanConfig,
    _format: &OrcFormat,
) -> Result<SendableRecordBatchStream> {
    let file_schema = config.file_schema.arrow_schema().clone();
    let opener = build_orc_opener(file_schema.clone(), config)?;
    // Push down the limit only when there are no filters.
    let limit = config.filters.is_empty().then_some(config.limit).flatten();
    build_record_batch_stream(opener, file_schema, config.files, config.projection, limit)
}

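/// Parameters describing a file scan: the schema of the files, the file paths,
/// the optional projection and limit, the filter expressions, and the object
/// store to read from.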
#[derive(Debug, Clone)]
pub struct ScanPlanConfig<'a> {
    pub file_schema: SchemaRef,
    pub files: &'a Vec<String>,
    pub projection: Option<&'a Vec<usize>>,
    pub filters: &'a [Expr],
    pub limit: Option<usize>,
    pub store: ObjectStore,
}

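/// Creates a [`SendableRecordBatchStream`] for the given file format, dispatching
/// to the CSV, JSON, Parquet, or ORC implementation.
///
/// A minimal call-site sketch (hypothetical names; assumes `CsvFormat` provides a
/// `Default` implementation and that `file_schema`, `files`, and `store` are in scope):
///
/// ```ignore
/// let stream = create_stream(
///     &Format::Csv(CsvFormat::default()),
///     &CreateScanPlanContext::default(),
///     &ScanPlanConfig {
///         file_schema,
///         files: &files,
///         projection: None,
///         filters: &[],
///         limit: None,
///         store,
///     },
/// )?;
/// ```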
pub fn create_stream(
    format: &Format,
    ctx: &CreateScanPlanContext,
    config: &ScanPlanConfig,
) -> Result<SendableRecordBatchStream> {
    match format {
        Format::Csv(format) => new_csv_stream(ctx, config, format),
        Format::Json(format) => new_json_stream(ctx, config, format),
        Format::Parquet(format) => new_parquet_stream_with_exec_plan(ctx, config, format),
        Format::Orc(format) => new_orc_stream(ctx, config, format),
    }
}