// file_engine/query/file_stream.rs

use std::sync::Arc;
use common_datasource::file_format::Format;
use common_datasource::file_format::csv::CsvFormat;
use common_datasource::file_format::parquet::DefaultParquetFileReaderFactory;
use datafusion::common::ToDFSchema;
use datafusion::config::CsvOptions;
use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::object_store::ObjectStoreUrl;
use datafusion::datasource::physical_plan::{
    CsvSource, FileGroup, FileScanConfigBuilder, FileSource, FileStream, JsonSource, ParquetSource,
};
use datafusion::datasource::source::DataSourceExec;
use datafusion::physical_expr::create_physical_expr;
use datafusion::physical_expr::execution_props::ExecutionProps;
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion::physical_plan::{
    ExecutionPlan, SendableRecordBatchStream as DfSendableRecordBatchStream,
};
use datafusion::prelude::SessionContext;
use datafusion_expr::expr::Expr;
use datafusion_expr::utils::conjunction;
use datafusion_orc::OrcSource;
use datatypes::schema::SchemaRef;
use object_store::ObjectStore;
use snafu::ResultExt;

use crate::error::{self, Result};

const DEFAULT_BATCH_SIZE: usize = 8192;

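/// Streams the files in `scan_plan_config` through the given [FileSource],
/// applying the configured projection and the optional `limit`.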
fn build_record_batch_stream(
    scan_plan_config: &ScanPlanConfig,
    limit: Option<usize>,
    file_source: Arc<dyn FileSource>,
) -> Result<DfSendableRecordBatchStream> {
    // Wrap every path in a `PartitionedFile`; the object size is not known
    // here, so pass 0.
    let files = scan_plan_config
        .files
        .iter()
        .map(|filename| PartitionedFile::new(filename.clone(), 0))
        .collect::<Vec<_>>();

    let config =
        FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), file_source.clone())
            .with_projection_indices(scan_plan_config.projection.cloned())
            .with_limit(limit)
            .with_file_group(FileGroup::new(files))
            .build();

    // Adapt the OpenDAL-backed store to the `object_store` API that
    // DataFusion's file sources expect.
    let store = Arc::new(object_store_opendal::OpendalStore::new(
        scan_plan_config.store.clone(),
    ));

    // All files live in a single file group, hence partition index 0.
    let file_opener = file_source.create_file_opener(store, &config, 0);
    let stream = FileStream::new(&config, 0, file_opener, &ExecutionPlanMetricsSet::new())
        .context(error::BuildStreamSnafu)?;
    Ok(Box::pin(stream))
}

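/// Builds a stream over CSV files, honoring the header and delimiter
/// settings from `format`.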
fn new_csv_stream(
    config: &ScanPlanConfig,
    format: &CsvFormat,
) -> Result<DfSendableRecordBatchStream> {
    let file_schema = config.file_schema.arrow_schema().clone();

    // Push the limit down only when there are no filters; with filters, the
    // limit must be applied after filtering rather than inside the scan.
    let limit = config.filters.is_empty().then_some(config.limit).flatten();

    let options = CsvOptions::default()
        .with_has_header(format.has_header)
        .with_delimiter(format.delimiter);
    let csv_source = CsvSource::new(file_schema)
        .with_csv_options(options)
        .with_batch_size(DEFAULT_BATCH_SIZE);

    build_record_batch_stream(config, limit, csv_source)
}

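/// Builds a stream over JSON files using DataFusion's [JsonSource].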
fn new_json_stream(config: &ScanPlanConfig) -> Result<DfSendableRecordBatchStream> {
    let file_schema = config.file_schema.arrow_schema().clone();

    let limit = config.filters.is_empty().then_some(config.limit).flatten();

    let file_source = JsonSource::new(file_schema).with_batch_size(DEFAULT_BATCH_SIZE);
    build_record_batch_stream(config, limit, file_source)
}

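/// Builds a stream over Parquet files by assembling a full [DataSourceExec]
/// plan, so that filters can be attached as a pruning predicate and the
/// limit applied inside the scan.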
fn new_parquet_stream_with_exec_plan(
    config: &ScanPlanConfig,
) -> Result<DfSendableRecordBatchStream> {
    let file_schema = config.file_schema.arrow_schema().clone();
    let ScanPlanConfig {
        files,
        projection,
        limit,
        filters,
        store,
        ..
    } = config;

    let file_group = FileGroup::new(
        files
            .iter()
            .map(|filename| PartitionedFile::new(filename.clone(), 0))
            .collect::<Vec<_>>(),
    );

    // Read parquet data through the engine's own object store instead of a
    // local filesystem reader.
    let mut parquet_source = ParquetSource::new(file_schema.clone())
        .with_parquet_file_reader_factory(Arc::new(DefaultParquetFileReaderFactory::new(
            store.clone(),
        )));

    // Combine all pushed-down filters into a single conjunction and attach
    // it to the source as a predicate.
    let filters = filters.to_vec();
    if let Some(expr) = conjunction(filters) {
        let df_schema = file_schema
            .clone()
            .to_dfschema_ref()
            .context(error::ParquetScanPlanSnafu)?;

        let filters = create_physical_expr(&expr, &df_schema, &ExecutionProps::new())
            .context(error::ParquetScanPlanSnafu)?;
        parquet_source = parquet_source.with_predicate(filters);
    }

    let file_scan_config =
        FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), Arc::new(parquet_source))
            .with_file_group(file_group)
            .with_projection_indices(projection.cloned())
            .with_limit(*limit)
            .build();

    // A default session context is sufficient to execute this single scan.
    let task_ctx = SessionContext::default().task_ctx();

    let parquet_exec = DataSourceExec::from_data_source(file_scan_config);
    let stream = parquet_exec
        .execute(0, task_ctx)
        .context(error::ParquetScanPlanSnafu)?;

    Ok(stream)
}

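/// Builds a stream over ORC files using [OrcSource].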
fn new_orc_stream(config: &ScanPlanConfig) -> Result<DfSendableRecordBatchStream> {
    let file_schema = config.file_schema.arrow_schema().clone();

    let limit = config.filters.is_empty().then_some(config.limit).flatten();

    let file_source = OrcSource::new(file_schema.into()).with_batch_size(DEFAULT_BATCH_SIZE);
    build_record_batch_stream(config, limit, file_source)
}

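/// Everything needed to scan a set of files: their schema and paths, an
/// optional column projection and row limit, pushed-down filter expressions,
/// and the object store holding the files.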
#[derive(Debug, Clone)]
pub struct ScanPlanConfig<'a> {
    pub file_schema: SchemaRef,
    pub files: &'a Vec<String>,
    pub projection: Option<&'a Vec<usize>>,
    pub filters: &'a [Expr],
    pub limit: Option<usize>,
    pub store: ObjectStore,
}

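/// Creates a [DfSendableRecordBatchStream] that decodes `config.files`
/// according to `format`, dispatching to the per-format builders above.
///
/// A minimal call-site sketch (illustrative only, not taken from this
/// crate's tests; `schema`, `files`, and `store` are assumed to be prepared
/// by the caller, and `CsvFormat::default()` stands in for a real format
/// value):
///
/// ```rust,ignore
/// use futures::StreamExt;
///
/// let config = ScanPlanConfig {
///     file_schema: schema,
///     files: &files,
///     projection: None,
///     filters: &[],
///     limit: Some(1000),
///     store,
/// };
/// let mut stream = create_stream(&Format::Csv(CsvFormat::default()), &config)?;
/// while let Some(batch) = stream.next().await {
///     let batch = batch?; // each item is a DataFusion RecordBatch
/// }
/// ```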
pub fn create_stream(
    format: &Format,
    config: &ScanPlanConfig,
) -> Result<DfSendableRecordBatchStream> {
    match format {
        Format::Csv(format) => new_csv_stream(config, format),
        Format::Json(_) => new_json_stream(config),
        Format::Parquet(_) => new_parquet_stream_with_exec_plan(config),
        Format::Orc(_) => new_orc_stream(config),
    }
}