pub mod csv;
pub mod json;
pub mod orc;
pub mod parquet;
#[cfg(test)]
pub mod tests;

/// The default maximum number of records to scan when inferring a schema.
pub const DEFAULT_SCHEMA_INFER_MAX_RECORD: usize = 1000;

use std::collections::HashMap;
use std::result;
use std::sync::Arc;
use std::task::Poll;

use arrow::record_batch::RecordBatch;
use arrow_schema::{ArrowError, Schema as ArrowSchema};
use async_trait::async_trait;
use bytes::{Buf, Bytes};
use common_recordbatch::DfSendableRecordBatchStream;
use datafusion::datasource::file_format::file_compression_type::FileCompressionType as DfCompressionType;
use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::object_store::ObjectStoreUrl;
use datafusion::datasource::physical_plan::{
    FileGroup, FileOpenFuture, FileScanConfigBuilder, FileSource, FileStream,
};
use datafusion::error::{DataFusionError, Result as DataFusionResult};
use datafusion::physical_plan::SendableRecordBatchStream;
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
use futures::{StreamExt, TryStreamExt};
use object_store::ObjectStore;
use object_store_opendal::OpendalStore;
use snafu::ResultExt;
use tokio::io::AsyncWriteExt;
use tokio_util::compat::FuturesAsyncWriteCompatExt;

use self::csv::CsvFormat;
use self::json::JsonFormat;
use self::orc::OrcFormat;
use self::parquet::ParquetFormat;
use crate::DEFAULT_WRITE_BUFFER_SIZE;
use crate::buffered_writer::DfRecordBatchEncoder;
use crate::compressed_writer::{CompressedWriter, IntoCompressedWriter};
use crate::compression::CompressionType;
use crate::error::{self, Result};
use crate::share_buffer::SharedBuffer;

pub const FORMAT_COMPRESSION_TYPE: &str = "compression_type";
pub const FORMAT_DELIMITER: &str = "delimiter";
pub const FORMAT_SCHEMA_INFER_MAX_RECORD: &str = "schema_infer_max_record";
pub const FORMAT_HAS_HEADER: &str = "has_header";
pub const FORMAT_TYPE: &str = "format";
pub const FILE_PATTERN: &str = "pattern";
pub const TIMESTAMP_FORMAT: &str = "timestamp_format";
pub const TIME_FORMAT: &str = "time_format";
pub const DATE_FORMAT: &str = "date_format";

#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Format {
    Csv(CsvFormat),
    Json(JsonFormat),
    Parquet(ParquetFormat),
    Orc(OrcFormat),
}

impl Format {
    pub fn suffix(&self) -> &'static str {
        match self {
            Format::Csv(_) => ".csv",
            Format::Json(_) => ".json",
            Format::Parquet(_) => ".parquet",
            Format::Orc(_) => ".orc",
        }
    }
}

impl TryFrom<&HashMap<String, String>> for Format {
    type Error = error::Error;

    fn try_from(options: &HashMap<String, String>) -> Result<Self> {
        let format = options
            .get(FORMAT_TYPE)
            .map(|format| format.to_ascii_uppercase())
            .unwrap_or_else(|| "PARQUET".to_string());

        match format.as_str() {
            "CSV" => Ok(Self::Csv(CsvFormat::try_from(options)?)),
            "JSON" => Ok(Self::Json(JsonFormat::try_from(options)?)),
            "PARQUET" => Ok(Self::Parquet(ParquetFormat::default())),
            "ORC" => Ok(Self::Orc(OrcFormat)),
            _ => error::UnsupportedFormatSnafu { format: &format }.fail(),
        }
    }
}
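
// Example (sketch): selecting a `Format` from user-supplied options. The keys
// are the constants defined above; the values are hypothetical. Omitting
// `FORMAT_TYPE` falls back to Parquet, and an unknown format name fails with
// `error::UnsupportedFormatSnafu`.
//
//     let options = HashMap::from([
//         (FORMAT_TYPE.to_string(), "csv".to_string()),
//         (FORMAT_DELIMITER.to_string(), ",".to_string()),
//     ]);
//     let format = Format::try_from(&options)?;
//     assert_eq!(format.suffix(), ".csv");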

/// A file format that can infer the Arrow schema of a file stored in an
/// [`ObjectStore`].
#[async_trait]
pub trait FileFormat: Send + Sync + std::fmt::Debug {
    async fn infer_schema(&self, store: &ObjectStore, path: &str) -> Result<ArrowSchema>;
}
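
// Example (sketch): inferring a schema through a concrete `FileFormat`.
// `store` is an assumed `ObjectStore` handle and the path is hypothetical;
// `ParquetFormat` here stands in for any implementor in this module tree.
//
//     let schema = ParquetFormat::default()
//         .infer_schema(&store, "data/input.parquet")
//         .await?;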

/// A push-based decoder that turns raw bytes into Arrow [`RecordBatch`]es.
pub trait ArrowDecoder: Send + 'static {
    /// Decodes records from `buf`, returning the number of bytes consumed.
    ///
    /// Returns `Ok(0)` once the current batch is full or `buf` is exhausted;
    /// call [`Self::flush`] to obtain the buffered rows.
    fn decode(&mut self, buf: &[u8]) -> result::Result<usize, ArrowError>;

    /// Flushes buffered rows into a [`RecordBatch`], or `None` if no rows are
    /// buffered.
    fn flush(&mut self) -> result::Result<Option<RecordBatch>, ArrowError>;
}

impl ArrowDecoder for arrow::csv::reader::Decoder {
    fn decode(&mut self, buf: &[u8]) -> result::Result<usize, ArrowError> {
        // Resolves to the inherent `Decoder::decode`, which takes precedence
        // over this trait method, so this is delegation, not recursion.
        self.decode(buf)
    }

    fn flush(&mut self) -> result::Result<Option<RecordBatch>, ArrowError> {
        self.flush()
    }
}

impl ArrowDecoder for arrow::json::reader::Decoder {
    fn decode(&mut self, buf: &[u8]) -> result::Result<usize, ArrowError> {
        self.decode(buf)
    }

    fn flush(&mut self) -> result::Result<Option<RecordBatch>, ArrowError> {
        self.flush()
    }
}
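
// Contract sketch for `ArrowDecoder` (hypothetical `decoder` and `bytes`):
// feed input until `decode` returns 0, then `flush` to obtain the batch.
//
//     let mut consumed = 0;
//     while consumed < bytes.len() {
//         let n = decoder.decode(&bytes[consumed..])?;
//         if n == 0 {
//             break;
//         }
//         consumed += n;
//     }
//     let batch = decoder.flush()?;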

/// Builds a [`FileOpenFuture`] that decodes the object at `path` into a
/// stream of [`RecordBatch`]es using a decoder from `decoder_factory`.
pub fn open_with_decoder<T: ArrowDecoder, F: Fn() -> DataFusionResult<T>>(
    object_store: Arc<ObjectStore>,
    path: String,
    compression_type: CompressionType,
    decoder_factory: F,
) -> DataFusionResult<FileOpenFuture> {
    let mut decoder = decoder_factory()?;
    Ok(Box::pin(async move {
        let reader = object_store
            .reader(&path)
            .await
            .map_err(|e| DataFusionError::External(Box::new(e)))?
            .into_bytes_stream(..)
            .await
            .map_err(|e| DataFusionError::External(Box::new(e)))?;

        let mut upstream = compression_type.convert_stream(reader).fuse();

        let mut buffered = Bytes::new();

        let stream = futures::stream::poll_fn(move |cx| {
            loop {
                // Refill from the (possibly decompressed) upstream once the
                // previous chunk has been fully consumed.
                if buffered.is_empty()
                    && let Some(result) = futures::ready!(upstream.poll_next_unpin(cx))
                {
                    buffered = result?;
                }

                let decoded = decoder.decode(buffered.as_ref())?;

                // Zero bytes consumed: the batch is full or the input is
                // exhausted, so emit whatever the decoder has buffered.
                if decoded == 0 {
                    break;
                } else {
                    buffered.advance(decoded);
                }
            }

            Poll::Ready(decoder.flush().transpose())
        });

        Ok(stream.map_err(Into::into).boxed())
    }))
}
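
// Usage sketch for `open_with_decoder`, assuming a `store` handle, a known
// `schema`, and a gzip-compressed CSV object. `make_csv_decoder` is a
// hypothetical factory returning an `ArrowDecoder` (e.g. built from an
// arrow-csv `ReaderBuilder`).
//
//     let open_future = open_with_decoder(
//         store.clone(),
//         "data/input.csv.gz".to_string(),
//         CompressionType::Gzip,
//         || make_csv_decoder(schema.clone()),
//     )?;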

/// Infers a single schema across `files`, merging the per-file schemas.
pub async fn infer_schemas(
    store: &ObjectStore,
    files: &[String],
    file_format: &dyn FileFormat,
) -> Result<ArrowSchema> {
    let mut schemas = Vec::with_capacity(files.len());
    for file in files {
        schemas.push(file_format.infer_schema(store, file).await?);
    }
    ArrowSchema::try_merge(schemas).context(error::MergeSchemaSnafu)
}
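
// Example (sketch): merging one schema across several objects of the same
// (hypothetical) format. Files with compatible but differing columns merge
// into a single schema; incompatible ones surface as `MergeSchemaSnafu`.
//
//     let files = vec!["a.csv".to_string(), "b.csv".to_string()];
//     let schema = infer_schemas(&store, &files, &csv_format).await?;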

/// Writes `data` to `compressed_writer`, skipping empty slices.
async fn write_to_compressed_writer(
    compressed_writer: &mut CompressedWriter,
    data: &[u8],
) -> Result<()> {
    if !data.is_empty() {
        compressed_writer
            .write_all(data)
            .await
            .context(error::AsyncWriteSnafu)?;
    }
    Ok(())
}

/// Drains `stream`, encoding every [`RecordBatch`] with the encoder produced
/// by `encoder_factory` and writing the (optionally compressed) bytes to
/// `path` in `store`, flushing whenever the shared buffer reaches `threshold`
/// bytes. Returns the number of rows written.
pub async fn stream_to_file<E>(
    mut stream: SendableRecordBatchStream,
    store: ObjectStore,
    path: &str,
    threshold: usize,
    concurrency: usize,
    compression_type: CompressionType,
    encoder_factory: impl Fn(SharedBuffer) -> E,
) -> Result<usize>
where
    E: DfRecordBatchEncoder,
{
    // Concurrent multipart writer for the target object.
    let writer = store
        .writer_with(path)
        .concurrent(concurrency)
        .chunk(DEFAULT_WRITE_BUFFER_SIZE.as_bytes() as usize)
        .await
        .with_context(|_| error::WriteObjectSnafu { path })?
        .into_futures_async_write()
        .compat_write();

    let mut compressed_writer = writer.into_compressed_writer(compression_type);

    let buffer = SharedBuffer::with_capacity(threshold);
    let mut encoder = encoder_factory(buffer.clone());

    let mut rows = 0;

    while let Some(batch) = stream.next().await {
        let batch = batch.context(error::ReadRecordBatchSnafu)?;

        encoder.write(&batch)?;
        rows += batch.num_rows();

        // Drain the shared buffer in `threshold`-sized chunks, keeping the
        // mutex guard scope free of any `await`.
        loop {
            let chunk = {
                let mut buffer_guard = buffer.buffer.lock().unwrap();
                if buffer_guard.len() < threshold {
                    break;
                }
                buffer_guard.split_to(threshold)
            };
            write_to_compressed_writer(&mut compressed_writer, &chunk).await?;
        }
    }

    // Flush whatever remains once the input stream is exhausted.
    if rows != 0 {
        let final_data = {
            let mut buffer_guard = buffer.buffer.lock().unwrap();
            buffer_guard.split()
        };
        write_to_compressed_writer(&mut compressed_writer, &final_data).await?;
    }

    // Finish the compression stream and complete the multipart upload.
    compressed_writer.shutdown().await?;

    Ok(rows)
}
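
// Usage sketch for `stream_to_file`, with hypothetical values. `make_encoder`
// stands in for a factory producing a concrete `DfRecordBatchEncoder` over
// the shared buffer.
//
//     let rows = stream_to_file(
//         batch_stream,
//         store.clone(),
//         "out/result.csv.gz",
//         8 * 1024 * 1024, // flush threshold, in bytes
//         4,               // multipart upload concurrency
//         CompressionType::Gzip,
//         make_encoder,
//     )
//     .await?;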
/// Builds a [`DfSendableRecordBatchStream`] that reads `filename` from
/// `store` through the given DataFusion `file_source`, applying the optional
/// column `projection` and decompressing according to `compression_type`.
pub async fn file_to_stream(
    store: &ObjectStore,
    filename: &str,
    file_source: Arc<dyn FileSource>,
    projection: Option<Vec<usize>>,
    compression_type: CompressionType,
) -> Result<DfSendableRecordBatchStream> {
    let df_compression: DfCompressionType = compression_type.into();
    let config =
        FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), file_source.clone())
            .with_file_group(FileGroup::new(vec![PartitionedFile::new(
                filename.to_string(),
                0,
            )]))
            .with_projection_indices(projection)
            .with_file_compression_type(df_compression)
            .build();

    let store = Arc::new(OpendalStore::new(store.clone()));
    let file_opener = file_source
        .with_projection(&config)
        .create_file_opener(store, &config, 0);
    let stream = FileStream::new(&config, 0, file_opener, &ExecutionPlanMetricsSet::new())
        .context(error::BuildFileStreamSnafu)?;

    Ok(Box::pin(stream))
}
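
// Usage sketch for `file_to_stream`, assuming `csv_source` is an
// `Arc<dyn FileSource>` (e.g. DataFusion's CSV source) and the path is
// hypothetical.
//
//     let stream = file_to_stream(
//         &store,
//         "data/input.csv",
//         csv_source,
//         Some(vec![0, 2]), // project columns 0 and 2
//         CompressionType::Uncompressed,
//     )
//     .await?;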