file_engine/query/file_stream.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use common_datasource::file_format::csv::CsvFormat;
use common_datasource::file_format::parquet::DefaultParquetFileReaderFactory;
use common_datasource::file_format::Format;
use common_recordbatch::adapter::RecordBatchStreamAdapter;
use common_recordbatch::SendableRecordBatchStream;
use datafusion::common::ToDFSchema;
use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::object_store::ObjectStoreUrl;
use datafusion::datasource::physical_plan::{
    CsvSource, FileGroup, FileScanConfigBuilder, FileSource, FileStream, JsonSource, ParquetSource,
};
use datafusion::datasource::source::DataSourceExec;
use datafusion::physical_expr::create_physical_expr;
use datafusion::physical_expr::execution_props::ExecutionProps;
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::prelude::SessionContext;
use datafusion_expr::expr::Expr;
use datafusion_expr::utils::conjunction;
use datafusion_orc::OrcSource;
use datatypes::arrow::datatypes::Schema as ArrowSchema;
use datatypes::schema::SchemaRef;
use object_store::ObjectStore;
use snafu::ResultExt;

use crate::error::{self, Result};

const DEFAULT_BATCH_SIZE: usize = 8192;

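/// Builds a [`SendableRecordBatchStream`] that scans the files in `scan_plan_config`
/// with the given [`FileSource`], applying the projection and optional row limit.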
fn build_record_batch_stream(
    scan_plan_config: &ScanPlanConfig,
    file_schema: Arc<ArrowSchema>,
    limit: Option<usize>,
    file_source: Arc<dyn FileSource>,
) -> Result<SendableRecordBatchStream> {
    let files = scan_plan_config
        .files
        .iter()
        .map(|filename| PartitionedFile::new(filename.to_string(), 0))
        .collect::<Vec<_>>();

    let config = FileScanConfigBuilder::new(
        ObjectStoreUrl::local_filesystem(),
        file_schema,
        file_source.clone(),
    )
    .with_projection(scan_plan_config.projection.cloned())
    .with_limit(limit)
    .with_file_group(FileGroup::new(files))
    .build();

    let store = Arc::new(object_store_opendal::OpendalStore::new(
        scan_plan_config.store.clone(),
    ));

    let file_opener = file_source.create_file_opener(store, &config, 0);
    let stream = FileStream::new(
        &config,
        0, // hard-coded partition index
        file_opener,
        &ExecutionPlanMetricsSet::new(),
    )
    .context(error::BuildStreamSnafu)?;
    let adapter = RecordBatchStreamAdapter::try_new(Box::pin(stream))
        .context(error::BuildStreamAdapterSnafu)?;
    Ok(Box::pin(adapter))
}

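/// Creates a [`SendableRecordBatchStream`] that scans the CSV files described by `config`.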
fn new_csv_stream(
    config: &ScanPlanConfig,
    format: &CsvFormat,
) -> Result<SendableRecordBatchStream> {
    let file_schema = config.file_schema.arrow_schema().clone();

    // Push down the limit only when there are no filters.
    let limit = config.filters.is_empty().then_some(config.limit).flatten();

    let csv_source = CsvSource::new(format.has_header, format.delimiter, b'"')
        .with_schema(file_schema.clone())
        .with_batch_size(DEFAULT_BATCH_SIZE);

    build_record_batch_stream(config, file_schema, limit, csv_source)
}

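/// Creates a [`SendableRecordBatchStream`] that scans the JSON files described by `config`.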
fn new_json_stream(config: &ScanPlanConfig) -> Result<SendableRecordBatchStream> {
    let file_schema = config.file_schema.arrow_schema().clone();

    // Push down the limit only when there are no filters.
    let limit = config.filters.is_empty().then_some(config.limit).flatten();

    let file_source = JsonSource::new().with_batch_size(DEFAULT_BATCH_SIZE);
    build_record_batch_stream(config, file_schema, limit, file_source)
}

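/// Creates a [`SendableRecordBatchStream`] over Parquet files by building a
/// [`DataSourceExec`] plan, so filter expressions can be pushed down to the Parquet
/// reader as a predicate.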
fn new_parquet_stream_with_exec_plan(config: &ScanPlanConfig) -> Result<SendableRecordBatchStream> {
    let file_schema = config.file_schema.arrow_schema().clone();
    let ScanPlanConfig {
        files,
        projection,
        limit,
        filters,
        store,
        ..
    } = config;

    let file_group = FileGroup::new(
        files
            .iter()
            .map(|filename| PartitionedFile::new(filename.to_string(), 0))
            .collect::<Vec<_>>(),
    );

    let mut parquet_source = ParquetSource::default().with_parquet_file_reader_factory(Arc::new(
        DefaultParquetFileReaderFactory::new(store.clone()),
    ));

    // Build the predicate from the pushed-down filter expressions.
    let filters = filters.to_vec();
    if let Some(expr) = conjunction(filters) {
        let df_schema = file_schema
            .clone()
            .to_dfschema_ref()
            .context(error::ParquetScanPlanSnafu)?;

        let filters = create_physical_expr(&expr, &df_schema, &ExecutionProps::new())
            .context(error::ParquetScanPlanSnafu)?;
        parquet_source = parquet_source.with_predicate(filters);
    }

    let file_scan_config = FileScanConfigBuilder::new(
        ObjectStoreUrl::local_filesystem(),
        file_schema,
        Arc::new(parquet_source),
    )
    .with_file_group(file_group)
    .with_projection(projection.cloned())
    .with_limit(*limit)
    .build();

    // TODO(ruihang): get this from upper layer
    let task_ctx = SessionContext::default().task_ctx();

    let parquet_exec = DataSourceExec::from_data_source(file_scan_config);
    let stream = parquet_exec
        .execute(0, task_ctx)
        .context(error::ParquetScanPlanSnafu)?;

    Ok(Box::pin(
        RecordBatchStreamAdapter::try_new(stream).context(error::BuildStreamAdapterSnafu)?,
    ))
}

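/// Creates a [`SendableRecordBatchStream`] that scans the ORC files described by `config`.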
fn new_orc_stream(config: &ScanPlanConfig) -> Result<SendableRecordBatchStream> {
    let file_schema = config.file_schema.arrow_schema().clone();

    // Push down the limit only when there are no filters.
    let limit = config.filters.is_empty().then_some(config.limit).flatten();

    let file_source = OrcSource::default().with_batch_size(DEFAULT_BATCH_SIZE);
    build_record_batch_stream(config, file_schema, limit, file_source)
}

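/// Everything needed to build a file scan stream: the schema of the files, the file
/// paths to read, an optional column projection, pushed-down filters, an optional row
/// limit, and the object store that holds the files.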
#[derive(Debug, Clone)]
pub struct ScanPlanConfig<'a> {
    pub file_schema: SchemaRef,
    pub files: &'a Vec<String>,
    pub projection: Option<&'a Vec<usize>>,
    pub filters: &'a [Expr],
    pub limit: Option<usize>,
    pub store: ObjectStore,
}

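/// Creates a [`SendableRecordBatchStream`] that scans the files described by `config`
/// in the given `format`, dispatching to the CSV, JSON, Parquet, or ORC stream builder.
///
/// A rough usage sketch, assuming an already-configured [`ObjectStore`] and a parsed
/// [`Format`] are at hand (all other identifiers here are illustrative only):
///
/// ```ignore
/// let stream = create_stream(
///     &format,
///     &ScanPlanConfig {
///         file_schema,
///         files: &files,
///         projection: None,
///         filters: &[],
///         limit: None,
///         store,
///     },
/// )?;
/// ```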
pub fn create_stream(
    format: &Format,
    config: &ScanPlanConfig,
) -> Result<SendableRecordBatchStream> {
    match format {
        Format::Csv(format) => new_csv_stream(config, format),
        Format::Json(_) => new_json_stream(config),
        Format::Parquet(_) => new_parquet_stream_with_exec_plan(config),
        Format::Orc(_) => new_orc_stream(config),
    }
}