common_datasource/file_format/parquet.rs

use std::result;
use std::sync::Arc;

use arrow::record_batch::RecordBatch;
use arrow_schema::Schema;
use async_trait::async_trait;
use datafusion::datasource::physical_plan::{FileMeta, ParquetFileReaderFactory};
use datafusion::error::Result as DatafusionResult;
use datafusion::parquet::arrow::async_reader::AsyncFileReader;
use datafusion::parquet::arrow::{ArrowWriter, parquet_to_arrow_schema};
use datafusion::parquet::errors::{ParquetError, Result as ParquetResult};
use datafusion::parquet::file::metadata::ParquetMetaData;
use datafusion::parquet::format::FileMetaData;
use datafusion::physical_plan::SendableRecordBatchStream;
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
use datatypes::schema::SchemaRef;
use futures::StreamExt;
use futures::future::BoxFuture;
use object_store::{FuturesAsyncReader, ObjectStore};
use parquet::arrow::AsyncArrowWriter;
use parquet::arrow::arrow_reader::ArrowReaderOptions;
use parquet::basic::{Compression, Encoding, ZstdLevel};
use parquet::file::properties::{WriterProperties, WriterPropertiesBuilder};
use parquet::schema::types::ColumnPath;
use snafu::ResultExt;
use tokio_util::compat::{Compat, FuturesAsyncReadCompatExt, FuturesAsyncWriteCompatExt};

use crate::DEFAULT_WRITE_BUFFER_SIZE;
use crate::buffered_writer::{ArrowWriterCloser, DfRecordBatchEncoder};
use crate::error::{self, Result, WriteObjectSnafu, WriteParquetSnafu};
use crate::file_format::FileFormat;
use crate::share_buffer::SharedBuffer;

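/// Parquet file format.
///
/// `infer_schema` reads only the parquet footer metadata and converts it into an
/// Arrow [Schema]. A minimal usage sketch (illustrative, not part of this module;
/// it assumes an `ObjectStore` rooted at a directory containing `data.parquet`):
///
/// ```ignore
/// use common_datasource::file_format::FileFormat;
/// use common_datasource::file_format::parquet::ParquetFormat;
///
/// async fn print_schema(store: &object_store::ObjectStore) {
///     let format = ParquetFormat::default();
///     let schema = format.infer_schema(store, "data.parquet").await.unwrap();
///     println!("{schema:?}");
/// }
/// ```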
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct ParquetFormat {}

#[async_trait]
impl FileFormat for ParquetFormat {
    async fn infer_schema(&self, store: &ObjectStore, path: &str) -> Result<Schema> {
        let meta = store
            .stat(path)
            .await
            .context(error::ReadObjectSnafu { path })?;

        let mut reader = store
            .reader(path)
            .await
            .context(error::ReadObjectSnafu { path })?
            .into_futures_async_read(0..meta.content_length())
            .await
            .context(error::ReadObjectSnafu { path })?
            .compat();

        let metadata = reader
            .get_metadata(None)
            .await
            .context(error::ReadParquetSnafu)?;

        let file_metadata = metadata.file_metadata();
        let schema = parquet_to_arrow_schema(
            file_metadata.schema_descr(),
            file_metadata.key_value_metadata(),
        )
        .context(error::ParquetToSchemaSnafu)?;

        Ok(schema)
    }
}

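/// A [ParquetFileReaderFactory] that creates [LazyParquetFileReader]s backed by
/// an [ObjectStore].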
#[derive(Debug, Clone)]
pub struct DefaultParquetFileReaderFactory {
    object_store: ObjectStore,
}

impl DefaultParquetFileReaderFactory {
    pub fn new(object_store: ObjectStore) -> Self {
        Self { object_store }
    }
}

impl ParquetFileReaderFactory for DefaultParquetFileReaderFactory {
    fn create_reader(
        &self,
        _partition_index: usize,
        file_meta: FileMeta,
        _metadata_size_hint: Option<usize>,
        _metrics: &ExecutionPlanMetricsSet,
    ) -> DatafusionResult<Box<dyn AsyncFileReader + Send>> {
        let path = file_meta.location().to_string();
        let object_store = self.object_store.clone();

        Ok(Box::new(LazyParquetFileReader::new(object_store, path)))
    }
}

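/// Parquet file reader that defers opening the underlying [ObjectStore] reader
/// until the first byte-range or metadata request.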
pub struct LazyParquetFileReader {
    object_store: ObjectStore,
    reader: Option<Compat<FuturesAsyncReader>>,
    path: String,
}

impl LazyParquetFileReader {
    pub fn new(object_store: ObjectStore, path: String) -> Self {
        LazyParquetFileReader {
            object_store,
            path,
            reader: None,
        }
    }

    /// Opens the underlying object store reader if it has not been opened yet;
    /// subsequent calls are no-ops.
    async fn maybe_initialize(&mut self) -> result::Result<(), object_store::Error> {
        if self.reader.is_none() {
            let meta = self.object_store.stat(&self.path).await?;
            let reader = self
                .object_store
                .reader(&self.path)
                .await?
                .into_futures_async_read(0..meta.content_length())
                .await?
                .compat();
            self.reader = Some(reader);
        }

        Ok(())
    }
}

impl AsyncFileReader for LazyParquetFileReader {
    fn get_bytes(
        &mut self,
        range: std::ops::Range<u64>,
    ) -> BoxFuture<'_, ParquetResult<bytes::Bytes>> {
        Box::pin(async move {
            self.maybe_initialize()
                .await
                .map_err(|e| ParquetError::External(Box::new(e)))?;
            // Safety: the reader was initialized by `maybe_initialize` above.
            self.reader.as_mut().unwrap().get_bytes(range).await
        })
    }

    fn get_metadata<'a>(
        &'a mut self,
        options: Option<&'a ArrowReaderOptions>,
    ) -> BoxFuture<'a, parquet::errors::Result<Arc<ParquetMetaData>>> {
        Box::pin(async move {
            self.maybe_initialize()
                .await
                .map_err(|e| ParquetError::External(Box::new(e)))?;
            // Safety: the reader was initialized by `maybe_initialize` above.
            self.reader.as_mut().unwrap().get_metadata(options).await
        })
    }
}

impl DfRecordBatchEncoder for ArrowWriter<SharedBuffer> {
    fn write(&mut self, batch: &RecordBatch) -> Result<()> {
        // Resolves to ArrowWriter's inherent `write`, not this trait method.
        self.write(batch).context(error::EncodeRecordBatchSnafu)
    }
}

#[async_trait]
impl ArrowWriterCloser for ArrowWriter<SharedBuffer> {
    async fn close(self) -> Result<FileMetaData> {
        // Resolves to ArrowWriter's inherent `close`, not this trait method.
        self.close().context(error::EncodeRecordBatchSnafu)
    }
}

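/// Writes a [SendableRecordBatchStream] to a ZSTD-compressed parquet file at
/// `path` and returns the number of rows written.
///
/// A minimal call sketch (illustrative, not part of this module; `stream`,
/// `schema`, and `store` are assumed to be built by the caller, and the output
/// path and concurrency are placeholders):
///
/// ```ignore
/// let rows = stream_to_parquet(stream, schema, store, "out/data.parquet", 8).await?;
/// println!("wrote {rows} rows");
/// ```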
pub async fn stream_to_parquet(
    mut stream: SendableRecordBatchStream,
    schema: SchemaRef,
    store: ObjectStore,
    path: &str,
    concurrency: usize,
) -> Result<usize> {
    let write_props = column_wise_config(
        WriterProperties::builder()
            .set_compression(Compression::ZSTD(ZstdLevel::default()))
            .set_statistics_truncate_length(None)
            .set_column_index_truncate_length(None),
        schema,
    )
    .build();
    let inner_writer = store
        .writer_with(path)
        .concurrent(concurrency)
        .chunk(DEFAULT_WRITE_BUFFER_SIZE.as_bytes() as usize)
        .await
        .map(|w| w.into_futures_async_write().compat_write())
        .context(WriteObjectSnafu { path })?;

    let mut writer = AsyncArrowWriter::try_new(inner_writer, stream.schema(), Some(write_props))
        .context(WriteParquetSnafu { path })?;
    let mut rows_written = 0;

    while let Some(batch) = stream.next().await {
        let batch = batch.context(error::ReadRecordBatchSnafu)?;
        writer
            .write(&batch)
            .await
            .context(WriteParquetSnafu { path })?;
        rows_written += batch.num_rows();
    }
    writer.close().await.context(WriteParquetSnafu { path })?;
    Ok(rows_written)
}

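/// Customizes per-column writer properties based on the table schema.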
fn column_wise_config(
    mut props: WriterPropertiesBuilder,
    schema: SchemaRef,
) -> WriterPropertiesBuilder {
    // Disable dictionary encoding and use delta binary packed encoding for
    // timestamp columns.
    for col in schema.column_schemas() {
        if col.data_type.is_timestamp() {
            let path = ColumnPath::new(vec![col.name.clone()]);
            props = props
                .set_column_dictionary_enabled(path.clone(), false)
                .set_column_encoding(path, Encoding::DELTA_BINARY_PACKED);
        }
    }
    props
}

#[cfg(test)]
mod tests {
    use common_test_util::find_workspace_path;

    use super::*;
    use crate::test_util::{format_schema, test_store};

    fn test_data_root() -> String {
        find_workspace_path("/src/common/datasource/tests/parquet")
            .display()
            .to_string()
    }

    #[tokio::test]
    async fn infer_schema_basic() {
        let format = ParquetFormat::default();
        let store = test_store(&test_data_root());
        let schema = format.infer_schema(&store, "basic.parquet").await.unwrap();
        let formatted: Vec<_> = format_schema(schema);

        assert_eq!(vec!["num: Int64: NULL", "str: Utf8: NULL"], formatted);
    }
}