common_datasource/
file_format.rs1pub mod csv;
16pub mod json;
17pub mod orc;
18pub mod parquet;
19#[cfg(test)]
20pub mod tests;
21
/// Default record limit for schema inference.
///
/// NOTE(review): presumably the fallback used when the
/// `schema_infer_max_record` option is absent — confirm against the
/// per-format option parsers, which are not visible from this file.
pub const DEFAULT_SCHEMA_INFER_MAX_RECORD: usize = 1_000;
23
24use std::collections::HashMap;
25use std::result;
26use std::sync::Arc;
27use std::task::Poll;
28
29use arrow::record_batch::RecordBatch;
30use arrow_schema::{ArrowError, Schema as ArrowSchema};
31use async_trait::async_trait;
32use bytes::{Buf, Bytes};
33use common_recordbatch::DfSendableRecordBatchStream;
34use datafusion::datasource::file_format::file_compression_type::FileCompressionType as DfCompressionType;
35use datafusion::datasource::listing::PartitionedFile;
36use datafusion::datasource::object_store::ObjectStoreUrl;
37use datafusion::datasource::physical_plan::{
38 FileGroup, FileOpenFuture, FileScanConfigBuilder, FileSource, FileStream,
39};
40use datafusion::error::{DataFusionError, Result as DataFusionResult};
41use datafusion::physical_plan::SendableRecordBatchStream;
42use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
43use datatypes::arrow::datatypes::SchemaRef;
44use futures::{StreamExt, TryStreamExt};
45use object_store::ObjectStore;
46use object_store_opendal::OpendalStore;
47use snafu::ResultExt;
48use tokio::io::AsyncWriteExt;
49use tokio_util::compat::FuturesAsyncWriteCompatExt;
50
51use self::csv::CsvFormat;
52use self::json::JsonFormat;
53use self::orc::OrcFormat;
54use self::parquet::ParquetFormat;
55use crate::DEFAULT_WRITE_BUFFER_SIZE;
56use crate::buffered_writer::DfRecordBatchEncoder;
57use crate::compressed_writer::{CompressedWriter, IntoCompressedWriter};
58use crate::compression::CompressionType;
59use crate::error::{self, Result};
60use crate::share_buffer::SharedBuffer;
61
// String keys for the `HashMap<String, String>` options map.
//
// Within this file only `FORMAT_TYPE` is read (by `Format::try_from`); the
// remaining keys are presumably consumed by the per-format option parsers —
// confirm there.

/// Key naming the compression type option.
pub const FORMAT_COMPRESSION_TYPE: &str = "compression_type";

/// Key naming the field delimiter option.
pub const FORMAT_DELIMITER: &str = "delimiter";

/// Key naming the schema-inference record limit option.
pub const FORMAT_SCHEMA_INFER_MAX_RECORD: &str = "schema_infer_max_record";

/// Key naming the "has header" option.
pub const FORMAT_HAS_HEADER: &str = "has_header";

/// Key selecting the file format; matched case-insensitively by
/// `Format::try_from`, which falls back to Parquet when the key is absent.
pub const FORMAT_TYPE: &str = "format";

/// Key naming the file pattern option.
pub const FILE_PATTERN: &str = "pattern";

/// Key naming the timestamp format option.
pub const TIMESTAMP_FORMAT: &str = "timestamp_format";

/// Key naming the time format option.
pub const TIME_FORMAT: &str = "time_format";

/// Key naming the date format option.
pub const DATE_FORMAT: &str = "date_format";
71
72#[derive(Debug, Clone, PartialEq, Eq)]
73pub enum Format {
74 Csv(CsvFormat),
75 Json(JsonFormat),
76 Parquet(ParquetFormat),
77 Orc(OrcFormat),
78}
79
80impl Format {
81 pub fn suffix(&self) -> &'static str {
82 match self {
83 Format::Csv(_) => ".csv",
84 Format::Json(_) => ".json",
85 Format::Parquet(_) => ".parquet",
86 &Format::Orc(_) => ".orc",
87 }
88 }
89}
90
91impl TryFrom<&HashMap<String, String>> for Format {
92 type Error = error::Error;
93
94 fn try_from(options: &HashMap<String, String>) -> Result<Self> {
95 let format = options
96 .get(FORMAT_TYPE)
97 .map(|format| format.to_ascii_uppercase())
98 .unwrap_or_else(|| "PARQUET".to_string());
99
100 match format.as_str() {
101 "CSV" => Ok(Self::Csv(CsvFormat::try_from(options)?)),
102 "JSON" => Ok(Self::Json(JsonFormat::try_from(options)?)),
103 "PARQUET" => Ok(Self::Parquet(ParquetFormat::default())),
104 "ORC" => Ok(Self::Orc(OrcFormat)),
105 _ => error::UnsupportedFormatSnafu { format: &format }.fail(),
106 }
107 }
108}
109
110#[async_trait]
111pub trait FileFormat: Send + Sync + std::fmt::Debug {
112 async fn infer_schema(&self, store: &ObjectStore, path: &str) -> Result<ArrowSchema>;
113}
114
115pub trait ArrowDecoder: Send + 'static {
116 fn decode(&mut self, buf: &[u8]) -> result::Result<usize, ArrowError>;
123
124 fn flush(&mut self) -> result::Result<Option<RecordBatch>, ArrowError>;
131}
132
133impl ArrowDecoder for arrow::csv::reader::Decoder {
134 fn decode(&mut self, buf: &[u8]) -> result::Result<usize, ArrowError> {
135 self.decode(buf)
136 }
137
138 fn flush(&mut self) -> result::Result<Option<RecordBatch>, ArrowError> {
139 self.flush()
140 }
141}
142
143impl ArrowDecoder for arrow::json::reader::Decoder {
144 fn decode(&mut self, buf: &[u8]) -> result::Result<usize, ArrowError> {
145 self.decode(buf)
146 }
147
148 fn flush(&mut self) -> result::Result<Option<RecordBatch>, ArrowError> {
149 self.flush()
150 }
151}
152
153pub fn open_with_decoder<T: ArrowDecoder, F: Fn() -> DataFusionResult<T>>(
154 object_store: Arc<ObjectStore>,
155 path: String,
156 compression_type: CompressionType,
157 decoder_factory: F,
158) -> DataFusionResult<FileOpenFuture> {
159 let mut decoder = decoder_factory()?;
160 Ok(Box::pin(async move {
161 let reader = object_store
162 .reader(&path)
163 .await
164 .map_err(|e| DataFusionError::External(Box::new(e)))?
165 .into_bytes_stream(..)
166 .await
167 .map_err(|e| DataFusionError::External(Box::new(e)))?;
168
169 let mut upstream = compression_type.convert_stream(reader).fuse();
170
171 let mut buffered = Bytes::new();
172
173 let stream = futures::stream::poll_fn(move |cx| {
174 loop {
175 if buffered.is_empty()
176 && let Some(result) = futures::ready!(upstream.poll_next_unpin(cx))
177 {
178 buffered = result?;
179 }
180
181 let decoded = decoder.decode(buffered.as_ref())?;
182
183 if decoded == 0 {
184 break;
185 } else {
186 buffered.advance(decoded);
187 }
188 }
189
190 Poll::Ready(decoder.flush().transpose())
191 });
192
193 Ok(stream.map_err(Into::into).boxed())
194 }))
195}
196
197pub async fn infer_schemas(
198 store: &ObjectStore,
199 files: &[String],
200 file_format: &dyn FileFormat,
201) -> Result<ArrowSchema> {
202 let mut schemas = Vec::with_capacity(files.len());
203 for file in files {
204 schemas.push(file_format.infer_schema(store, file).await?)
205 }
206 ArrowSchema::try_merge(schemas).context(error::MergeSchemaSnafu)
207}
208
209async fn write_to_compressed_writer(
213 compressed_writer: &mut CompressedWriter,
214 data: &[u8],
215) -> Result<()> {
216 if !data.is_empty() {
217 compressed_writer
218 .write_all(data)
219 .await
220 .context(error::AsyncWriteSnafu)?;
221 }
222 Ok(())
223}
224
225pub async fn stream_to_file<E>(
232 mut stream: SendableRecordBatchStream,
233 store: ObjectStore,
234 path: &str,
235 threshold: usize,
236 concurrency: usize,
237 compression_type: CompressionType,
238 encoder_factory: impl Fn(SharedBuffer) -> E,
239) -> Result<usize>
240where
241 E: DfRecordBatchEncoder,
242{
243 let writer = store
245 .writer_with(path)
246 .concurrent(concurrency)
247 .chunk(DEFAULT_WRITE_BUFFER_SIZE.as_bytes() as usize)
248 .await
249 .with_context(|_| error::WriteObjectSnafu { path })?
250 .into_futures_async_write()
251 .compat_write();
252
253 let mut compressed_writer = writer.into_compressed_writer(compression_type);
255
256 let buffer = SharedBuffer::with_capacity(threshold);
258 let mut encoder = encoder_factory(buffer.clone());
259
260 let mut rows = 0;
261
262 while let Some(batch) = stream.next().await {
264 let batch = batch.context(error::ReadRecordBatchSnafu)?;
265
266 encoder.write(&batch)?;
268 rows += batch.num_rows();
269
270 loop {
271 let chunk = {
272 let mut buffer_guard = buffer.buffer.lock().unwrap();
273 if buffer_guard.len() < threshold {
274 break;
275 }
276 buffer_guard.split_to(threshold)
277 };
278 write_to_compressed_writer(&mut compressed_writer, &chunk).await?;
279 }
280 }
281
282 if rows != 0 {
285 let final_data = {
287 let mut buffer_guard = buffer.buffer.lock().unwrap();
288 buffer_guard.split()
289 };
290 write_to_compressed_writer(&mut compressed_writer, &final_data).await?;
291 }
292
293 compressed_writer.shutdown().await?;
295
296 Ok(rows)
297}
298
299pub async fn file_to_stream(
304 store: &ObjectStore,
305 filename: &str,
306 file_schema: SchemaRef,
307 file_source: Arc<dyn FileSource>,
308 projection: Option<Vec<usize>>,
309 compression_type: CompressionType,
310) -> Result<DfSendableRecordBatchStream> {
311 let df_compression: DfCompressionType = compression_type.into();
312 let config = FileScanConfigBuilder::new(
313 ObjectStoreUrl::local_filesystem(),
314 file_schema,
315 file_source.clone(),
316 )
317 .with_file_group(FileGroup::new(vec![PartitionedFile::new(
318 filename.to_string(),
319 0,
320 )]))
321 .with_projection(projection)
322 .with_file_compression_type(df_compression)
323 .build();
324
325 let store = Arc::new(OpendalStore::new(store.clone()));
326 let file_opener = file_source
327 .with_projection(&config)
328 .create_file_opener(store, &config, 0);
329 let stream = FileStream::new(&config, 0, file_opener, &ExecutionPlanMetricsSet::new())
330 .context(error::BuildFileStreamSnafu)?;
331
332 Ok(Box::pin(stream))
333}