common_datasource/file_format.rs

pub mod csv;
pub mod json;
pub mod orc;
pub mod parquet;
#[cfg(test)]
pub mod tests;

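/// The default maximum number of records to read when inferring a file's schema.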
pub const DEFAULT_SCHEMA_INFER_MAX_RECORD: usize = 1000;

use std::collections::HashMap;
use std::result;
use std::sync::Arc;
use std::task::Poll;

use arrow::record_batch::RecordBatch;
use arrow_schema::{ArrowError, Schema as ArrowSchema};
use async_trait::async_trait;
use bytes::{Buf, Bytes};
use datafusion::datasource::physical_plan::FileOpenFuture;
use datafusion::error::{DataFusionError, Result as DataFusionResult};
use datafusion::physical_plan::SendableRecordBatchStream;
use futures::StreamExt;
use object_store::ObjectStore;
use snafu::ResultExt;
use tokio_util::compat::FuturesAsyncWriteCompatExt;

use self::csv::CsvFormat;
use self::json::JsonFormat;
use self::orc::OrcFormat;
use self::parquet::ParquetFormat;
use crate::DEFAULT_WRITE_BUFFER_SIZE;
use crate::buffered_writer::{DfRecordBatchEncoder, LazyBufferedWriter};
use crate::compression::CompressionType;
use crate::error::{self, Result};
use crate::share_buffer::SharedBuffer;

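// Keys recognized in the user-supplied options map when building a `Format`
// (see `Format::try_from` and the per-format `try_from` implementations).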
pub const FORMAT_COMPRESSION_TYPE: &str = "compression_type";
pub const FORMAT_DELIMITER: &str = "delimiter";
pub const FORMAT_SCHEMA_INFER_MAX_RECORD: &str = "schema_infer_max_record";
pub const FORMAT_HAS_HEADER: &str = "has_header";
pub const FORMAT_TYPE: &str = "format";
pub const FILE_PATTERN: &str = "pattern";
pub const TIMESTAMP_FORMAT: &str = "timestamp_format";
pub const TIME_FORMAT: &str = "time_format";
pub const DATE_FORMAT: &str = "date_format";

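/// A supported file format, wrapping its format-specific settings.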
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Format {
    Csv(CsvFormat),
    Json(JsonFormat),
    Parquet(ParquetFormat),
    Orc(OrcFormat),
}

impl Format {
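    /// Returns the canonical file extension for this format, e.g. ".csv".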
    pub fn suffix(&self) -> &'static str {
        match self {
            Format::Csv(_) => ".csv",
            Format::Json(_) => ".json",
            Format::Parquet(_) => ".parquet",
            Format::Orc(_) => ".orc",
        }
    }
}

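// A `Format` is parsed from a user-supplied options map: the `format` key is
// matched case-insensitively and defaults to Parquet when absent. Illustrative
// usage sketch (not taken from this file):
//
//     let mut options = HashMap::new();
//     options.insert(FORMAT_TYPE.to_string(), "csv".to_string());
//     let format = Format::try_from(&options)?; // Format::Csv(..)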
impl TryFrom<&HashMap<String, String>> for Format {
    type Error = error::Error;

    fn try_from(options: &HashMap<String, String>) -> Result<Self> {
        let format = options
            .get(FORMAT_TYPE)
            .map(|format| format.to_ascii_uppercase())
            .unwrap_or_else(|| "PARQUET".to_string());

        match format.as_str() {
            "CSV" => Ok(Self::Csv(CsvFormat::try_from(options)?)),
            "JSON" => Ok(Self::Json(JsonFormat::try_from(options)?)),
            "PARQUET" => Ok(Self::Parquet(ParquetFormat::default())),
            "ORC" => Ok(Self::Orc(OrcFormat)),
            _ => error::UnsupportedFormatSnafu { format: &format }.fail(),
        }
    }
}

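/// A format that can infer the Arrow schema of a file stored in an `ObjectStore`.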
#[async_trait]
pub trait FileFormat: Send + Sync + std::fmt::Debug {
    async fn infer_schema(&self, store: &ObjectStore, path: &str) -> Result<ArrowSchema>;
}

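/// A push-based decoder that turns raw bytes into Arrow [`RecordBatch`]es,
/// abstracting over `arrow`'s CSV and JSON decoders.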
pub trait ArrowDecoder: Send + 'static {
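    /// Decodes records from `buf`, returning the number of bytes consumed.
    ///
    /// Returns `Ok(0)` when the decoder cannot make further progress on the
    /// given input (e.g. the buffer is exhausted or a full batch has been
    /// buffered); remaining bytes should be passed to the next call.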
    fn decode(&mut self, buf: &[u8]) -> result::Result<usize, ArrowError>;

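    /// Flushes any buffered rows into a [`RecordBatch`], returning `Ok(None)`
    /// when there is no buffered data.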
    fn flush(&mut self) -> result::Result<Option<RecordBatch>, ArrowError>;
}

impl ArrowDecoder for arrow::csv::reader::Decoder {
    fn decode(&mut self, buf: &[u8]) -> result::Result<usize, ArrowError> {
        self.decode(buf)
    }

    fn flush(&mut self) -> result::Result<Option<RecordBatch>, ArrowError> {
        self.flush()
    }
}

impl ArrowDecoder for arrow::json::reader::Decoder {
    fn decode(&mut self, buf: &[u8]) -> result::Result<usize, ArrowError> {
        self.decode(buf)
    }

    fn flush(&mut self) -> result::Result<Option<RecordBatch>, ArrowError> {
        self.flush()
    }
}

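/// Builds a [`FileOpenFuture`] that streams `path` from `object_store`,
/// decompresses it according to `compression_type`, and incrementally decodes
/// the bytes into [`RecordBatch`]es with the decoder produced by
/// `decoder_factory`.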
pub fn open_with_decoder<T: ArrowDecoder, F: Fn() -> DataFusionResult<T>>(
    object_store: Arc<ObjectStore>,
    path: String,
    compression_type: CompressionType,
    decoder_factory: F,
) -> DataFusionResult<FileOpenFuture> {
    let mut decoder = decoder_factory()?;
    Ok(Box::pin(async move {
        let reader = object_store
            .reader(&path)
            .await
            .map_err(|e| DataFusionError::External(Box::new(e)))?
            .into_bytes_stream(..)
            .await
            .map_err(|e| DataFusionError::External(Box::new(e)))?;

        let mut upstream = compression_type.convert_stream(reader).fuse();

        let mut buffered = Bytes::new();

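        // Poll loop: feed the currently buffered bytes to the decoder; once the
        // buffer is drained, pull the next (decompressed) chunk from upstream.
        // When the decoder stops consuming bytes, flush whatever it has
        // accumulated as the next `RecordBatch`.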
        let stream = futures::stream::poll_fn(move |cx| {
            loop {
                if buffered.is_empty()
                    && let Some(result) = futures::ready!(upstream.poll_next_unpin(cx))
                {
                    buffered = result?;
                }

                let decoded = decoder.decode(buffered.as_ref())?;

                if decoded == 0 {
                    break;
                } else {
                    buffered.advance(decoded);
                }
            }

            Poll::Ready(decoder.flush().transpose())
        });

        Ok(stream.boxed())
    }))
}

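/// Infers the schema of every file in `files` with `file_format` and merges
/// the results into a single [`ArrowSchema`].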
pub async fn infer_schemas(
    store: &ObjectStore,
    files: &[String],
    file_format: &dyn FileFormat,
) -> Result<ArrowSchema> {
    let mut schemas = Vec::with_capacity(files.len());
    for file in files {
        schemas.push(file_format.infer_schema(store, file).await?);
    }
    ArrowSchema::try_merge(schemas).context(error::MergeSchemaSnafu)
}

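/// Drains `stream`, encodes each `RecordBatch` with the encoder produced by
/// `encoder_factory`, and writes the encoded bytes to `path` in `store`
/// through a lazily-initialized buffered writer governed by `threshold` and
/// `concurrency`. Returns the total number of rows written.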
pub async fn stream_to_file<T: DfRecordBatchEncoder, U: Fn(SharedBuffer) -> T>(
    mut stream: SendableRecordBatchStream,
    store: ObjectStore,
    path: &str,
    threshold: usize,
    concurrency: usize,
    encoder_factory: U,
) -> Result<usize> {
    let buffer = SharedBuffer::with_capacity(threshold);
    let encoder = encoder_factory(buffer.clone());
    let mut writer = LazyBufferedWriter::new(threshold, buffer, encoder, path, |path| async {
        store
            .writer_with(&path)
            .concurrent(concurrency)
            .chunk(DEFAULT_WRITE_BUFFER_SIZE.as_bytes() as usize)
            .await
            .map(|v| v.into_futures_async_write().compat_write())
            .context(error::WriteObjectSnafu { path })
    });

    let mut rows = 0;

    while let Some(batch) = stream.next().await {
        let batch = batch.context(error::ReadRecordBatchSnafu)?;
        writer.write(&batch).await?;
        rows += batch.num_rows();
    }
    writer.close_inner_writer().await?;
    Ok(rows)
}