file_engine/query/file_stream.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use common_datasource::file_format::csv::CsvFormat;
use common_datasource::file_format::json::JsonFormat;
use common_datasource::file_format::orc::{OrcFormat, OrcOpener};
use common_datasource::file_format::parquet::{DefaultParquetFileReaderFactory, ParquetFormat};
use common_datasource::file_format::Format;
use common_recordbatch::adapter::RecordBatchStreamAdapter;
use common_recordbatch::SendableRecordBatchStream;
use datafusion::common::{Constraints, Statistics, ToDFSchema};
use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::object_store::ObjectStoreUrl;
use datafusion::datasource::physical_plan::{
    CsvConfig, CsvOpener, FileOpener, FileScanConfig, FileStream, JsonOpener, ParquetExec,
};
use datafusion::physical_expr::create_physical_expr;
use datafusion::physical_expr::execution_props::ExecutionProps;
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::prelude::SessionContext;
use datafusion_expr::expr::Expr;
use datafusion_expr::utils::conjunction;
use datatypes::arrow::datatypes::Schema as ArrowSchema;
use datatypes::schema::SchemaRef;
use object_store::ObjectStore;
use snafu::ResultExt;

use crate::error::{self, Result};

const DEFAULT_BATCH_SIZE: usize = 8192;

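/// Context passed to the scan plan builders. It currently carries no state.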
#[derive(Debug, Clone, Copy, Default)]
pub struct CreateScanPlanContext {}

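/// Builds a [`CsvOpener`] that reads CSV files from the configured object store,
/// honoring the format's header, delimiter, and compression settings and the
/// optional column projection.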
fn build_csv_opener(
    file_schema: Arc<ArrowSchema>,
    config: &ScanPlanConfig,
    format: &CsvFormat,
) -> CsvOpener {
    let csv_config = Arc::new(CsvConfig::new(
        DEFAULT_BATCH_SIZE,
        file_schema,
        config.projection.cloned(),
        format.has_header,
        format.delimiter,
        b'"',
        None,
        Arc::new(object_store_opendal::OpendalStore::new(
            config.store.clone(),
        )),
        None,
    ));
    CsvOpener::new(csv_config, format.compression_type.into())
}

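/// Builds a [`JsonOpener`] over the configured object store. The projection is
/// applied to the file schema up front, and the projected schema is handed to
/// the opener.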
fn build_json_opener(
    file_schema: Arc<ArrowSchema>,
    config: &ScanPlanConfig,
    format: &JsonFormat,
) -> Result<JsonOpener> {
    let projected_schema = if let Some(projection) = config.projection {
        Arc::new(
            file_schema
                .project(projection)
                .context(error::ProjectArrowSchemaSnafu)?,
        )
    } else {
        file_schema
    };
    let store = object_store_opendal::OpendalStore::new(config.store.clone());
    Ok(JsonOpener::new(
        DEFAULT_BATCH_SIZE,
        projected_schema,
        format.compression_type.into(),
        Arc::new(store),
    ))
}

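/// Builds an [`OrcOpener`] that reads ORC files from the configured object store
/// with an optional column projection.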
fn build_orc_opener(output_schema: Arc<ArrowSchema>, config: &ScanPlanConfig) -> Result<OrcOpener> {
    Ok(OrcOpener::new(
        config.store.clone(),
        output_schema,
        config.projection.cloned(),
    ))
}

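/// Drives `opener` over `files` with a DataFusion [`FileStream`], applying the
/// optional projection and limit, and adapts the result into a
/// [`SendableRecordBatchStream`]. All files are placed in a single file group,
/// and the object store URL in the scan config is a placeholder that is never
/// used for actual I/O.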
fn build_record_batch_stream<T: FileOpener + Send + 'static>(
    opener: T,
    file_schema: Arc<ArrowSchema>,
    files: &[String],
    projection: Option<&Vec<usize>>,
    limit: Option<usize>,
) -> Result<SendableRecordBatchStream> {
    let statistics = Statistics::new_unknown(file_schema.as_ref());
    let stream = FileStream::new(
        &FileScanConfig {
            object_store_url: ObjectStoreUrl::parse("empty://").unwrap(), // won't be used
            file_schema,
            file_groups: vec![files
                .iter()
                .map(|filename| PartitionedFile::new(filename.to_string(), 0))
                .collect::<Vec<_>>()],
            statistics,
            projection: projection.cloned(),
            limit,
            table_partition_cols: vec![],
            output_ordering: vec![],
            constraints: Constraints::empty(),
        },
        0, // hard-coded partition index: all files are scanned in a single partition
        opener,
        &ExecutionPlanMetricsSet::new(),
    )
    .context(error::BuildStreamSnafu)?;
    let adapter = RecordBatchStreamAdapter::try_new(Box::pin(stream))
        .context(error::BuildStreamAdapterSnafu)?;
    Ok(Box::pin(adapter))
}

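/// Creates a record batch stream over CSV files. The limit is pushed down to
/// the file stream only when no filters are present.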
fn new_csv_stream(
    _ctx: &CreateScanPlanContext,
    config: &ScanPlanConfig,
    format: &CsvFormat,
) -> Result<SendableRecordBatchStream> {
    let file_schema = config.file_schema.arrow_schema().clone();
    let opener = build_csv_opener(file_schema.clone(), config, format);
    // push down limit only if there is no filter
    let limit = config.filters.is_empty().then_some(config.limit).flatten();
    build_record_batch_stream(opener, file_schema, config.files, config.projection, limit)
}

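/// Creates a record batch stream over JSON files. The limit is pushed down to
/// the file stream only when no filters are present.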
fn new_json_stream(
    _ctx: &CreateScanPlanContext,
    config: &ScanPlanConfig,
    format: &JsonFormat,
) -> Result<SendableRecordBatchStream> {
    let file_schema = config.file_schema.arrow_schema().clone();
    let opener = build_json_opener(file_schema.clone(), config, format)?;
    // push down limit only if there is no filter
    let limit = config.filters.is_empty().then_some(config.limit).flatten();
    build_record_batch_stream(opener, file_schema, config.files, config.projection, limit)
}

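/// Creates a record batch stream over Parquet files by building and executing a
/// DataFusion [`ParquetExec`]. Pushed-down filters are conjoined into a single
/// physical predicate for the exec, and reads go through the configured object
/// store via [`DefaultParquetFileReaderFactory`].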
fn new_parquet_stream_with_exec_plan(
    _ctx: &CreateScanPlanContext,
    config: &ScanPlanConfig,
    _format: &ParquetFormat,
) -> Result<SendableRecordBatchStream> {
    let file_schema = config.file_schema.arrow_schema().clone();
    let ScanPlanConfig {
        files,
        projection,
        limit,
        filters,
        store,
        ..
    } = config;

    // construct config for ParquetExec
    let scan_config = FileScanConfig {
        object_store_url: ObjectStoreUrl::parse("empty://").unwrap(), // won't be used
        file_schema: file_schema.clone(),
        file_groups: vec![files
            .iter()
            .map(|filename| PartitionedFile::new(filename.to_string(), 0))
            .collect::<Vec<_>>()],
        constraints: Constraints::empty(),
        statistics: Statistics::new_unknown(file_schema.as_ref()),
        projection: projection.cloned(),
        limit: *limit,
        table_partition_cols: vec![],
        output_ordering: vec![],
    };

    // build predicate filter
    let filters = filters.to_vec();
    let filters = if let Some(expr) = conjunction(filters) {
        let df_schema = file_schema
            .clone()
            .to_dfschema_ref()
            .context(error::ParquetScanPlanSnafu)?;

        let filters = create_physical_expr(&expr, &df_schema, &ExecutionProps::new())
            .context(error::ParquetScanPlanSnafu)?;
        Some(filters)
    } else {
        None
    };

    // TODO(ruihang): get this from upper layer
    let task_ctx = SessionContext::default().task_ctx();
    let mut builder = ParquetExec::builder(scan_config);
    if let Some(filters) = filters {
        builder = builder.with_predicate(filters);
    }
    let parquet_exec = builder
        .with_parquet_file_reader_factory(Arc::new(DefaultParquetFileReaderFactory::new(
            store.clone(),
        )))
        .build();
    let stream = parquet_exec
        .execute(0, task_ctx)
        .context(error::ParquetScanPlanSnafu)?;

    Ok(Box::pin(
        RecordBatchStreamAdapter::try_new(stream).context(error::BuildStreamAdapterSnafu)?,
    ))
}

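/// Creates a record batch stream over ORC files. The limit is pushed down to
/// the file stream only when no filters are present.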
fn new_orc_stream(
    _ctx: &CreateScanPlanContext,
    config: &ScanPlanConfig,
    _format: &OrcFormat,
) -> Result<SendableRecordBatchStream> {
    let file_schema = config.file_schema.arrow_schema().clone();
    let opener = build_orc_opener(file_schema.clone(), config)?;
    // push down limit only if there is no filter
    let limit = config.filters.is_empty().then_some(config.limit).flatten();
    build_record_batch_stream(opener, file_schema, config.files, config.projection, limit)
}

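/// Everything needed to scan a set of files: the file schema, the file paths,
/// an optional column projection, pushed-down filters, an optional row limit,
/// and the object store to read from.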
#[derive(Debug, Clone)]
pub struct ScanPlanConfig<'a> {
    pub file_schema: SchemaRef,
    pub files: &'a Vec<String>,
    pub projection: Option<&'a Vec<usize>>,
    pub filters: &'a [Expr],
    pub limit: Option<usize>,
    pub store: ObjectStore,
}

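/// Creates a [`SendableRecordBatchStream`] over the files described by `config`,
/// dispatching on `format` to the CSV, JSON, Parquet, or ORC builder.
///
/// A minimal usage sketch; `schema`, `files`, `store`, and `format` are assumed
/// to be supplied by the caller, and the names are illustrative only:
///
/// ```ignore
/// let config = ScanPlanConfig {
///     file_schema: schema, // schema of the files to scan
///     files: &files,       // paths within the object store
///     projection: None,    // read all columns
///     filters: &[],        // no pushed-down filters
///     limit: None,         // no row limit
///     store,               // object_store::ObjectStore instance
/// };
/// let stream = create_stream(&format, &CreateScanPlanContext::default(), &config)?;
/// ```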
pub fn create_stream(
    format: &Format,
    ctx: &CreateScanPlanContext,
    config: &ScanPlanConfig,
) -> Result<SendableRecordBatchStream> {
    match format {
        Format::Csv(format) => new_csv_stream(ctx, config, format),
        Format::Json(format) => new_json_stream(ctx, config, format),
        Format::Parquet(format) => new_parquet_stream_with_exec_plan(ctx, config, format),
        Format::Orc(format) => new_orc_stream(ctx, config, format),
    }
}