common_datasource/file_format/parquet.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::result;
use std::sync::Arc;

use arrow::record_batch::RecordBatch;
use arrow_schema::Schema;
use async_trait::async_trait;
use datafusion::datasource::physical_plan::ParquetFileReaderFactory;
use datafusion::error::Result as DatafusionResult;
use datafusion::parquet::arrow::async_reader::AsyncFileReader;
use datafusion::parquet::arrow::{ArrowWriter, parquet_to_arrow_schema};
use datafusion::parquet::errors::{ParquetError, Result as ParquetResult};
use datafusion::parquet::file::metadata::{
    PageIndexPolicy, ParquetMetaData, ParquetMetaDataReader,
};
use datafusion::physical_plan::SendableRecordBatchStream;
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion_datasource::PartitionedFile;
use datatypes::schema::SchemaRef;
use futures::StreamExt;
use futures::future::BoxFuture;
use object_store::{FuturesAsyncReader, ObjectStore};
use parquet::arrow::AsyncArrowWriter;
use parquet::arrow::arrow_reader::ArrowReaderOptions;
use parquet::basic::{Compression, Encoding, ZstdLevel};
use parquet::file::properties::{WriterProperties, WriterPropertiesBuilder};
use parquet::schema::types::ColumnPath;
use snafu::ResultExt;
use tokio_util::compat::{Compat, FuturesAsyncReadCompatExt, FuturesAsyncWriteCompatExt};

use crate::DEFAULT_WRITE_BUFFER_SIZE;
use crate::buffered_writer::{ArrowWriterCloser, DfRecordBatchEncoder};
use crate::error::{self, Result, WriteObjectSnafu, WriteParquetSnafu};
use crate::file_format::FileFormat;
use crate::share_buffer::SharedBuffer;
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct ParquetFormat {}

#[async_trait]
impl FileFormat for ParquetFormat {
    async fn infer_schema(&self, store: &ObjectStore, path: &str) -> Result<Schema> {
        let meta = store
            .stat(path)
            .await
            .context(error::ReadObjectSnafu { path })?;

        let mut reader = store
            .reader(path)
            .await
            .context(error::ReadObjectSnafu { path })?
            .into_futures_async_read(0..meta.content_length())
            .await
            .context(error::ReadObjectSnafu { path })?
            .compat();

        let metadata = reader
            .get_metadata(None)
            .await
            .context(error::ReadParquetSnafuSnafu)?;

        let file_metadata = metadata.file_metadata();
        let schema = parquet_to_arrow_schema(
            file_metadata.schema_descr(),
            file_metadata.key_value_metadata(),
        )
        .context(error::ParquetToSchemaSnafu)?;

        Ok(schema)
    }
}
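
// A minimal usage sketch for `infer_schema`, assuming the `services::Memory`
// backend re-exported by `object_store`; the path and the `print_parquet_schema`
// helper are hypothetical:
//
//     use object_store::ObjectStore;
//     use object_store::services::Memory;
//
//     async fn print_parquet_schema() -> Result<()> {
//         let store = ObjectStore::new(Memory::default()).unwrap().finish();
//         // ... write a parquet object to "data/example.parquet" first ...
//         let schema = ParquetFormat::default()
//             .infer_schema(&store, "data/example.parquet")
//             .await?;
//         println!("{schema:?}");
//         Ok(())
//     }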

/// A [`ParquetFileReaderFactory`] that creates [`LazyParquetFileReader`]s
/// reading from an [`ObjectStore`].
#[derive(Debug, Clone)]
pub struct DefaultParquetFileReaderFactory {
    object_store: ObjectStore,
}

impl DefaultParquetFileReaderFactory {
    pub fn new(object_store: ObjectStore) -> Self {
        Self { object_store }
    }
}

impl ParquetFileReaderFactory for DefaultParquetFileReaderFactory {
    fn create_reader(
        &self,
        _partition_index: usize,
        partitioned_file: PartitionedFile,
        metadata_size_hint: Option<usize>,
        _metrics: &ExecutionPlanMetricsSet,
    ) -> DatafusionResult<Box<dyn AsyncFileReader + Send>> {
        let path = partitioned_file.path().to_string();
        let object_store = self.object_store.clone();

        Ok(Box::new(LazyParquetFileReader::new(
            object_store,
            path,
            metadata_size_hint,
        )))
    }
}
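
// Note: `create_reader` performs no I/O itself. The returned
// `LazyParquetFileReader` defers the `stat` and open calls until DataFusion
// first requests bytes or metadata, so building a scan plan stays cheap even
// when it covers many partitioned files.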

pub struct LazyParquetFileReader {
    object_store: ObjectStore,
    reader: Option<Compat<FuturesAsyncReader>>,
    file_size: Option<u64>,
    metadata_size_hint: Option<usize>,
    path: String,
}

impl LazyParquetFileReader {
    pub fn new(object_store: ObjectStore, path: String, metadata_size_hint: Option<usize>) -> Self {
        LazyParquetFileReader {
            object_store,
            path,
            reader: None,
            file_size: None,
            metadata_size_hint,
        }
    }

    /// Initializes the underlying reader on first use, propagating any error
    /// from the object store.
    async fn maybe_initialize(&mut self) -> result::Result<(), object_store::Error> {
        if self.reader.is_none() {
            let meta = self.object_store.stat(&self.path).await?;
            self.file_size = Some(meta.content_length());
            let reader = self
                .object_store
                .reader(&self.path)
                .await?
                .into_futures_async_read(0..meta.content_length())
                .await?
                .compat();
            self.reader = Some(reader);
        }

        Ok(())
    }
}

impl AsyncFileReader for LazyParquetFileReader {
    fn get_bytes(
        &mut self,
        range: std::ops::Range<u64>,
    ) -> BoxFuture<'_, ParquetResult<bytes::Bytes>> {
        Box::pin(async move {
            self.maybe_initialize()
                .await
                .map_err(|e| ParquetError::External(Box::new(e)))?;
            // Safety: the reader was initialized by `maybe_initialize` above.
            self.reader.as_mut().unwrap().get_bytes(range).await
        })
    }

    fn get_metadata<'a>(
        &'a mut self,
        options: Option<&'a ArrowReaderOptions>,
    ) -> BoxFuture<'a, parquet::errors::Result<Arc<ParquetMetaData>>> {
        Box::pin(async move {
            self.maybe_initialize()
                .await
                .map_err(|e| ParquetError::External(Box::new(e)))?;

            let metadata_opts = options.map(|o| o.metadata_options().clone());
            let metadata_reader = ParquetMetaDataReader::new()
                .with_metadata_options(metadata_opts)
                .with_page_index_policy(PageIndexPolicy::from(
                    options.is_some_and(|o| o.page_index()),
                ))
                .with_prefetch_hint(self.metadata_size_hint);

            // Safety: `maybe_initialize` above set both the reader and the file size.
            let metadata = metadata_reader
                .load_and_finish(self.reader.as_mut().unwrap(), self.file_size.unwrap())
                .await?;
            Ok(Arc::new(metadata))
        })
    }
}
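
// Note: when the caller supplies a `metadata_size_hint`, `with_prefetch_hint`
// lets `ParquetMetaDataReader` fetch the footer and metadata in fewer reads;
// the page index is only decoded when the reader options request it.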

impl DfRecordBatchEncoder for ArrowWriter<SharedBuffer> {
    fn write(&mut self, batch: &RecordBatch) -> Result<()> {
        self.write(batch).context(error::EncodeRecordBatchSnafu)
    }
}

#[async_trait]
impl ArrowWriterCloser for ArrowWriter<SharedBuffer> {
    async fn close(self) -> Result<ParquetMetaData> {
        self.close().context(error::EncodeRecordBatchSnafu)
    }
}
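
// These adapter impls let an `ArrowWriter` that encodes into an in-memory
// `SharedBuffer` plug into the crate's buffered-writer abstractions, so
// parquet output can be driven by the same machinery as other file formats.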

/// Writes a record batch stream to a parquet file at `path`.
///
/// Returns the number of rows written.
pub async fn stream_to_parquet(
    mut stream: SendableRecordBatchStream,
    schema: datatypes::schema::SchemaRef,
    store: ObjectStore,
    path: &str,
    concurrency: usize,
) -> Result<usize> {
    // `schema` only drives the per-column writer properties; the arrow schema
    // handed to the writer comes from the stream itself.
    let write_props = column_wise_config(
        WriterProperties::builder()
            .set_compression(Compression::ZSTD(ZstdLevel::default()))
            .set_statistics_truncate_length(None)
            .set_column_index_truncate_length(None),
        schema,
    )
    .build();
    let inner_writer = store
        .writer_with(path)
        .concurrent(concurrency)
        .chunk(DEFAULT_WRITE_BUFFER_SIZE.as_bytes() as usize)
        .await
        .map(|w| w.into_futures_async_write().compat_write())
        .context(WriteObjectSnafu { path })?;

    let mut writer = AsyncArrowWriter::try_new(inner_writer, stream.schema(), Some(write_props))
        .context(WriteParquetSnafu { path })?;
    let mut rows_written = 0;

    while let Some(batch) = stream.next().await {
        let batch = batch.context(error::ReadRecordBatchSnafu)?;
        writer
            .write(&batch)
            .await
            .context(WriteParquetSnafu { path })?;
        rows_written += batch.num_rows();
    }
    writer.close().await.context(WriteParquetSnafu { path })?;
    Ok(rows_written)
}
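
// A minimal calling sketch, assuming the caller already holds a record batch
// stream and a matching GreptimeDB schema; `export` and the output path are
// hypothetical:
//
//     async fn export(
//         stream: SendableRecordBatchStream,
//         schema: datatypes::schema::SchemaRef,
//         store: ObjectStore,
//     ) -> Result<()> {
//         let rows = stream_to_parquet(stream, schema, store, "out/data.parquet", 4).await?;
//         println!("wrote {rows} rows");
//         Ok(())
//     }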

/// Customizes per-column writer properties.
fn column_wise_config(
    mut props: WriterPropertiesBuilder,
    schema: SchemaRef,
) -> WriterPropertiesBuilder {
    // Disable dictionary encoding for timestamp columns: timestamp values are
    // typically monotonically increasing, so dictionary pages would grow larger
    // than the data pages they are meant to shrink. Delta encoding compresses
    // such sequences far better.
    for col in schema.column_schemas() {
        if col.data_type.is_timestamp() {
            let path = ColumnPath::new(vec![col.name.clone()]);
            props = props
                .set_column_dictionary_enabled(path.clone(), false)
                .set_column_encoding(path, Encoding::DELTA_BINARY_PACKED)
        }
    }
    props
}
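
// For a hypothetical timestamp column named `ts`, the loop above applies the
// equivalent of:
//
//     props
//         .set_column_dictionary_enabled(ColumnPath::new(vec!["ts".to_string()]), false)
//         .set_column_encoding(ColumnPath::new(vec!["ts".to_string()]), Encoding::DELTA_BINARY_PACKED)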

#[cfg(test)]
mod tests {
    use common_test_util::find_workspace_path;

    use super::*;
    use crate::test_util::{format_schema, test_store};

    fn test_data_root() -> String {
        find_workspace_path("/src/common/datasource/tests/parquet")
            .display()
            .to_string()
    }

    #[tokio::test]
    async fn infer_schema_basic() {
        let format = ParquetFormat::default();
        let store = test_store(&test_data_root());
        let schema = format.infer_schema(&store, "basic.parquet").await.unwrap();
        let formatted: Vec<_> = format_schema(schema);

        assert_eq!(vec!["num: Int64: NULL", "str: Utf8: NULL"], formatted);
    }
}
289}