file_engine/query/file_stream.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use common_datasource::file_format::Format;
use common_datasource::file_format::csv::CsvFormat;
use common_datasource::file_format::parquet::DefaultParquetFileReaderFactory;
use datafusion::common::ToDFSchema;
use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::object_store::ObjectStoreUrl;
use datafusion::datasource::physical_plan::{
    CsvSource, FileGroup, FileScanConfigBuilder, FileSource, FileStream, JsonSource, ParquetSource,
};
use datafusion::datasource::source::DataSourceExec;
use datafusion::physical_expr::create_physical_expr;
use datafusion::physical_expr::execution_props::ExecutionProps;
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion::physical_plan::{
    ExecutionPlan, SendableRecordBatchStream as DfSendableRecordBatchStream,
};
use datafusion::prelude::SessionContext;
use datafusion_expr::expr::Expr;
use datafusion_expr::utils::conjunction;
use datafusion_orc::OrcSource;
use datatypes::arrow::datatypes::Schema as ArrowSchema;
use datatypes::schema::SchemaRef;
use object_store::ObjectStore;
use snafu::ResultExt;

use crate::error::{self, Result};

const DEFAULT_BATCH_SIZE: usize = 8192;

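/// Builds a [`FileStream`] over the files in `scan_plan_config` using the given
/// [`FileSource`], applying the projection and optional limit from the scan config.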
fn build_record_batch_stream(
    scan_plan_config: &ScanPlanConfig,
    file_schema: Arc<ArrowSchema>,
    limit: Option<usize>,
    file_source: Arc<dyn FileSource>,
) -> Result<DfSendableRecordBatchStream> {
    let files = scan_plan_config
        .files
        .iter()
        .map(|filename| PartitionedFile::new(filename.clone(), 0))
        .collect::<Vec<_>>();

    let config = FileScanConfigBuilder::new(
        ObjectStoreUrl::local_filesystem(),
        file_schema,
        file_source.clone(),
    )
    .with_projection(scan_plan_config.projection.cloned())
    .with_limit(limit)
    .with_file_group(FileGroup::new(files))
    .build();

    let store = Arc::new(object_store_opendal::OpendalStore::new(
        scan_plan_config.store.clone(),
    ));

    let file_opener = file_source.create_file_opener(store, &config, 0);
    let stream = FileStream::new(
        &config,
        0, // partition: hard-code
        file_opener,
        &ExecutionPlanMetricsSet::new(),
    )
    .context(error::BuildStreamSnafu)?;
    Ok(Box::pin(stream))
}

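/// Creates a record batch stream that reads the configured files as CSV.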
fn new_csv_stream(
    config: &ScanPlanConfig,
    format: &CsvFormat,
) -> Result<DfSendableRecordBatchStream> {
    let file_schema = config.file_schema.arrow_schema().clone();

    // push down limit only if there is no filter
    let limit = config.filters.is_empty().then_some(config.limit).flatten();

    let csv_source = CsvSource::new(format.has_header, format.delimiter, b'"')
        .with_schema(file_schema.clone())
        .with_batch_size(DEFAULT_BATCH_SIZE);

    build_record_batch_stream(config, file_schema, limit, csv_source)
}

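/// Creates a record batch stream that reads the configured files as JSON.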
fn new_json_stream(config: &ScanPlanConfig) -> Result<DfSendableRecordBatchStream> {
    let file_schema = config.file_schema.arrow_schema().clone();

    // push down limit only if there is no filter
    let limit = config.filters.is_empty().then_some(config.limit).flatten();

    let file_source = JsonSource::new().with_batch_size(DEFAULT_BATCH_SIZE);
    build_record_batch_stream(config, file_schema, limit, file_source)
}

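/// Creates a record batch stream that reads Parquet files through a [`DataSourceExec`]
/// plan, so the conjunction of the filters can be pushed down to the Parquet reader as a
/// predicate.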
fn new_parquet_stream_with_exec_plan(
    config: &ScanPlanConfig,
) -> Result<DfSendableRecordBatchStream> {
    let file_schema = config.file_schema.arrow_schema().clone();
    let ScanPlanConfig {
        files,
        projection,
        limit,
        filters,
        store,
        ..
    } = config;

    let file_group = FileGroup::new(
        files
            .iter()
            .map(|filename| PartitionedFile::new(filename.clone(), 0))
            .collect::<Vec<_>>(),
    );

    let mut parquet_source = ParquetSource::default().with_parquet_file_reader_factory(Arc::new(
        DefaultParquetFileReaderFactory::new(store.clone()),
    ));

    // build predicate filter
    let filters = filters.to_vec();
    if let Some(expr) = conjunction(filters) {
        let df_schema = file_schema
            .clone()
            .to_dfschema_ref()
            .context(error::ParquetScanPlanSnafu)?;

        let filters = create_physical_expr(&expr, &df_schema, &ExecutionProps::new())
            .context(error::ParquetScanPlanSnafu)?;
        parquet_source = parquet_source.with_predicate(filters);
    };

    let file_scan_config = FileScanConfigBuilder::new(
        ObjectStoreUrl::local_filesystem(),
        file_schema,
        Arc::new(parquet_source),
    )
    .with_file_group(file_group)
    .with_projection(projection.cloned())
    .with_limit(*limit)
    .build();

    // TODO(ruihang): get this from upper layer
    let task_ctx = SessionContext::default().task_ctx();

    let parquet_exec = DataSourceExec::from_data_source(file_scan_config);
    let stream = parquet_exec
        .execute(0, task_ctx)
        .context(error::ParquetScanPlanSnafu)?;

    Ok(stream)
}

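/// Creates a record batch stream that reads the configured files as ORC.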
fn new_orc_stream(config: &ScanPlanConfig) -> Result<DfSendableRecordBatchStream> {
    let file_schema = config.file_schema.arrow_schema().clone();

    // push down limit only if there is no filter
    let limit = config.filters.is_empty().then_some(config.limit).flatten();

    let file_source = OrcSource::default().with_batch_size(DEFAULT_BATCH_SIZE);
    build_record_batch_stream(config, file_schema, limit, file_source)
}

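/// Configuration for scanning a set of files: the schema to read them with, the files to
/// scan, an optional projection and limit, pushdown filters, and the object store that
/// holds the files.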
#[derive(Debug, Clone)]
pub struct ScanPlanConfig<'a> {
    pub file_schema: SchemaRef,
    pub files: &'a Vec<String>,
    pub projection: Option<&'a Vec<usize>>,
    pub filters: &'a [Expr],
    pub limit: Option<usize>,
    pub store: ObjectStore,
}

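/// Creates a record batch stream for the given file format, dispatching to the
/// format-specific constructor.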
pub fn create_stream(
    format: &Format,
    config: &ScanPlanConfig,
) -> Result<DfSendableRecordBatchStream> {
    match format {
        Format::Csv(format) => new_csv_stream(config, format),
        Format::Json(_) => new_json_stream(config),
        Format::Parquet(_) => new_parquet_stream_with_exec_plan(config),
        Format::Orc(_) => new_orc_stream(config),
    }
}