// common_datasource/file_format/parquet.rs

use std::result;
use std::sync::Arc;

use arrow::record_batch::RecordBatch;
use arrow_schema::Schema;
use async_trait::async_trait;
use datafusion::datasource::physical_plan::{FileMeta, ParquetFileReaderFactory};
use datafusion::error::Result as DatafusionResult;
use datafusion::parquet::arrow::async_reader::AsyncFileReader;
use datafusion::parquet::arrow::{parquet_to_arrow_schema, ArrowWriter};
use datafusion::parquet::errors::{ParquetError, Result as ParquetResult};
use datafusion::parquet::file::metadata::ParquetMetaData;
use datafusion::parquet::format::FileMetaData;
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion::physical_plan::SendableRecordBatchStream;
use datatypes::schema::SchemaRef;
use futures::future::BoxFuture;
use futures::StreamExt;
use object_store::{FuturesAsyncReader, ObjectStore};
use parquet::arrow::arrow_reader::ArrowReaderOptions;
use parquet::arrow::AsyncArrowWriter;
use parquet::basic::{Compression, Encoding, ZstdLevel};
use parquet::file::properties::{WriterProperties, WriterPropertiesBuilder};
use parquet::schema::types::ColumnPath;
use snafu::ResultExt;
use tokio_util::compat::{Compat, FuturesAsyncReadCompatExt, FuturesAsyncWriteCompatExt};

use crate::buffered_writer::{ArrowWriterCloser, DfRecordBatchEncoder};
use crate::error::{self, Result, WriteObjectSnafu, WriteParquetSnafu};
use crate::file_format::FileFormat;
use crate::share_buffer::SharedBuffer;
use crate::DEFAULT_WRITE_BUFFER_SIZE;

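/// Parquet implementation of [`FileFormat`]: infers an Arrow [`Schema`] from the
/// footer metadata of a parquet file stored in an [`ObjectStore`].
///
/// A minimal usage sketch; `store` is assumed to be an already-configured
/// [`ObjectStore`] whose root contains `basic.parquet`, and error handling is
/// elided:
///
/// ```ignore
/// let format = ParquetFormat::default();
/// let schema = format.infer_schema(&store, "basic.parquet").await?;
/// println!("inferred {} columns", schema.fields().len());
/// ```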
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct ParquetFormat {}

#[async_trait]
impl FileFormat for ParquetFormat {
    async fn infer_schema(&self, store: &ObjectStore, path: &str) -> Result<Schema> {
        let meta = store
            .stat(path)
            .await
            .context(error::ReadObjectSnafu { path })?;

        let mut reader = store
            .reader(path)
            .await
            .context(error::ReadObjectSnafu { path })?
            .into_futures_async_read(0..meta.content_length())
            .await
            .context(error::ReadObjectSnafu { path })?
            .compat();

        let metadata = reader
            .get_metadata(None)
            .await
            .context(error::ReadParquetSnafu { path })?;

        let file_metadata = metadata.file_metadata();
        let schema = parquet_to_arrow_schema(
            file_metadata.schema_descr(),
            file_metadata.key_value_metadata(),
        )
        .context(error::ParquetToSchemaSnafu)?;

        Ok(schema)
    }
}

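/// A [`ParquetFileReaderFactory`] implementation that opens parquet files
/// through an [`ObjectStore`]. Each call to `create_reader` returns a
/// [`LazyParquetFileReader`] that defers opening the underlying object until
/// the first read, so constructing the factory itself performs no I/O.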
#[derive(Debug, Clone)]
pub struct DefaultParquetFileReaderFactory {
    object_store: ObjectStore,
}

impl DefaultParquetFileReaderFactory {
    pub fn new(object_store: ObjectStore) -> Self {
        Self { object_store }
    }
}

impl ParquetFileReaderFactory for DefaultParquetFileReaderFactory {
    fn create_reader(
        &self,
        _partition_index: usize,
        file_meta: FileMeta,
        _metadata_size_hint: Option<usize>,
        _metrics: &ExecutionPlanMetricsSet,
    ) -> DatafusionResult<Box<dyn AsyncFileReader + Send>> {
        let path = file_meta.location().to_string();
        let object_store = self.object_store.clone();

        Ok(Box::new(LazyParquetFileReader::new(object_store, path)))
    }
}

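/// An [`AsyncFileReader`] that opens its target object lazily: the underlying
/// [`ObjectStore`] reader is created on the first byte or metadata request,
/// not at construction time.
///
/// A minimal sketch, assuming `object_store` is an already-configured
/// [`ObjectStore`] and `"data/basic.parquet"` is a hypothetical path inside it:
///
/// ```ignore
/// let mut reader =
///     LazyParquetFileReader::new(object_store.clone(), "data/basic.parquet".to_string());
/// // No I/O happens until the first request below.
/// let metadata = reader.get_metadata(None).await?;
/// println!("row groups: {}", metadata.num_row_groups());
/// ```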
pub struct LazyParquetFileReader {
    object_store: ObjectStore,
    reader: Option<Compat<FuturesAsyncReader>>,
    path: String,
}

impl LazyParquetFileReader {
    pub fn new(object_store: ObjectStore, path: String) -> Self {
        LazyParquetFileReader {
            object_store,
            path,
            reader: None,
        }
    }

    /// Creates the underlying [`ObjectStore`] reader on first use; subsequent
    /// calls are no-ops.
    async fn maybe_initialize(&mut self) -> result::Result<(), object_store::Error> {
        if self.reader.is_none() {
            let meta = self.object_store.stat(&self.path).await?;
            let reader = self
                .object_store
                .reader(&self.path)
                .await?
                .into_futures_async_read(0..meta.content_length())
                .await?
                .compat();
            self.reader = Some(reader);
        }

        Ok(())
    }
}

impl AsyncFileReader for LazyParquetFileReader {
    fn get_bytes(
        &mut self,
        range: std::ops::Range<u64>,
    ) -> BoxFuture<'_, ParquetResult<bytes::Bytes>> {
        Box::pin(async move {
            self.maybe_initialize()
                .await
                .map_err(|e| ParquetError::External(Box::new(e)))?;
            // Safety: the reader was initialized above.
            self.reader.as_mut().unwrap().get_bytes(range).await
        })
    }

    fn get_metadata<'a>(
        &'a mut self,
        options: Option<&'a ArrowReaderOptions>,
    ) -> BoxFuture<'a, parquet::errors::Result<Arc<ParquetMetaData>>> {
        Box::pin(async move {
            self.maybe_initialize()
                .await
                .map_err(|e| ParquetError::External(Box::new(e)))?;
            // Safety: the reader was initialized above.
            self.reader.as_mut().unwrap().get_metadata(options).await
        })
    }
}

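// These impls let `ArrowWriter<SharedBuffer>` plug into the crate's buffered
// writer abstractions: `DfRecordBatchEncoder` for encoding record batches and
// `ArrowWriterCloser` for finalizing the parquet footer. Note that `self.write`
// and `self.close` below resolve to the inherent `ArrowWriter` methods, not to
// the trait methods being defined here, so there is no recursion.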
impl DfRecordBatchEncoder for ArrowWriter<SharedBuffer> {
    fn write(&mut self, batch: &RecordBatch) -> Result<()> {
        self.write(batch).context(error::EncodeRecordBatchSnafu)
    }
}

#[async_trait]
impl ArrowWriterCloser for ArrowWriter<SharedBuffer> {
    async fn close(self) -> Result<FileMetaData> {
        self.close().context(error::EncodeRecordBatchSnafu)
    }
}

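/// Streams all record batches from `stream` into a single parquet file at `path`
/// in `store`, using ZSTD compression plus the column-wise tweaks from
/// [`column_wise_config`]. Returns the total number of rows written.
///
/// A usage sketch; `stream`, `schema`, and `store` are assumed to come from the
/// caller (e.g. a query result stream and an already-configured [`ObjectStore`]),
/// and `"out/data.parquet"` is a hypothetical target path:
///
/// ```ignore
/// let rows = stream_to_parquet(stream, schema, store, "out/data.parquet", 4).await?;
/// assert!(rows > 0);
/// ```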
pub async fn stream_to_parquet(
    mut stream: SendableRecordBatchStream,
    schema: SchemaRef,
    store: ObjectStore,
    path: &str,
    concurrency: usize,
) -> Result<usize> {
    let write_props = column_wise_config(
        WriterProperties::builder().set_compression(Compression::ZSTD(ZstdLevel::default())),
        schema,
    )
    .build();
    let inner_writer = store
        .writer_with(path)
        .concurrent(concurrency)
        .chunk(DEFAULT_WRITE_BUFFER_SIZE.as_bytes() as usize)
        .await
        .map(|w| w.into_futures_async_write().compat_write())
        .context(WriteObjectSnafu { path })?;

    let mut writer = AsyncArrowWriter::try_new(inner_writer, stream.schema(), Some(write_props))
        .context(WriteParquetSnafu { path })?;
    let mut rows_written = 0;

    while let Some(batch) = stream.next().await {
        let batch = batch.context(error::ReadRecordBatchSnafu)?;
        writer
            .write(&batch)
            .await
            .context(WriteParquetSnafu { path })?;
        rows_written += batch.num_rows();
    }
    writer.close().await.context(WriteParquetSnafu { path })?;
    Ok(rows_written)
}

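/// Applies per-column tweaks on top of the global writer properties: timestamp
/// columns get dictionary encoding disabled and `DELTA_BINARY_PACKED` encoding
/// instead. Timestamp values are typically near-monotonic and mostly unique,
/// which makes delta encoding compact while dictionary pages add little value.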
fn column_wise_config(
    mut props: WriterPropertiesBuilder,
    schema: SchemaRef,
) -> WriterPropertiesBuilder {
    for col in schema.column_schemas() {
        if col.data_type.is_timestamp() {
            let path = ColumnPath::new(vec![col.name.clone()]);
            props = props
                .set_column_dictionary_enabled(path.clone(), false)
                .set_column_encoding(path, Encoding::DELTA_BINARY_PACKED);
        }
    }
    props
}

#[cfg(test)]
mod tests {
    use common_test_util::find_workspace_path;

    use super::*;
    use crate::test_util::{format_schema, test_store};

    fn test_data_root() -> String {
        find_workspace_path("/src/common/datasource/tests/parquet")
            .display()
            .to_string()
    }

    #[tokio::test]
    async fn infer_schema_basic() {
        let format = ParquetFormat::default();
        let store = test_store(&test_data_root());
        let schema = format.infer_schema(&store, "basic.parquet").await.unwrap();
        let formatted: Vec<_> = format_schema(schema);

        assert_eq!(vec!["num: Int64: NULL", "str: Utf8: NULL"], formatted);
    }
}