Skip to main content

common_datasource/file_format/
parquet.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::result;
16use std::sync::Arc;
17
18use arrow::record_batch::RecordBatch;
19use arrow_schema::Schema;
20use async_trait::async_trait;
21use datafusion::datasource::physical_plan::ParquetFileReaderFactory;
22use datafusion::error::Result as DatafusionResult;
23use datafusion::parquet::arrow::async_reader::AsyncFileReader;
24use datafusion::parquet::arrow::{ArrowWriter, parquet_to_arrow_schema};
25use datafusion::parquet::errors::{ParquetError, Result as ParquetResult};
26use datafusion::parquet::file::metadata::{
27    PageIndexPolicy, ParquetMetaData, ParquetMetaDataReader,
28};
29use datafusion::physical_plan::SendableRecordBatchStream;
30use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
31use datafusion_datasource::PartitionedFile;
32use datatypes::schema::SchemaRef;
33use futures::StreamExt;
34use futures::future::BoxFuture;
35use object_store::{FuturesAsyncReader, ObjectStore};
36use parquet::arrow::AsyncArrowWriter;
37use parquet::arrow::arrow_reader::ArrowReaderOptions;
38use parquet::basic::{Compression, Encoding, ZstdLevel};
39use parquet::file::properties::{WriterProperties, WriterPropertiesBuilder};
40use parquet::schema::types::ColumnPath;
41use snafu::ResultExt;
42use tokio_util::compat::{Compat, FuturesAsyncReadCompatExt, FuturesAsyncWriteCompatExt};
43
44use crate::DEFAULT_WRITE_BUFFER_SIZE;
45use crate::buffered_writer::{ArrowWriterCloser, DfRecordBatchEncoder};
46use crate::error::{self, Result, WriteObjectSnafu, WriteParquetSnafu};
47use crate::file_format::FileFormat;
48use crate::share_buffer::SharedBuffer;
49
/// Marker type selecting the Parquet file format.
///
/// The type carries no state; it exists so that format-specific behavior can
/// be attached to it through trait implementations.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct ParquetFormat {}
52
53#[async_trait]
54impl FileFormat for ParquetFormat {
55    async fn infer_schema(&self, store: &ObjectStore, path: &str) -> Result<Schema> {
56        let meta = store
57            .stat(path)
58            .await
59            .context(error::ReadObjectSnafu { path })?;
60
61        let mut reader = store
62            .reader(path)
63            .await
64            .context(error::ReadObjectSnafu { path })?
65            .into_futures_async_read(0..meta.content_length())
66            .await
67            .context(error::ReadObjectSnafu { path })?
68            .compat();
69
70        let metadata = reader
71            .get_metadata(None)
72            .await
73            .context(error::ReadParquetSnafuSnafu)?;
74
75        let file_metadata = metadata.file_metadata();
76        let schema = parquet_to_arrow_schema(
77            file_metadata.schema_descr(),
78            file_metadata.key_value_metadata(),
79        )
80        .context(error::ParquetToSchemaSnafu)?;
81
82        Ok(schema)
83    }
84}
85
86#[derive(Debug, Clone)]
87pub struct DefaultParquetFileReaderFactory {
88    object_store: ObjectStore,
89}
90
91/// Returns a AsyncFileReader factory
92impl DefaultParquetFileReaderFactory {
93    pub fn new(object_store: ObjectStore) -> Self {
94        Self { object_store }
95    }
96}
97
98impl ParquetFileReaderFactory for DefaultParquetFileReaderFactory {
99    fn create_reader(
100        &self,
101        _partition_index: usize,
102        partitioned_file: PartitionedFile,
103        metadata_size_hint: Option<usize>,
104        _metrics: &ExecutionPlanMetricsSet,
105    ) -> DatafusionResult<Box<dyn AsyncFileReader + Send>> {
106        let path = partitioned_file.path().to_string();
107        let object_store = self.object_store.clone();
108
109        Ok(Box::new(LazyParquetFileReader::new(
110            object_store,
111            path,
112            metadata_size_hint,
113        )))
114    }
115}
116
117pub struct LazyParquetFileReader {
118    object_store: ObjectStore,
119    reader: Option<Compat<FuturesAsyncReader>>,
120    file_size: Option<u64>,
121    metadata_size_hint: Option<usize>,
122    path: String,
123}
124
125impl LazyParquetFileReader {
126    pub fn new(object_store: ObjectStore, path: String, metadata_size_hint: Option<usize>) -> Self {
127        LazyParquetFileReader {
128            object_store,
129            path,
130            reader: None,
131            file_size: None,
132            metadata_size_hint,
133        }
134    }
135
136    /// Must initialize the reader, or throw an error from the future.
137    async fn maybe_initialize(&mut self) -> result::Result<(), object_store::Error> {
138        if self.reader.is_none() {
139            let meta = self.object_store.stat(&self.path).await?;
140            self.file_size = Some(meta.content_length());
141            let reader = self
142                .object_store
143                .reader(&self.path)
144                .await?
145                .into_futures_async_read(0..meta.content_length())
146                .await?
147                .compat();
148            self.reader = Some(reader);
149        }
150
151        Ok(())
152    }
153}
154
155impl AsyncFileReader for LazyParquetFileReader {
156    fn get_bytes(
157        &mut self,
158        range: std::ops::Range<u64>,
159    ) -> BoxFuture<'_, ParquetResult<bytes::Bytes>> {
160        Box::pin(async move {
161            self.maybe_initialize()
162                .await
163                .map_err(|e| ParquetError::External(Box::new(e)))?;
164            // Safety: Must initialized
165            self.reader.as_mut().unwrap().get_bytes(range).await
166        })
167    }
168
169    fn get_metadata<'a>(
170        &'a mut self,
171        options: Option<&'a ArrowReaderOptions>,
172    ) -> BoxFuture<'a, parquet::errors::Result<Arc<ParquetMetaData>>> {
173        Box::pin(async move {
174            self.maybe_initialize()
175                .await
176                .map_err(|e| ParquetError::External(Box::new(e)))?;
177
178            let metadata_opts = options.map(|o| o.metadata_options().clone());
179            let column_index_policy =
180                options.map_or(PageIndexPolicy::Skip, |o| o.column_index_policy());
181            let offset_index_policy =
182                options.map_or(PageIndexPolicy::Skip, |o| o.offset_index_policy());
183            let metadata_reader = ParquetMetaDataReader::new()
184                .with_metadata_options(metadata_opts)
185                .with_column_index_policy(column_index_policy)
186                .with_offset_index_policy(offset_index_policy)
187                .with_prefetch_hint(self.metadata_size_hint);
188
189            let metadata = metadata_reader
190                .load_and_finish(self.reader.as_mut().unwrap(), self.file_size.unwrap())
191                .await?;
192            Ok(Arc::new(metadata))
193        })
194    }
195}
196
197impl DfRecordBatchEncoder for ArrowWriter<SharedBuffer> {
198    fn write(&mut self, batch: &RecordBatch) -> Result<()> {
199        self.write(batch).context(error::EncodeRecordBatchSnafu)
200    }
201}
202
203#[async_trait]
204impl ArrowWriterCloser for ArrowWriter<SharedBuffer> {
205    async fn close(self) -> Result<ParquetMetaData> {
206        self.close().context(error::EncodeRecordBatchSnafu)
207    }
208}
209
210/// Output the stream to a parquet file.
211///
212/// Returns number of rows written.
213pub async fn stream_to_parquet(
214    mut stream: SendableRecordBatchStream,
215    schema: datatypes::schema::SchemaRef,
216    store: ObjectStore,
217    path: &str,
218    concurrency: usize,
219) -> Result<usize> {
220    let write_props = column_wise_config(
221        WriterProperties::builder()
222            .set_compression(Compression::ZSTD(ZstdLevel::default()))
223            .set_statistics_truncate_length(None)
224            .set_column_index_truncate_length(None),
225        schema,
226    )
227    .build();
228    let inner_writer = store
229        .writer_with(path)
230        .concurrent(concurrency)
231        .chunk(DEFAULT_WRITE_BUFFER_SIZE.as_bytes() as usize)
232        .await
233        .map(|w| w.into_futures_async_write().compat_write())
234        .context(WriteObjectSnafu { path })?;
235
236    let mut writer = AsyncArrowWriter::try_new(inner_writer, stream.schema(), Some(write_props))
237        .context(WriteParquetSnafu { path })?;
238    let mut rows_written = 0;
239
240    while let Some(batch) = stream.next().await {
241        let batch = batch.context(error::ReadRecordBatchSnafu)?;
242        writer
243            .write(&batch)
244            .await
245            .context(WriteParquetSnafu { path })?;
246        rows_written += batch.num_rows();
247    }
248    writer.close().await.context(WriteParquetSnafu { path })?;
249    Ok(rows_written)
250}
251
252/// Customizes per-column properties.
253fn column_wise_config(
254    mut props: WriterPropertiesBuilder,
255    schema: SchemaRef,
256) -> WriterPropertiesBuilder {
257    // Disable dictionary for timestamp column, since for increasing timestamp column,
258    // the dictionary pages will be larger than data pages.
259    for col in schema.column_schemas() {
260        if col.data_type.is_timestamp() {
261            let path = ColumnPath::new(vec![col.name.clone()]);
262            props = props
263                .set_column_dictionary_enabled(path.clone(), false)
264                .set_column_encoding(path, Encoding::DELTA_BINARY_PACKED)
265        }
266    }
267    props
268}
269
#[cfg(test)]
mod tests {
    use common_test_util::find_workspace_path;

    use super::*;
    use crate::test_util::{format_schema, test_store};

    /// Directory holding the Parquet fixtures, rendered as a string.
    fn test_data_root() -> String {
        let fixtures = find_workspace_path("/src/common/datasource/tests/parquet");
        fixtures.display().to_string()
    }

    /// Schema inference on `basic.parquet` yields the two expected columns.
    #[tokio::test]
    async fn infer_schema_basic() {
        let format = ParquetFormat::default();
        let store = test_store(&test_data_root());

        let inferred = format.infer_schema(&store, "basic.parquet").await.unwrap();
        let formatted: Vec<_> = format_schema(inferred);

        let expected = vec!["num: Int64: NULL", "str: Utf8: NULL"];
        assert_eq!(expected, formatted);
    }
}