common_datasource/file_format/parquet.rs

use std::result;
use std::sync::Arc;

use arrow::record_batch::RecordBatch;
use arrow_schema::Schema;
use async_trait::async_trait;
use datafusion::datasource::physical_plan::{FileMeta, ParquetFileReaderFactory};
use datafusion::error::Result as DatafusionResult;
use datafusion::parquet::arrow::async_reader::AsyncFileReader;
use datafusion::parquet::arrow::{parquet_to_arrow_schema, ArrowWriter};
use datafusion::parquet::errors::{ParquetError, Result as ParquetResult};
use datafusion::parquet::file::metadata::ParquetMetaData;
use datafusion::parquet::format::FileMetaData;
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion::physical_plan::SendableRecordBatchStream;
use datatypes::schema::SchemaRef;
use futures::future::BoxFuture;
use futures::StreamExt;
use object_store::{FuturesAsyncReader, ObjectStore};
use parquet::arrow::AsyncArrowWriter;
use parquet::basic::{Compression, Encoding, ZstdLevel};
use parquet::file::properties::{WriterProperties, WriterPropertiesBuilder};
use parquet::schema::types::ColumnPath;
use snafu::ResultExt;
use tokio_util::compat::{Compat, FuturesAsyncReadCompatExt, FuturesAsyncWriteCompatExt};

use crate::buffered_writer::{ArrowWriterCloser, DfRecordBatchEncoder};
use crate::error::{self, Result, WriteObjectSnafu, WriteParquetSnafu};
use crate::file_format::FileFormat;
use crate::share_buffer::SharedBuffer;
use crate::DEFAULT_WRITE_BUFFER_SIZE;

/// Parquet file format.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct ParquetFormat {}

#[async_trait]
impl FileFormat for ParquetFormat {
    async fn infer_schema(&self, store: &ObjectStore, path: &str) -> Result<Schema> {
        let meta = store
            .stat(path)
            .await
            .context(error::ReadObjectSnafu { path })?;

        let mut reader = store
            .reader(path)
            .await
            .context(error::ReadObjectSnafu { path })?
            .into_futures_async_read(0..meta.content_length())
            .await
            .context(error::ReadObjectSnafu { path })?
            .compat();

        let metadata = reader
            .get_metadata()
            .await
            .context(error::ReadParquetSnafu)?;

        let file_metadata = metadata.file_metadata();
        let schema = parquet_to_arrow_schema(
            file_metadata.schema_descr(),
            file_metadata.key_value_metadata(),
        )
        .context(error::ParquetToSchemaSnafu)?;

        Ok(schema)
    }
}

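// Usage sketch added for illustration (not part of the original module): infer the Arrow
// schema of a Parquet object through the `FileFormat` trait. The store and object path are
// assumed to be supplied by the caller.
#[allow(dead_code)]
async fn example_infer_parquet_schema(store: &ObjectStore, path: &str) -> Result<Schema> {
    let format = ParquetFormat::default();
    format.infer_schema(store, path).await
}
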
/// A [`ParquetFileReaderFactory`] backed by an [`ObjectStore`]; it hands out
/// [`LazyParquetFileReader`]s.
#[derive(Debug, Clone)]
pub struct DefaultParquetFileReaderFactory {
    object_store: ObjectStore,
}

impl DefaultParquetFileReaderFactory {
    /// Creates a factory backed by the given [`ObjectStore`].
    pub fn new(object_store: ObjectStore) -> Self {
        Self { object_store }
    }
}

impl ParquetFileReaderFactory for DefaultParquetFileReaderFactory {
    fn create_reader(
        &self,
        _partition_index: usize,
        file_meta: FileMeta,
        _metadata_size_hint: Option<usize>,
        _metrics: &ExecutionPlanMetricsSet,
    ) -> DatafusionResult<Box<dyn AsyncFileReader + Send>> {
        let path = file_meta.location().to_string();
        let object_store = self.object_store.clone();

        Ok(Box::new(LazyParquetFileReader::new(object_store, path)))
    }
}

/// A Parquet [`AsyncFileReader`] that opens the underlying [`ObjectStore`] reader lazily,
/// on the first metadata or byte-range request.
pub struct LazyParquetFileReader {
    object_store: ObjectStore,
    reader: Option<Compat<FuturesAsyncReader>>,
    path: String,
}

impl LazyParquetFileReader {
    pub fn new(object_store: ObjectStore, path: String) -> Self {
        LazyParquetFileReader {
            object_store,
            path,
            reader: None,
        }
    }

    /// Opens the underlying reader if it has not been initialized yet.
    async fn maybe_initialize(&mut self) -> result::Result<(), object_store::Error> {
        if self.reader.is_none() {
            let meta = self.object_store.stat(&self.path).await?;
            let reader = self
                .object_store
                .reader(&self.path)
                .await?
                .into_futures_async_read(0..meta.content_length())
                .await?
                .compat();
            self.reader = Some(reader);
        }

        Ok(())
    }
}

impl AsyncFileReader for LazyParquetFileReader {
    fn get_bytes(
        &mut self,
        range: std::ops::Range<usize>,
    ) -> BoxFuture<'_, ParquetResult<bytes::Bytes>> {
        Box::pin(async move {
            self.maybe_initialize()
                .await
                .map_err(|e| ParquetError::External(Box::new(e)))?;
            // Safety: the reader is initialized above.
            self.reader.as_mut().unwrap().get_bytes(range).await
        })
    }

    fn get_metadata(&mut self) -> BoxFuture<'_, ParquetResult<Arc<ParquetMetaData>>> {
        Box::pin(async move {
            self.maybe_initialize()
                .await
                .map_err(|e| ParquetError::External(Box::new(e)))?;
            // Safety: the reader is initialized above.
            self.reader.as_mut().unwrap().get_metadata().await
        })
    }
}

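// Usage sketch added for illustration (not part of the original module): fetch Parquet
// metadata through the lazy reader; the object is only opened on the first
// `get_metadata`/`get_bytes` call.
#[allow(dead_code)]
async fn example_read_metadata(
    store: ObjectStore,
    path: String,
) -> ParquetResult<Arc<ParquetMetaData>> {
    let mut reader = LazyParquetFileReader::new(store, path);
    reader.get_metadata().await
}
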
impl DfRecordBatchEncoder for ArrowWriter<SharedBuffer> {
    fn write(&mut self, batch: &RecordBatch) -> Result<()> {
        self.write(batch).context(error::EncodeRecordBatchSnafu)
    }
}

#[async_trait]
impl ArrowWriterCloser for ArrowWriter<SharedBuffer> {
    async fn close(self) -> Result<FileMetaData> {
        self.close().context(error::EncodeRecordBatchSnafu)
    }
}

/// Streams `stream` into a single Parquet file at `path`, compressing with ZSTD.
///
/// Returns the number of rows written.
pub async fn stream_to_parquet(
    mut stream: SendableRecordBatchStream,
    schema: SchemaRef,
    store: ObjectStore,
    path: &str,
    concurrency: usize,
) -> Result<usize> {
    let write_props = column_wise_config(
        WriterProperties::builder().set_compression(Compression::ZSTD(ZstdLevel::default())),
        schema,
    )
    .build();
    let inner_writer = store
        .writer_with(path)
        .concurrent(concurrency)
        .chunk(DEFAULT_WRITE_BUFFER_SIZE.as_bytes() as usize)
        .await
        .map(|w| w.into_futures_async_write().compat_write())
        .context(WriteObjectSnafu { path })?;

    let mut writer = AsyncArrowWriter::try_new(inner_writer, stream.schema(), Some(write_props))
        .context(WriteParquetSnafu { path })?;
    let mut rows_written = 0;

    while let Some(batch) = stream.next().await {
        let batch = batch.context(error::ReadRecordBatchSnafu)?;
        writer
            .write(&batch)
            .await
            .context(WriteParquetSnafu { path })?;
        rows_written += batch.num_rows();
    }
    writer.close().await.context(WriteParquetSnafu { path })?;
    Ok(rows_written)
}

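// Usage sketch added for illustration (not part of the original module): write a record
// batch stream to object storage as a single Parquet file. The destination path and the
// concurrency level are assumptions made for the example.
#[allow(dead_code)]
async fn example_stream_to_parquet(
    stream: SendableRecordBatchStream,
    schema: SchemaRef,
    store: ObjectStore,
) -> Result<usize> {
    stream_to_parquet(stream, schema, store, "example/output.parquet", 8).await
}
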
/// Customizes per-column writer properties: timestamp columns have dictionary encoding
/// disabled and use `DELTA_BINARY_PACKED` encoding.
fn column_wise_config(
    mut props: WriterPropertiesBuilder,
    schema: SchemaRef,
) -> WriterPropertiesBuilder {
    for col in schema.column_schemas() {
        if col.data_type.is_timestamp() {
            let path = ColumnPath::new(vec![col.name.clone()]);
            props = props
                .set_column_dictionary_enabled(path.clone(), false)
                .set_column_encoding(path, Encoding::DELTA_BINARY_PACKED)
        }
    }
    props
}

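// Illustrative sketch (not part of the original module): build the same `WriterProperties`
// that `stream_to_parquet` uses, so the per-column tweaks from `column_wise_config` can be
// inspected or reused on their own.
#[allow(dead_code)]
fn example_writer_properties(schema: SchemaRef) -> WriterProperties {
    column_wise_config(
        WriterProperties::builder().set_compression(Compression::ZSTD(ZstdLevel::default())),
        schema,
    )
    .build()
}
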
#[cfg(test)]
mod tests {
    use common_test_util::find_workspace_path;

    use super::*;
    use crate::test_util::{format_schema, test_store};

    fn test_data_root() -> String {
        find_workspace_path("/src/common/datasource/tests/parquet")
            .display()
            .to_string()
    }

    #[tokio::test]
    async fn infer_schema_basic() {
        let format = ParquetFormat::default();
        let store = test_store(&test_data_root());
        let schema = format.infer_schema(&store, "basic.parquet").await.unwrap();
        let formatted: Vec<_> = format_schema(schema);

        assert_eq!(vec!["num: Int64: NULL", "str: Utf8: NULL"], formatted);
    }
}