common_datasource/file_format/parquet.rs

use std::result;
use std::sync::Arc;

use arrow::record_batch::RecordBatch;
use arrow_schema::Schema;
use async_trait::async_trait;
use datafusion::datasource::physical_plan::ParquetFileReaderFactory;
use datafusion::error::Result as DatafusionResult;
use datafusion::parquet::arrow::async_reader::AsyncFileReader;
use datafusion::parquet::arrow::{ArrowWriter, parquet_to_arrow_schema};
use datafusion::parquet::errors::{ParquetError, Result as ParquetResult};
use datafusion::parquet::file::metadata::{
    PageIndexPolicy, ParquetMetaData, ParquetMetaDataReader,
};
use datafusion::physical_plan::SendableRecordBatchStream;
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion_datasource::PartitionedFile;
use datatypes::schema::SchemaRef;
use futures::StreamExt;
use futures::future::BoxFuture;
use object_store::{FuturesAsyncReader, ObjectStore};
use parquet::arrow::AsyncArrowWriter;
use parquet::arrow::arrow_reader::ArrowReaderOptions;
use parquet::basic::{Compression, Encoding, ZstdLevel};
use parquet::file::properties::{WriterProperties, WriterPropertiesBuilder};
use parquet::schema::types::ColumnPath;
use snafu::ResultExt;
use tokio_util::compat::{Compat, FuturesAsyncReadCompatExt, FuturesAsyncWriteCompatExt};

use crate::DEFAULT_WRITE_BUFFER_SIZE;
use crate::buffered_writer::{ArrowWriterCloser, DfRecordBatchEncoder};
use crate::error::{self, Result, WriteObjectSnafu, WriteParquetSnafu};
use crate::file_format::FileFormat;
use crate::share_buffer::SharedBuffer;

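/// Parquet file format; schema inference reads the file's footer metadata.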
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct ParquetFormat {}

#[async_trait]
impl FileFormat for ParquetFormat {
    async fn infer_schema(&self, store: &ObjectStore, path: &str) -> Result<Schema> {
        let meta = store
            .stat(path)
            .await
            .context(error::ReadObjectSnafu { path })?;

        let mut reader = store
            .reader(path)
            .await
            .context(error::ReadObjectSnafu { path })?
            .into_futures_async_read(0..meta.content_length())
            .await
            .context(error::ReadObjectSnafu { path })?
            .compat();

        let metadata = reader
            .get_metadata(None)
            .await
            .context(error::ReadParquetSnafu)?;

        let file_metadata = metadata.file_metadata();
        let schema = parquet_to_arrow_schema(
            file_metadata.schema_descr(),
            file_metadata.key_value_metadata(),
        )
        .context(error::ParquetToSchemaSnafu)?;

        Ok(schema)
    }
}

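/// A [`ParquetFileReaderFactory`] backed by an [`ObjectStore`]; it hands out
/// [`LazyParquetFileReader`]s that defer opening the file until the first read.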
#[derive(Debug, Clone)]
pub struct DefaultParquetFileReaderFactory {
    object_store: ObjectStore,
}

impl DefaultParquetFileReaderFactory {
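    /// Creates a factory that reads parquet files from the given object store.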
    pub fn new(object_store: ObjectStore) -> Self {
        Self { object_store }
    }
}

impl ParquetFileReaderFactory for DefaultParquetFileReaderFactory {
    fn create_reader(
        &self,
        _partition_index: usize,
        partitioned_file: PartitionedFile,
        metadata_size_hint: Option<usize>,
        _metrics: &ExecutionPlanMetricsSet,
    ) -> DatafusionResult<Box<dyn AsyncFileReader + Send>> {
        let path = partitioned_file.path().to_string();
        let object_store = self.object_store.clone();

        Ok(Box::new(LazyParquetFileReader::new(
            object_store,
            path,
            metadata_size_hint,
        )))
    }
}

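/// An [`AsyncFileReader`] that lazily opens the file in the object store on the
/// first byte-range or metadata request.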
pub struct LazyParquetFileReader {
    object_store: ObjectStore,
    reader: Option<Compat<FuturesAsyncReader>>,
    file_size: Option<u64>,
    metadata_size_hint: Option<usize>,
    path: String,
}

impl LazyParquetFileReader {
    pub fn new(object_store: ObjectStore, path: String, metadata_size_hint: Option<usize>) -> Self {
        LazyParquetFileReader {
            object_store,
            path,
            reader: None,
            file_size: None,
            metadata_size_hint,
        }
    }

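    /// Opens the underlying reader and records the file size on first use.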
    async fn maybe_initialize(&mut self) -> result::Result<(), object_store::Error> {
        if self.reader.is_none() {
            let meta = self.object_store.stat(&self.path).await?;
            self.file_size = Some(meta.content_length());
            let reader = self
                .object_store
                .reader(&self.path)
                .await?
                .into_futures_async_read(0..meta.content_length())
                .await?
                .compat();
            self.reader = Some(reader);
        }

        Ok(())
    }
}

impl AsyncFileReader for LazyParquetFileReader {
    fn get_bytes(
        &mut self,
        range: std::ops::Range<u64>,
    ) -> BoxFuture<'_, ParquetResult<bytes::Bytes>> {
        Box::pin(async move {
            self.maybe_initialize()
                .await
                .map_err(|e| ParquetError::External(Box::new(e)))?;
            self.reader.as_mut().unwrap().get_bytes(range).await
        })
    }

    fn get_metadata<'a>(
        &'a mut self,
        options: Option<&'a ArrowReaderOptions>,
    ) -> BoxFuture<'a, parquet::errors::Result<Arc<ParquetMetaData>>> {
        Box::pin(async move {
            self.maybe_initialize()
                .await
                .map_err(|e| ParquetError::External(Box::new(e)))?;

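            // Forward the caller's metadata options and page index preference, and
            // prefetch the footer using the configured size hint, if any.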
            let metadata_opts = options.map(|o| o.metadata_options().clone());
            let metadata_reader = ParquetMetaDataReader::new()
                .with_metadata_options(metadata_opts)
                .with_page_index_policy(PageIndexPolicy::from(
                    options.is_some_and(|o| o.page_index()),
                ))
                .with_prefetch_hint(self.metadata_size_hint);

            let metadata = metadata_reader
                .load_and_finish(self.reader.as_mut().unwrap(), self.file_size.unwrap())
                .await?;
            Ok(Arc::new(metadata))
        })
    }
}

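// `self.write` and `self.close` below resolve to `ArrowWriter`'s inherent methods
// (inherent methods take precedence over trait methods of the same name).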
impl DfRecordBatchEncoder for ArrowWriter<SharedBuffer> {
    fn write(&mut self, batch: &RecordBatch) -> Result<()> {
        self.write(batch).context(error::EncodeRecordBatchSnafu)
    }
}

#[async_trait]
impl ArrowWriterCloser for ArrowWriter<SharedBuffer> {
    async fn close(self) -> Result<ParquetMetaData> {
        self.close().context(error::EncodeRecordBatchSnafu)
    }
}

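/// Streams all record batches from `stream` into a single parquet file at `path`,
/// using ZSTD compression and a buffered, concurrent object-store writer.
///
/// Returns the number of rows written.
///
/// Usage sketch (assumes a prepared `stream`, `schema`, and `store`):
/// ```ignore
/// let rows = stream_to_parquet(stream, schema, store, "data/out.parquet", 4).await?;
/// ```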
pub async fn stream_to_parquet(
    mut stream: SendableRecordBatchStream,
    schema: SchemaRef,
    store: ObjectStore,
    path: &str,
    concurrency: usize,
) -> Result<usize> {
    let write_props = column_wise_config(
        WriterProperties::builder()
            .set_compression(Compression::ZSTD(ZstdLevel::default()))
            .set_statistics_truncate_length(None)
            .set_column_index_truncate_length(None),
        schema,
    )
    .build();
    let inner_writer = store
        .writer_with(path)
        .concurrent(concurrency)
        .chunk(DEFAULT_WRITE_BUFFER_SIZE.as_bytes() as usize)
        .await
        .map(|w| w.into_futures_async_write().compat_write())
        .context(WriteObjectSnafu { path })?;

    let mut writer = AsyncArrowWriter::try_new(inner_writer, stream.schema(), Some(write_props))
        .context(WriteParquetSnafu { path })?;
    let mut rows_written = 0;

    while let Some(batch) = stream.next().await {
        let batch = batch.context(error::ReadRecordBatchSnafu)?;
        writer
            .write(&batch)
            .await
            .context(WriteParquetSnafu { path })?;
        rows_written += batch.num_rows();
    }
    writer.close().await.context(WriteParquetSnafu { path })?;
    Ok(rows_written)
}

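/// Applies per-column tuning on top of the base writer properties, driven by the
/// column data types in `schema`.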
fn column_wise_config(
    mut props: WriterPropertiesBuilder,
    schema: SchemaRef,
) -> WriterPropertiesBuilder {
    for col in schema.column_schemas() {
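        // Timestamp values are typically close together, so delta encoding without
        // a dictionary tends to store them more compactly.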
        if col.data_type.is_timestamp() {
            let path = ColumnPath::new(vec![col.name.clone()]);
            props = props
                .set_column_dictionary_enabled(path.clone(), false)
                .set_column_encoding(path, Encoding::DELTA_BINARY_PACKED);
        }
    }
    props
}

#[cfg(test)]
mod tests {
    use common_test_util::find_workspace_path;

    use super::*;
    use crate::test_util::{format_schema, test_store};

    fn test_data_root() -> String {
        find_workspace_path("/src/common/datasource/tests/parquet")
            .display()
            .to_string()
    }

    #[tokio::test]
    async fn infer_schema_basic() {
        let format = ParquetFormat::default();
        let store = test_store(&test_data_root());
        let schema = format.infer_schema(&store, "basic.parquet").await.unwrap();
        let formatted: Vec<_> = format_schema(schema);

        assert_eq!(vec!["num: Int64: NULL", "str: Utf8: NULL"], formatted);
    }
}