common_datasource/file_format/
parquet.rs1use std::result;
16use std::sync::Arc;
17
18use arrow::record_batch::RecordBatch;
19use arrow_schema::Schema;
20use async_trait::async_trait;
21use datafusion::datasource::physical_plan::ParquetFileReaderFactory;
22use datafusion::error::Result as DatafusionResult;
23use datafusion::parquet::arrow::async_reader::AsyncFileReader;
24use datafusion::parquet::arrow::{ArrowWriter, parquet_to_arrow_schema};
25use datafusion::parquet::errors::{ParquetError, Result as ParquetResult};
26use datafusion::parquet::file::metadata::{
27 PageIndexPolicy, ParquetMetaData, ParquetMetaDataReader,
28};
29use datafusion::physical_plan::SendableRecordBatchStream;
30use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
31use datafusion_datasource::PartitionedFile;
32use datatypes::schema::SchemaRef;
33use futures::StreamExt;
34use futures::future::BoxFuture;
35use object_store::{FuturesAsyncReader, ObjectStore};
36use parquet::arrow::AsyncArrowWriter;
37use parquet::arrow::arrow_reader::ArrowReaderOptions;
38use parquet::basic::{Compression, Encoding, ZstdLevel};
39use parquet::file::properties::{WriterProperties, WriterPropertiesBuilder};
40use parquet::schema::types::ColumnPath;
41use snafu::ResultExt;
42use tokio_util::compat::{Compat, FuturesAsyncReadCompatExt, FuturesAsyncWriteCompatExt};
43
44use crate::DEFAULT_WRITE_BUFFER_SIZE;
45use crate::buffered_writer::{ArrowWriterCloser, DfRecordBatchEncoder};
46use crate::error::{self, Result, WriteObjectSnafu, WriteParquetSnafu};
47use crate::file_format::FileFormat;
48use crate::share_buffer::SharedBuffer;
49
/// Stateless handle for working with Parquet files.
///
/// The type carries no configuration; it exists so that Parquet can be used
/// through the [`FileFormat`] trait (schema inference).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct ParquetFormat {}
52
53#[async_trait]
54impl FileFormat for ParquetFormat {
55 async fn infer_schema(&self, store: &ObjectStore, path: &str) -> Result<Schema> {
56 let meta = store
57 .stat(path)
58 .await
59 .context(error::ReadObjectSnafu { path })?;
60
61 let mut reader = store
62 .reader(path)
63 .await
64 .context(error::ReadObjectSnafu { path })?
65 .into_futures_async_read(0..meta.content_length())
66 .await
67 .context(error::ReadObjectSnafu { path })?
68 .compat();
69
70 let metadata = reader
71 .get_metadata(None)
72 .await
73 .context(error::ReadParquetSnafuSnafu)?;
74
75 let file_metadata = metadata.file_metadata();
76 let schema = parquet_to_arrow_schema(
77 file_metadata.schema_descr(),
78 file_metadata.key_value_metadata(),
79 )
80 .context(error::ParquetToSchemaSnafu)?;
81
82 Ok(schema)
83 }
84}
85
86#[derive(Debug, Clone)]
87pub struct DefaultParquetFileReaderFactory {
88 object_store: ObjectStore,
89}
90
91impl DefaultParquetFileReaderFactory {
93 pub fn new(object_store: ObjectStore) -> Self {
94 Self { object_store }
95 }
96}
97
98impl ParquetFileReaderFactory for DefaultParquetFileReaderFactory {
99 fn create_reader(
100 &self,
101 _partition_index: usize,
102 partitioned_file: PartitionedFile,
103 metadata_size_hint: Option<usize>,
104 _metrics: &ExecutionPlanMetricsSet,
105 ) -> DatafusionResult<Box<dyn AsyncFileReader + Send>> {
106 let path = partitioned_file.path().to_string();
107 let object_store = self.object_store.clone();
108
109 Ok(Box::new(LazyParquetFileReader::new(
110 object_store,
111 path,
112 metadata_size_hint,
113 )))
114 }
115}
116
117pub struct LazyParquetFileReader {
118 object_store: ObjectStore,
119 reader: Option<Compat<FuturesAsyncReader>>,
120 file_size: Option<u64>,
121 metadata_size_hint: Option<usize>,
122 path: String,
123}
124
125impl LazyParquetFileReader {
126 pub fn new(object_store: ObjectStore, path: String, metadata_size_hint: Option<usize>) -> Self {
127 LazyParquetFileReader {
128 object_store,
129 path,
130 reader: None,
131 file_size: None,
132 metadata_size_hint,
133 }
134 }
135
136 async fn maybe_initialize(&mut self) -> result::Result<(), object_store::Error> {
138 if self.reader.is_none() {
139 let meta = self.object_store.stat(&self.path).await?;
140 self.file_size = Some(meta.content_length());
141 let reader = self
142 .object_store
143 .reader(&self.path)
144 .await?
145 .into_futures_async_read(0..meta.content_length())
146 .await?
147 .compat();
148 self.reader = Some(reader);
149 }
150
151 Ok(())
152 }
153}
154
155impl AsyncFileReader for LazyParquetFileReader {
156 fn get_bytes(
157 &mut self,
158 range: std::ops::Range<u64>,
159 ) -> BoxFuture<'_, ParquetResult<bytes::Bytes>> {
160 Box::pin(async move {
161 self.maybe_initialize()
162 .await
163 .map_err(|e| ParquetError::External(Box::new(e)))?;
164 self.reader.as_mut().unwrap().get_bytes(range).await
166 })
167 }
168
169 fn get_metadata<'a>(
170 &'a mut self,
171 options: Option<&'a ArrowReaderOptions>,
172 ) -> BoxFuture<'a, parquet::errors::Result<Arc<ParquetMetaData>>> {
173 Box::pin(async move {
174 self.maybe_initialize()
175 .await
176 .map_err(|e| ParquetError::External(Box::new(e)))?;
177
178 let metadata_opts = options.map(|o| o.metadata_options().clone());
179 let column_index_policy =
180 options.map_or(PageIndexPolicy::Skip, |o| o.column_index_policy());
181 let offset_index_policy =
182 options.map_or(PageIndexPolicy::Skip, |o| o.offset_index_policy());
183 let metadata_reader = ParquetMetaDataReader::new()
184 .with_metadata_options(metadata_opts)
185 .with_column_index_policy(column_index_policy)
186 .with_offset_index_policy(offset_index_policy)
187 .with_prefetch_hint(self.metadata_size_hint);
188
189 let metadata = metadata_reader
190 .load_and_finish(self.reader.as_mut().unwrap(), self.file_size.unwrap())
191 .await?;
192 Ok(Arc::new(metadata))
193 })
194 }
195}
196
197impl DfRecordBatchEncoder for ArrowWriter<SharedBuffer> {
198 fn write(&mut self, batch: &RecordBatch) -> Result<()> {
199 self.write(batch).context(error::EncodeRecordBatchSnafu)
200 }
201}
202
203#[async_trait]
204impl ArrowWriterCloser for ArrowWriter<SharedBuffer> {
205 async fn close(self) -> Result<ParquetMetaData> {
206 self.close().context(error::EncodeRecordBatchSnafu)
207 }
208}
209
210pub async fn stream_to_parquet(
214 mut stream: SendableRecordBatchStream,
215 schema: datatypes::schema::SchemaRef,
216 store: ObjectStore,
217 path: &str,
218 concurrency: usize,
219) -> Result<usize> {
220 let write_props = column_wise_config(
221 WriterProperties::builder()
222 .set_compression(Compression::ZSTD(ZstdLevel::default()))
223 .set_statistics_truncate_length(None)
224 .set_column_index_truncate_length(None),
225 schema,
226 )
227 .build();
228 let inner_writer = store
229 .writer_with(path)
230 .concurrent(concurrency)
231 .chunk(DEFAULT_WRITE_BUFFER_SIZE.as_bytes() as usize)
232 .await
233 .map(|w| w.into_futures_async_write().compat_write())
234 .context(WriteObjectSnafu { path })?;
235
236 let mut writer = AsyncArrowWriter::try_new(inner_writer, stream.schema(), Some(write_props))
237 .context(WriteParquetSnafu { path })?;
238 let mut rows_written = 0;
239
240 while let Some(batch) = stream.next().await {
241 let batch = batch.context(error::ReadRecordBatchSnafu)?;
242 writer
243 .write(&batch)
244 .await
245 .context(WriteParquetSnafu { path })?;
246 rows_written += batch.num_rows();
247 }
248 writer.close().await.context(WriteParquetSnafu { path })?;
249 Ok(rows_written)
250}
251
252fn column_wise_config(
254 mut props: WriterPropertiesBuilder,
255 schema: SchemaRef,
256) -> WriterPropertiesBuilder {
257 for col in schema.column_schemas() {
260 if col.data_type.is_timestamp() {
261 let path = ColumnPath::new(vec![col.name.clone()]);
262 props = props
263 .set_column_dictionary_enabled(path.clone(), false)
264 .set_column_encoding(path, Encoding::DELTA_BINARY_PACKED)
265 }
266 }
267 props
268}
269
#[cfg(test)]
mod tests {
    use common_test_util::find_workspace_path;

    use super::*;
    use crate::test_util::{format_schema, test_store};

    /// Directory holding the Parquet fixtures used by these tests.
    fn test_data_root() -> String {
        let root = find_workspace_path("/src/common/datasource/tests/parquet");
        root.display().to_string()
    }

    /// Schema inference on `basic.parquet` yields the two expected columns.
    #[tokio::test]
    async fn infer_schema_basic() {
        let store = test_store(&test_data_root());
        let format = ParquetFormat::default();

        let schema = format
            .infer_schema(&store, "basic.parquet")
            .await
            .unwrap();
        let formatted: Vec<_> = format_schema(schema);

        assert_eq!(vec!["num: Int64: NULL", "str: Utf8: NULL"], formatted);
    }
}