// file_engine/query/file_stream.rs

use std::sync::Arc;

use common_datasource::file_format::csv::CsvFormat;
use common_datasource::file_format::parquet::DefaultParquetFileReaderFactory;
use common_datasource::file_format::Format;
use common_recordbatch::adapter::RecordBatchStreamAdapter;
use common_recordbatch::SendableRecordBatchStream;
use datafusion::common::ToDFSchema;
use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::object_store::ObjectStoreUrl;
use datafusion::datasource::physical_plan::{
    CsvSource, FileGroup, FileScanConfigBuilder, FileSource, FileStream, JsonSource, ParquetSource,
};
use datafusion::datasource::source::DataSourceExec;
use datafusion::physical_expr::create_physical_expr;
use datafusion::physical_expr::execution_props::ExecutionProps;
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::prelude::SessionContext;
use datafusion_expr::expr::Expr;
use datafusion_expr::utils::conjunction;
use datafusion_orc::OrcSource;
use datatypes::arrow::datatypes::Schema as ArrowSchema;
use datatypes::schema::SchemaRef;
use object_store::ObjectStore;
use snafu::ResultExt;

use crate::error::{self, Result};

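/// Number of rows per record batch requested from each file source.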
const DEFAULT_BATCH_SIZE: usize = 8192;

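/// Builds a stream of record batches that scans every file in
/// `scan_plan_config.files` through the given [`FileSource`], applying the
/// configured projection and the optional `limit`.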
fn build_record_batch_stream(
    scan_plan_config: &ScanPlanConfig,
    file_schema: Arc<ArrowSchema>,
    limit: Option<usize>,
    file_source: Arc<dyn FileSource>,
) -> Result<SendableRecordBatchStream> {
    let files = scan_plan_config
        .files
        .iter()
        .map(|filename| PartitionedFile::new(filename.to_string(), 0))
        .collect::<Vec<_>>();

    let config = FileScanConfigBuilder::new(
        ObjectStoreUrl::local_filesystem(),
        file_schema,
        file_source.clone(),
    )
    .with_projection(scan_plan_config.projection.cloned())
    .with_limit(limit)
    .with_file_group(FileGroup::new(files))
    .build();

    // Adapt the OpenDAL operator to the `object_store` API that DataFusion's
    // file opener expects.
    let store = Arc::new(object_store_opendal::OpendalStore::new(
        scan_plan_config.store.clone(),
    ));

    let file_opener = file_source.create_file_opener(store, &config, 0);
    let stream = FileStream::new(&config, 0, file_opener, &ExecutionPlanMetricsSet::new())
        .context(error::BuildStreamSnafu)?;
    let adapter = RecordBatchStreamAdapter::try_new(Box::pin(stream))
        .context(error::BuildStreamAdapterSnafu)?;
    Ok(Box::pin(adapter))
}

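/// Creates a record batch stream over CSV files, using the header and
/// delimiter settings from `format` and a double-quote quote character.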
fn new_csv_stream(
    config: &ScanPlanConfig,
    format: &CsvFormat,
) -> Result<SendableRecordBatchStream> {
    let file_schema = config.file_schema.arrow_schema().clone();

    // Push the limit down only when there are no filters; with filters applied
    // above this stream, an early limit could cut off rows that survive filtering.
    let limit = config.filters.is_empty().then_some(config.limit).flatten();

    let csv_source = CsvSource::new(format.has_header, format.delimiter, b'"')
        .with_schema(file_schema.clone())
        .with_batch_size(DEFAULT_BATCH_SIZE);

    build_record_batch_stream(config, file_schema, limit, csv_source)
}

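/// Creates a record batch stream over JSON files.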
fn new_json_stream(config: &ScanPlanConfig) -> Result<SendableRecordBatchStream> {
    let file_schema = config.file_schema.arrow_schema().clone();

    // As with CSV, only push the limit down when no filters are present.
    let limit = config.filters.is_empty().then_some(config.limit).flatten();

    let file_source = JsonSource::new().with_batch_size(DEFAULT_BATCH_SIZE);
    build_record_batch_stream(config, file_schema, limit, file_source)
}

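/// Creates a record batch stream over Parquet files by assembling a
/// [`DataSourceExec`] plan, pushing the conjunction of `filters` down into
/// the Parquet scan as a predicate.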
fn new_parquet_stream_with_exec_plan(config: &ScanPlanConfig) -> Result<SendableRecordBatchStream> {
    let file_schema = config.file_schema.arrow_schema().clone();
    let ScanPlanConfig {
        files,
        projection,
        limit,
        filters,
        store,
        ..
    } = config;

    let file_group = FileGroup::new(
        files
            .iter()
            .map(|filename| PartitionedFile::new(filename.to_string(), 0))
            .collect::<Vec<_>>(),
    );

    let mut parquet_source = ParquetSource::default().with_parquet_file_reader_factory(Arc::new(
        DefaultParquetFileReaderFactory::new(store.clone()),
    ));

    // Fold all filters into a single conjunction and push it down as a
    // Parquet predicate, so data can be pruned during the scan.
    let filters = filters.to_vec();
    if let Some(expr) = conjunction(filters) {
        let df_schema = file_schema
            .clone()
            .to_dfschema_ref()
            .context(error::ParquetScanPlanSnafu)?;

        let filters = create_physical_expr(&expr, &df_schema, &ExecutionProps::new())
            .context(error::ParquetScanPlanSnafu)?;
        parquet_source = parquet_source.with_predicate(filters);
    }

    let file_scan_config = FileScanConfigBuilder::new(
        ObjectStoreUrl::local_filesystem(),
        file_schema,
        Arc::new(parquet_source),
    )
    .with_file_group(file_group)
    .with_projection(projection.cloned())
    .with_limit(*limit)
    .build();

    let task_ctx = SessionContext::default().task_ctx();

    let parquet_exec = DataSourceExec::from_data_source(file_scan_config);
    let stream = parquet_exec
        .execute(0, task_ctx)
        .context(error::ParquetScanPlanSnafu)?;

    Ok(Box::pin(
        RecordBatchStreamAdapter::try_new(stream).context(error::BuildStreamAdapterSnafu)?,
    ))
}

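/// Creates a record batch stream over ORC files.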
fn new_orc_stream(config: &ScanPlanConfig) -> Result<SendableRecordBatchStream> {
    let file_schema = config.file_schema.arrow_schema().clone();

    // As with CSV and JSON, only push the limit down when no filters are present.
    let limit = config.filters.is_empty().then_some(config.limit).flatten();

    let file_source = OrcSource::default().with_batch_size(DEFAULT_BATCH_SIZE);
    build_record_batch_stream(config, file_schema, limit, file_source)
}

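/// Parameters describing a file scan: the files to read, the schema they
/// share, an optional column projection, filter expressions, an optional row
/// limit, and the object store to read from.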
#[derive(Debug, Clone)]
pub struct ScanPlanConfig<'a> {
    pub file_schema: SchemaRef,
    pub files: &'a Vec<String>,
    pub projection: Option<&'a Vec<usize>>,
    pub filters: &'a [Expr],
    pub limit: Option<usize>,
    pub store: ObjectStore,
}

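/// Creates a record batch stream for `config`, dispatching on `format`.
///
/// ```ignore
/// // Hypothetical call site; `format` and `config` are built by the caller.
/// let stream = create_stream(&format, &config)?;
/// ```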
pub fn create_stream(
    format: &Format,
    config: &ScanPlanConfig,
) -> Result<SendableRecordBatchStream> {
    match format {
        Format::Csv(format) => new_csv_stream(config, format),
        Format::Json(_) => new_json_stream(config),
        Format::Parquet(_) => new_parquet_stream_with_exec_plan(config),
        Format::Orc(_) => new_orc_stream(config),
    }
}