common_datasource/file_format/parquet.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::result;
use std::sync::Arc;

use arrow::record_batch::RecordBatch;
use arrow_schema::Schema;
use async_trait::async_trait;
use datafusion::datasource::physical_plan::{FileMeta, ParquetFileReaderFactory};
use datafusion::error::Result as DatafusionResult;
use datafusion::parquet::arrow::async_reader::AsyncFileReader;
use datafusion::parquet::arrow::{parquet_to_arrow_schema, ArrowWriter};
use datafusion::parquet::errors::{ParquetError, Result as ParquetResult};
use datafusion::parquet::file::metadata::ParquetMetaData;
use datafusion::parquet::format::FileMetaData;
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion::physical_plan::SendableRecordBatchStream;
use datatypes::schema::SchemaRef;
use futures::future::BoxFuture;
use futures::StreamExt;
use object_store::{FuturesAsyncReader, ObjectStore};
use parquet::arrow::AsyncArrowWriter;
use parquet::basic::{Compression, Encoding, ZstdLevel};
use parquet::file::properties::{WriterProperties, WriterPropertiesBuilder};
use parquet::schema::types::ColumnPath;
use snafu::ResultExt;
use tokio_util::compat::{Compat, FuturesAsyncReadCompatExt, FuturesAsyncWriteCompatExt};

use crate::buffered_writer::{ArrowWriterCloser, DfRecordBatchEncoder};
use crate::error::{self, Result, WriteObjectSnafu, WriteParquetSnafu};
use crate::file_format::FileFormat;
use crate::share_buffer::SharedBuffer;
use crate::DEFAULT_WRITE_BUFFER_SIZE;

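/// Parquet file format.
///
/// Implements [`FileFormat`] by reading the parquet footer metadata and converting it
/// to an Arrow [`Schema`]. A minimal sketch of usage (assumes an [`ObjectStore`] named
/// `store` containing `data.parquet`):
///
/// ```ignore
/// let schema = ParquetFormat::default()
///     .infer_schema(&store, "data.parquet")
///     .await?;
/// ```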
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct ParquetFormat {}

#[async_trait]
impl FileFormat for ParquetFormat {
    async fn infer_schema(&self, store: &ObjectStore, path: &str) -> Result<Schema> {
        let meta = store
            .stat(path)
            .await
            .context(error::ReadObjectSnafu { path })?;

        let mut reader = store
            .reader(path)
            .await
            .context(error::ReadObjectSnafu { path })?
            .into_futures_async_read(0..meta.content_length())
            .await
            .context(error::ReadObjectSnafu { path })?
            .compat();

        let metadata = reader
            .get_metadata()
            .await
            .context(error::ReadParquetSnafuSnafu)?;

        let file_metadata = metadata.file_metadata();
        let schema = parquet_to_arrow_schema(
            file_metadata.schema_descr(),
            file_metadata.key_value_metadata(),
        )
        .context(error::ParquetToSchemaSnafu)?;

        Ok(schema)
    }
}

/// A [`ParquetFileReaderFactory`] that creates [`AsyncFileReader`]s backed by an [`ObjectStore`].
#[derive(Debug, Clone)]
pub struct DefaultParquetFileReaderFactory {
    object_store: ObjectStore,
}

impl DefaultParquetFileReaderFactory {
    pub fn new(object_store: ObjectStore) -> Self {
        Self { object_store }
    }
}

impl ParquetFileReaderFactory for DefaultParquetFileReaderFactory {
    // TODO(weny): Support [`metadata_size_hint`].
    // The upstream implementation supports [`metadata_size_hint`],
    // but it is coupled with `Box<dyn ObjectStore>`.
    fn create_reader(
        &self,
        _partition_index: usize,
        file_meta: FileMeta,
        _metadata_size_hint: Option<usize>,
        _metrics: &ExecutionPlanMetricsSet,
    ) -> DatafusionResult<Box<dyn AsyncFileReader + Send>> {
        let path = file_meta.location().to_string();
        let object_store = self.object_store.clone();

        Ok(Box::new(LazyParquetFileReader::new(object_store, path)))
    }
}

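/// An [`AsyncFileReader`] that opens the underlying [`ObjectStore`] reader lazily, on
/// the first byte-range or metadata request.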
pub struct LazyParquetFileReader {
    object_store: ObjectStore,
    reader: Option<Compat<FuturesAsyncReader>>,
    path: String,
}

impl LazyParquetFileReader {
    pub fn new(object_store: ObjectStore, path: String) -> Self {
        LazyParquetFileReader {
            object_store,
            path,
            reader: None,
        }
    }

    /// Initializes the underlying reader on first use; returns the object store
    /// error if the file cannot be opened.
    async fn maybe_initialize(&mut self) -> result::Result<(), object_store::Error> {
        if self.reader.is_none() {
            let meta = self.object_store.stat(&self.path).await?;
            let reader = self
                .object_store
                .reader(&self.path)
                .await?
                .into_futures_async_read(0..meta.content_length())
                .await?
                .compat();
            self.reader = Some(reader);
        }

        Ok(())
    }
}

impl AsyncFileReader for LazyParquetFileReader {
    fn get_bytes(
        &mut self,
        range: std::ops::Range<usize>,
    ) -> BoxFuture<'_, ParquetResult<bytes::Bytes>> {
        Box::pin(async move {
            self.maybe_initialize()
                .await
                .map_err(|e| ParquetError::External(Box::new(e)))?;
            // Safety: the reader is initialized by `maybe_initialize` above.
            self.reader.as_mut().unwrap().get_bytes(range).await
        })
    }

    fn get_metadata(&mut self) -> BoxFuture<'_, ParquetResult<Arc<ParquetMetaData>>> {
        Box::pin(async move {
            self.maybe_initialize()
                .await
                .map_err(|e| ParquetError::External(Box::new(e)))?;
            // Safety: the reader is initialized by `maybe_initialize` above.
            self.reader.as_mut().unwrap().get_metadata().await
        })
    }
}

impl DfRecordBatchEncoder for ArrowWriter<SharedBuffer> {
    fn write(&mut self, batch: &RecordBatch) -> Result<()> {
        self.write(batch).context(error::EncodeRecordBatchSnafu)
    }
}

#[async_trait]
impl ArrowWriterCloser for ArrowWriter<SharedBuffer> {
    async fn close(self) -> Result<FileMetaData> {
        self.close().context(error::EncodeRecordBatchSnafu)
    }
}

/// Writes the record batch stream to a parquet file at `path`.
///
/// Returns the number of rows written.
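///
/// # Example
///
/// A minimal sketch of typical usage (assumes an [`ObjectStore`] named `store`, a
/// [`SendableRecordBatchStream`] named `stream`, and its matching [`SchemaRef`]
/// named `schema`; the output path is only illustrative):
///
/// ```ignore
/// let rows = stream_to_parquet(stream, schema, store, "out/data.parquet", 8).await?;
/// println!("wrote {rows} rows");
/// ```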
pub async fn stream_to_parquet(
    mut stream: SendableRecordBatchStream,
    schema: SchemaRef,
    store: ObjectStore,
    path: &str,
    concurrency: usize,
) -> Result<usize> {
    let write_props = column_wise_config(
        WriterProperties::builder().set_compression(Compression::ZSTD(ZstdLevel::default())),
        schema,
    )
    .build();
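    // Write to the object store in fixed-size chunks, with up to `concurrency`
    // uploads in flight at a time.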
    let inner_writer = store
        .writer_with(path)
        .concurrent(concurrency)
        .chunk(DEFAULT_WRITE_BUFFER_SIZE.as_bytes() as usize)
        .await
        .map(|w| w.into_futures_async_write().compat_write())
        .context(WriteObjectSnafu { path })?;

    let mut writer = AsyncArrowWriter::try_new(inner_writer, stream.schema(), Some(write_props))
        .context(WriteParquetSnafu { path })?;
    let mut rows_written = 0;

    while let Some(batch) = stream.next().await {
        let batch = batch.context(error::ReadRecordBatchSnafu)?;
        writer
            .write(&batch)
            .await
            .context(WriteParquetSnafu { path })?;
        rows_written += batch.num_rows();
    }
    writer.close().await.context(WriteParquetSnafu { path })?;
    Ok(rows_written)
}

/// Customizes per-column writer properties.
fn column_wise_config(
    mut props: WriterPropertiesBuilder,
    schema: SchemaRef,
) -> WriterPropertiesBuilder {
    // Disable dictionary encoding for timestamp columns: for an increasing timestamp
    // column, the dictionary pages end up larger than the data pages, so use delta
    // encoding instead.
    for col in schema.column_schemas() {
        if col.data_type.is_timestamp() {
            let path = ColumnPath::new(vec![col.name.clone()]);
            props = props
                .set_column_dictionary_enabled(path.clone(), false)
                .set_column_encoding(path, Encoding::DELTA_BINARY_PACKED);
        }
    }
    props
}

#[cfg(test)]
mod tests {
    use common_test_util::find_workspace_path;

    use super::*;
    use crate::test_util::{format_schema, test_store};

    fn test_data_root() -> String {
        find_workspace_path("/src/common/datasource/tests/parquet")
            .display()
            .to_string()
    }

    #[tokio::test]
    async fn infer_schema_basic() {
        let format = ParquetFormat::default();
        let store = test_store(&test_data_root());
        let schema = format.infer_schema(&store, "basic.parquet").await.unwrap();
        let formatted: Vec<_> = format_schema(schema);

        assert_eq!(vec!["num: Int64: NULL", "str: Utf8: NULL"], formatted);
    }
}