file_engine/query/file_stream.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

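//! Builds DataFusion record batch streams that read external files
//! (CSV, JSON, Parquet, and ORC) from an [`ObjectStore`].
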
use std::sync::Arc;

use common_datasource::file_format::Format;
use common_datasource::file_format::csv::CsvFormat;
use common_datasource::file_format::parquet::DefaultParquetFileReaderFactory;
use datafusion::common::ToDFSchema;
use datafusion::config::CsvOptions;
use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::object_store::ObjectStoreUrl;
use datafusion::datasource::physical_plan::{
    CsvSource, FileGroup, FileScanConfigBuilder, FileSource, FileStream, JsonSource, ParquetSource,
};
use datafusion::datasource::source::DataSourceExec;
use datafusion::physical_expr::create_physical_expr;
use datafusion::physical_expr::execution_props::ExecutionProps;
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion::physical_plan::{
    ExecutionPlan, SendableRecordBatchStream as DfSendableRecordBatchStream,
};
use datafusion::prelude::SessionContext;
use datafusion_expr::expr::Expr;
use datafusion_expr::utils::conjunction;
use datafusion_orc::OrcSource;
use datatypes::schema::SchemaRef;
use object_store::ObjectStore;
use snafu::ResultExt;

use crate::error::{self, Result};

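/// Default number of rows per record batch for the CSV, JSON, and ORC sources.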
const DEFAULT_BATCH_SIZE: usize = 8192;

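/// Builds a DataFusion [`FileStream`] over the files in `scan_plan_config`,
/// decoding them with the given [`FileSource`]. The OpenDAL store is wrapped as
/// a DataFusion object store, all files are placed in a single file group
/// (partition 0), and the configured projection and optional limit are applied.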
fn build_record_batch_stream(
    scan_plan_config: &ScanPlanConfig,
    limit: Option<usize>,
    file_source: Arc<dyn FileSource>,
) -> Result<DfSendableRecordBatchStream> {
    let files = scan_plan_config
        .files
        .iter()
        .map(|filename| PartitionedFile::new(filename.clone(), 0))
        .collect::<Vec<_>>();

    let config =
        FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), file_source.clone())
            .with_projection_indices(scan_plan_config.projection.cloned())
            .with_limit(limit)
            .with_file_group(FileGroup::new(files))
            .build();

    let store = Arc::new(object_store_opendal::OpendalStore::new(
        scan_plan_config.store.clone(),
    ));

    let file_opener = file_source.create_file_opener(store, &config, 0);
    let stream = FileStream::new(
        &config,
        0, // partition index, hard-coded to 0 (single file group)
        file_opener,
        &ExecutionPlanMetricsSet::new(),
    )
    .context(error::BuildStreamSnafu)?;
    Ok(Box::pin(stream))
}

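/// Creates a record batch stream over CSV files, using the header and delimiter
/// settings from `format`. The limit is pushed down only when no filters are present.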
fn new_csv_stream(
    config: &ScanPlanConfig,
    format: &CsvFormat,
) -> Result<DfSendableRecordBatchStream> {
    let file_schema = config.file_schema.arrow_schema().clone();

    // push down limit only if there is no filter
    let limit = config.filters.is_empty().then_some(config.limit).flatten();

    let options = CsvOptions::default()
        .with_has_header(format.has_header)
        .with_delimiter(format.delimiter);
    let csv_source = CsvSource::new(file_schema)
        .with_csv_options(options)
        .with_batch_size(DEFAULT_BATCH_SIZE);

    build_record_batch_stream(config, limit, csv_source)
}

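/// Creates a record batch stream over JSON files. The limit is pushed down only
/// when no filters are present.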
fn new_json_stream(config: &ScanPlanConfig) -> Result<DfSendableRecordBatchStream> {
    let file_schema = config.file_schema.arrow_schema().clone();

    // push down limit only if there is no filter
    let limit = config.filters.is_empty().then_some(config.limit).flatten();

    let file_source = JsonSource::new(file_schema).with_batch_size(DEFAULT_BATCH_SIZE);
    build_record_batch_stream(config, limit, file_source)
}

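/// Creates a record batch stream over Parquet files by building and executing a
/// [`DataSourceExec`] plan, which lets the conjunction of the pushed-down filters
/// be attached to the [`ParquetSource`] as a physical predicate.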
fn new_parquet_stream_with_exec_plan(
    config: &ScanPlanConfig,
) -> Result<DfSendableRecordBatchStream> {
    let file_schema = config.file_schema.arrow_schema().clone();
    let ScanPlanConfig {
        files,
        projection,
        limit,
        filters,
        store,
        ..
    } = config;

    let file_group = FileGroup::new(
        files
            .iter()
            .map(|filename| PartitionedFile::new(filename.clone(), 0))
            .collect::<Vec<_>>(),
    );

    let mut parquet_source = ParquetSource::new(file_schema.clone())
        .with_parquet_file_reader_factory(Arc::new(DefaultParquetFileReaderFactory::new(
            store.clone(),
        )));

    // Build the physical predicate from the conjunction of the pushed-down filters.
    let filters = filters.to_vec();
    if let Some(expr) = conjunction(filters) {
        let df_schema = file_schema
            .clone()
            .to_dfschema_ref()
            .context(error::ParquetScanPlanSnafu)?;

        let filters = create_physical_expr(&expr, &df_schema, &ExecutionProps::new())
            .context(error::ParquetScanPlanSnafu)?;
        parquet_source = parquet_source.with_predicate(filters);
    }

    let file_scan_config =
        FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), Arc::new(parquet_source))
            .with_file_group(file_group)
            .with_projection_indices(projection.cloned())
            .with_limit(*limit)
            .build();

    // TODO(ruihang): get this from upper layer
    let task_ctx = SessionContext::default().task_ctx();

    let parquet_exec = DataSourceExec::from_data_source(file_scan_config);
    let stream = parquet_exec
        .execute(0, task_ctx)
        .context(error::ParquetScanPlanSnafu)?;

    Ok(stream)
}

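/// Creates a record batch stream over ORC files. The limit is pushed down only
/// when no filters are present.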
fn new_orc_stream(config: &ScanPlanConfig) -> Result<DfSendableRecordBatchStream> {
    let file_schema = config.file_schema.arrow_schema().clone();

    // push down limit only if there is no filter
    let limit = config.filters.is_empty().then_some(config.limit).flatten();

    let file_source = OrcSource::new(file_schema.into()).with_batch_size(DEFAULT_BATCH_SIZE);
    build_record_batch_stream(config, limit, file_source)
}

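/// Configuration shared by the file scan stream builders: the file schema, the
/// files to read, an optional projection and row limit, pushed-down filters, and
/// the backing [`ObjectStore`].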
#[derive(Debug, Clone)]
pub struct ScanPlanConfig<'a> {
    pub file_schema: SchemaRef,
    pub files: &'a Vec<String>,
    pub projection: Option<&'a Vec<usize>>,
    pub filters: &'a [Expr],
    pub limit: Option<usize>,
    pub store: ObjectStore,
}

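/// Creates a record batch stream for the given file `format`, dispatching to the
/// matching format-specific builder.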
pub fn create_stream(
    format: &Format,
    config: &ScanPlanConfig,
) -> Result<DfSendableRecordBatchStream> {
    match format {
        Format::Csv(format) => new_csv_stream(config, format),
        Format::Json(_) => new_json_stream(config),
        Format::Parquet(_) => new_parquet_stream_with_exec_plan(config),
        Format::Orc(_) => new_orc_stream(config),
    }
}