common_datasource/file_format/parquet.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::result;
use std::sync::Arc;

use arrow::record_batch::RecordBatch;
use arrow_schema::Schema;
use async_trait::async_trait;
use datafusion::datasource::physical_plan::{FileMeta, ParquetFileReaderFactory};
use datafusion::error::Result as DatafusionResult;
use datafusion::parquet::arrow::async_reader::AsyncFileReader;
use datafusion::parquet::arrow::{parquet_to_arrow_schema, ArrowWriter};
use datafusion::parquet::errors::{ParquetError, Result as ParquetResult};
use datafusion::parquet::file::metadata::ParquetMetaData;
use datafusion::parquet::format::FileMetaData;
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion::physical_plan::SendableRecordBatchStream;
use datatypes::schema::SchemaRef;
use futures::future::BoxFuture;
use futures::StreamExt;
use object_store::{FuturesAsyncReader, ObjectStore};
use parquet::arrow::arrow_reader::ArrowReaderOptions;
use parquet::arrow::AsyncArrowWriter;
use parquet::basic::{Compression, Encoding, ZstdLevel};
use parquet::file::properties::{WriterProperties, WriterPropertiesBuilder};
use parquet::schema::types::ColumnPath;
use snafu::ResultExt;
use tokio_util::compat::{Compat, FuturesAsyncReadCompatExt, FuturesAsyncWriteCompatExt};

use crate::buffered_writer::{ArrowWriterCloser, DfRecordBatchEncoder};
use crate::error::{self, Result, WriteObjectSnafu, WriteParquetSnafu};
use crate::file_format::FileFormat;
use crate::share_buffer::SharedBuffer;
use crate::DEFAULT_WRITE_BUFFER_SIZE;

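/// The Parquet file format.
///
/// A minimal sketch of schema inference (the `store` setup and the file path
/// are assumed):
///
/// ```ignore
/// let format = ParquetFormat::default();
/// let schema = format.infer_schema(&store, "data.parquet").await?;
/// ```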
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct ParquetFormat {}

#[async_trait]
impl FileFormat for ParquetFormat {
    async fn infer_schema(&self, store: &ObjectStore, path: &str) -> Result<Schema> {
        let meta = store
            .stat(path)
            .await
            .context(error::ReadObjectSnafu { path })?;

        let mut reader = store
            .reader(path)
            .await
            .context(error::ReadObjectSnafu { path })?
            .into_futures_async_read(0..meta.content_length())
            .await
            .context(error::ReadObjectSnafu { path })?
            .compat();

        let metadata = reader
            .get_metadata(None)
            .await
            .context(error::ReadParquetSnafu)?;

        let file_metadata = metadata.file_metadata();
        let schema = parquet_to_arrow_schema(
            file_metadata.schema_descr(),
            file_metadata.key_value_metadata(),
        )
        .context(error::ParquetToSchemaSnafu)?;

        Ok(schema)
    }
}

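/// A [`ParquetFileReaderFactory`] that returns [`AsyncFileReader`]s backed by
/// an [`ObjectStore`].
///
/// A hypothetical wiring sketch (the `scan_config` comes from DataFusion, and
/// the exact builder API varies across DataFusion versions):
///
/// ```ignore
/// let factory = Arc::new(DefaultParquetFileReaderFactory::new(object_store));
/// let exec = ParquetExecBuilder::new(scan_config)
///     .with_parquet_file_reader_factory(factory)
///     .build();
/// ```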
#[derive(Debug, Clone)]
pub struct DefaultParquetFileReaderFactory {
    object_store: ObjectStore,
}

impl DefaultParquetFileReaderFactory {
    pub fn new(object_store: ObjectStore) -> Self {
        Self { object_store }
    }
}

impl ParquetFileReaderFactory for DefaultParquetFileReaderFactory {
    // TODO(weny): Support [`metadata_size_hint`].
    // The upstream has an implementation that supports [`metadata_size_hint`],
    // but it is coupled with `Box<dyn ObjectStore>`.
    fn create_reader(
        &self,
        _partition_index: usize,
        file_meta: FileMeta,
        _metadata_size_hint: Option<usize>,
        _metrics: &ExecutionPlanMetricsSet,
    ) -> DatafusionResult<Box<dyn AsyncFileReader + Send>> {
        let path = file_meta.location().to_string();
        let object_store = self.object_store.clone();

        Ok(Box::new(LazyParquetFileReader::new(object_store, path)))
    }
}

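/// An [`AsyncFileReader`] that defers opening the underlying object until the
/// first read, so that `create_reader` stays cheap and performs no I/O.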
pub struct LazyParquetFileReader {
    object_store: ObjectStore,
    reader: Option<Compat<FuturesAsyncReader>>,
    path: String,
}

impl LazyParquetFileReader {
    pub fn new(object_store: ObjectStore, path: String) -> Self {
        LazyParquetFileReader {
            object_store,
            path,
            reader: None,
        }
    }

    /// Initializes the underlying reader on first use; any error from opening
    /// the object is returned to the caller.
    async fn maybe_initialize(&mut self) -> result::Result<(), object_store::Error> {
        if self.reader.is_none() {
            let meta = self.object_store.stat(&self.path).await?;
            let reader = self
                .object_store
                .reader(&self.path)
                .await?
                .into_futures_async_read(0..meta.content_length())
                .await?
                .compat();
            self.reader = Some(reader);
        }

        Ok(())
    }
}

impl AsyncFileReader for LazyParquetFileReader {
    fn get_bytes(
        &mut self,
        range: std::ops::Range<u64>,
    ) -> BoxFuture<'_, ParquetResult<bytes::Bytes>> {
        Box::pin(async move {
            self.maybe_initialize()
                .await
                .map_err(|e| ParquetError::External(Box::new(e)))?;
            // Safety: `maybe_initialize` guarantees the reader is `Some`.
            self.reader.as_mut().unwrap().get_bytes(range).await
        })
    }

    fn get_metadata<'a>(
        &'a mut self,
        options: Option<&'a ArrowReaderOptions>,
    ) -> BoxFuture<'a, ParquetResult<Arc<ParquetMetaData>>> {
        Box::pin(async move {
            self.maybe_initialize()
                .await
                .map_err(|e| ParquetError::External(Box::new(e)))?;
            // Safety: `maybe_initialize` guarantees the reader is `Some`.
            self.reader.as_mut().unwrap().get_metadata(options).await
        })
    }
}

impl DfRecordBatchEncoder for ArrowWriter<SharedBuffer> {
    fn write(&mut self, batch: &RecordBatch) -> Result<()> {
        // Resolves to the inherent `ArrowWriter::write`, not this trait method.
        self.write(batch).context(error::EncodeRecordBatchSnafu)
    }
}

#[async_trait]
impl ArrowWriterCloser for ArrowWriter<SharedBuffer> {
    async fn close(self) -> Result<FileMetaData> {
        // Resolves to the inherent `ArrowWriter::close`, not this trait method.
        self.close().context(error::EncodeRecordBatchSnafu)
    }
}

/// Writes the record batch stream to a parquet file at `path`.
///
/// Returns the number of rows written.
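///
/// A minimal usage sketch (the stream, schema, and object store are assumed
/// to be set up by the caller):
///
/// ```ignore
/// let rows = stream_to_parquet(stream, schema, store, "out.parquet", 8).await?;
/// ```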
pub async fn stream_to_parquet(
    mut stream: SendableRecordBatchStream,
    schema: SchemaRef,
    store: ObjectStore,
    path: &str,
    concurrency: usize,
) -> Result<usize> {
    let write_props = column_wise_config(
        WriterProperties::builder().set_compression(Compression::ZSTD(ZstdLevel::default())),
        schema,
    )
    .build();
    let inner_writer = store
        .writer_with(path)
        .concurrent(concurrency)
        .chunk(DEFAULT_WRITE_BUFFER_SIZE.as_bytes() as usize)
        .await
        .map(|w| w.into_futures_async_write().compat_write())
        .context(WriteObjectSnafu { path })?;

    let mut writer = AsyncArrowWriter::try_new(inner_writer, stream.schema(), Some(write_props))
        .context(WriteParquetSnafu { path })?;
    let mut rows_written = 0;

    while let Some(batch) = stream.next().await {
        let batch = batch.context(error::ReadRecordBatchSnafu)?;
        writer
            .write(&batch)
            .await
            .context(WriteParquetSnafu { path })?;
        rows_written += batch.num_rows();
    }
    writer.close().await.context(WriteParquetSnafu { path })?;
    Ok(rows_written)
}

/// Customizes per-column writer properties.
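///
/// A small illustration (a hypothetical `schema` with a timestamp column; the
/// resulting properties disable dictionary encoding for that column):
///
/// ```ignore
/// let props = column_wise_config(WriterProperties::builder(), schema).build();
/// ```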
fn column_wise_config(
    mut props: WriterPropertiesBuilder,
    schema: SchemaRef,
) -> WriterPropertiesBuilder {
    // Disable dictionary encoding for timestamp columns, since for increasing
    // timestamps the dictionary pages will be larger than the data pages;
    // use delta encoding instead.
    for col in schema.column_schemas() {
        if col.data_type.is_timestamp() {
            let path = ColumnPath::new(vec![col.name.clone()]);
            props = props
                .set_column_dictionary_enabled(path.clone(), false)
                .set_column_encoding(path, Encoding::DELTA_BINARY_PACKED);
        }
    }
    props
}

#[cfg(test)]
mod tests {
    use common_test_util::find_workspace_path;

    use super::*;
    use crate::test_util::{format_schema, test_store};

    fn test_data_root() -> String {
        find_workspace_path("/src/common/datasource/tests/parquet")
            .display()
            .to_string()
    }

    #[tokio::test]
    async fn infer_schema_basic() {
        let format = ParquetFormat::default();
        let store = test_store(&test_data_root());
        let schema = format.infer_schema(&store, "basic.parquet").await.unwrap();
        let formatted: Vec<_> = format_schema(schema);

        assert_eq!(vec!["num: Int64: NULL", "str: Utf8: NULL"], formatted);
    }
}
267}