common_datasource/file_format.rs

pub mod csv;
pub mod json;
pub mod orc;
pub mod parquet;
#[cfg(test)]
pub mod tests;

pub const DEFAULT_SCHEMA_INFER_MAX_RECORD: usize = 1000;

use std::collections::HashMap;
use std::result;
use std::sync::Arc;
use std::task::Poll;

use arrow::record_batch::RecordBatch;
use arrow_schema::{ArrowError, Schema as ArrowSchema};
use async_trait::async_trait;
use bytes::{Buf, Bytes};
use datafusion::datasource::physical_plan::FileOpenFuture;
use datafusion::error::{DataFusionError, Result as DataFusionResult};
use datafusion::physical_plan::SendableRecordBatchStream;
use futures::StreamExt;
use object_store::ObjectStore;
use snafu::ResultExt;
use tokio_util::compat::FuturesAsyncWriteCompatExt;

use self::csv::CsvFormat;
use self::json::JsonFormat;
use self::orc::OrcFormat;
use self::parquet::ParquetFormat;
use crate::buffered_writer::{DfRecordBatchEncoder, LazyBufferedWriter};
use crate::compression::CompressionType;
use crate::error::{self, Result};
use crate::share_buffer::SharedBuffer;
use crate::DEFAULT_WRITE_BUFFER_SIZE;

pub const FORMAT_COMPRESSION_TYPE: &str = "compression_type";
pub const FORMAT_DELIMITER: &str = "delimiter";
pub const FORMAT_SCHEMA_INFER_MAX_RECORD: &str = "schema_infer_max_record";
pub const FORMAT_HAS_HEADER: &str = "has_header";
pub const FORMAT_TYPE: &str = "format";
pub const FILE_PATTERN: &str = "pattern";

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Format {
    Csv(CsvFormat),
    Json(JsonFormat),
    Parquet(ParquetFormat),
    Orc(OrcFormat),
}

impl Format {
    pub fn suffix(&self) -> &'static str {
        match self {
            Format::Csv(_) => ".csv",
            Format::Json(_) => ".json",
            Format::Parquet(_) => ".parquet",
            Format::Orc(_) => ".orc",
        }
    }
}

impl TryFrom<&HashMap<String, String>> for Format {
    type Error = error::Error;

    fn try_from(options: &HashMap<String, String>) -> Result<Self> {
        // The format name is case-insensitive and defaults to Parquet when absent.
        let format = options
            .get(FORMAT_TYPE)
            .map(|format| format.to_ascii_uppercase())
            .unwrap_or_else(|| "PARQUET".to_string());

        match format.as_str() {
            "CSV" => Ok(Self::Csv(CsvFormat::try_from(options)?)),
            "JSON" => Ok(Self::Json(JsonFormat::try_from(options)?)),
            "PARQUET" => Ok(Self::Parquet(ParquetFormat::default())),
            "ORC" => Ok(Self::Orc(OrcFormat)),
            _ => error::UnsupportedFormatSnafu { format: &format }.fail(),
        }
    }
}
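
// A minimal usage sketch, assuming a user-provided key/value option map; the keys
// are the constants defined above, the values here are hypothetical.
#[allow(dead_code)]
fn format_from_options_example() -> Result<Format> {
    let options: HashMap<String, String> = [
        (FORMAT_TYPE.to_string(), "csv".to_string()),
        (FORMAT_HAS_HEADER.to_string(), "true".to_string()),
    ]
    .into_iter()
    .collect();
    // Falls back to `Format::Parquet(ParquetFormat::default())` when `format` is absent.
    Format::try_from(&options)
}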

#[async_trait]
pub trait FileFormat: Send + Sync + std::fmt::Debug {
    async fn infer_schema(&self, store: &ObjectStore, path: &str) -> Result<ArrowSchema>;
}

pub trait ArrowDecoder: Send + 'static {
    /// Decodes records from `buf`, returning the number of bytes consumed.
    ///
    /// Returns `Ok(0)` once a full batch has been buffered or `buf` is exhausted;
    /// any remaining bytes should be included in the next call to [`Self::decode`].
    fn decode(&mut self, buf: &[u8]) -> result::Result<usize, ArrowError>;

    /// Flushes the currently buffered data into a [`RecordBatch`], returning `None`
    /// if there is nothing buffered.
    fn flush(&mut self) -> result::Result<Option<RecordBatch>, ArrowError>;
}

impl ArrowDecoder for arrow::csv::reader::Decoder {
    fn decode(&mut self, buf: &[u8]) -> result::Result<usize, ArrowError> {
        self.decode(buf)
    }

    fn flush(&mut self) -> result::Result<Option<RecordBatch>, ArrowError> {
        self.flush()
    }
}

impl ArrowDecoder for arrow::json::reader::Decoder {
    fn decode(&mut self, buf: &[u8]) -> result::Result<usize, ArrowError> {
        self.decode(buf)
    }

    fn flush(&mut self) -> result::Result<Option<RecordBatch>, ArrowError> {
        self.flush()
    }
}

/// Opens the file at `path` and decodes it into a stream of record batches using
/// the decoder produced by `decoder_factory`.
pub fn open_with_decoder<T: ArrowDecoder, F: Fn() -> DataFusionResult<T>>(
    object_store: Arc<ObjectStore>,
    path: String,
    compression_type: CompressionType,
    decoder_factory: F,
) -> DataFusionResult<FileOpenFuture> {
    let mut decoder = decoder_factory()?;
    Ok(Box::pin(async move {
        let reader = object_store
            .reader(&path)
            .await
            .map_err(|e| DataFusionError::External(Box::new(e)))?
            .into_bytes_stream(..)
            .await
            .map_err(|e| DataFusionError::External(Box::new(e)))?;

        let mut upstream = compression_type.convert_stream(reader).fuse();

        let mut buffered = Bytes::new();

        let stream = futures::stream::poll_fn(move |cx| {
            loop {
                // Refill the local buffer from the (possibly decompressed) byte stream.
                if buffered.is_empty() {
                    if let Some(result) = futures::ready!(upstream.poll_next_unpin(cx)) {
                        buffered = result?;
                    };
                }

                let decoded = decoder.decode(buffered.as_ref())?;

                // `0` means the decoder either has a full batch ready or needs more input.
                if decoded == 0 {
                    break;
                } else {
                    buffered.advance(decoded);
                }
            }

            Poll::Ready(decoder.flush().transpose())
        });

        Ok(stream.boxed())
    }))
}
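
// A minimal sketch of plugging a JSON decoder factory into `open_with_decoder`,
// assuming the object store, file path, inferred schema, and compression type are
// already known; the batch size of 8192 is an arbitrary choice.
#[allow(dead_code)]
fn open_json_example(
    object_store: Arc<ObjectStore>,
    path: String,
    schema: Arc<ArrowSchema>,
    compression_type: CompressionType,
) -> DataFusionResult<FileOpenFuture> {
    open_with_decoder(object_store, path, compression_type, move || {
        arrow::json::reader::ReaderBuilder::new(schema.clone())
            .with_batch_size(8192)
            .build_decoder()
            .map_err(DataFusionError::from)
    })
}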

pub async fn infer_schemas(
    store: &ObjectStore,
    files: &[String],
    file_format: &dyn FileFormat,
) -> Result<ArrowSchema> {
    let mut schemas = Vec::with_capacity(files.len());
    for file in files {
        schemas.push(file_format.infer_schema(store, file).await?)
    }
    ArrowSchema::try_merge(schemas).context(error::MergeSchemaSnafu)
}
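
// A minimal sketch: inferring one merged schema across several files of the same
// format; the paths are hypothetical and `format` is any `FileFormat` implementation
// from the submodules above (CSV, JSON, ORC, Parquet).
#[allow(dead_code)]
async fn infer_schemas_example(
    store: &ObjectStore,
    format: &dyn FileFormat,
) -> Result<ArrowSchema> {
    let files = vec!["data/part-0.csv".to_string(), "data/part-1.csv".to_string()];
    infer_schemas(store, &files, format).await
}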

pub async fn stream_to_file<T: DfRecordBatchEncoder, U: Fn(SharedBuffer) -> T>(
    mut stream: SendableRecordBatchStream,
    store: ObjectStore,
    path: &str,
    threshold: usize,
    concurrency: usize,
    encoder_factory: U,
) -> Result<usize> {
    let buffer = SharedBuffer::with_capacity(threshold);
    let encoder = encoder_factory(buffer.clone());
    // `LazyBufferedWriter` buffers encoded rows and creates the underlying
    // object-store writer lazily via the provided factory closure.
    let mut writer = LazyBufferedWriter::new(threshold, buffer, encoder, path, |path| async {
        store
            .writer_with(&path)
            .concurrent(concurrency)
            .chunk(DEFAULT_WRITE_BUFFER_SIZE.as_bytes() as usize)
            .await
            .map(|v| v.into_futures_async_write().compat_write())
            .context(error::WriteObjectSnafu { path })
    });

    let mut rows = 0;

    while let Some(batch) = stream.next().await {
        let batch = batch.context(error::ReadRecordBatchSnafu)?;
        writer.write(&batch).await?;
        rows += batch.num_rows();
    }
    writer.close_inner_writer().await?;
    Ok(rows)
}
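
// A minimal sketch of driving `stream_to_file`, assuming `make_encoder` wraps the
// shared buffer in some `DfRecordBatchEncoder` defined elsewhere in this crate
// (e.g. a CSV or JSON writer); the path, threshold, and concurrency are hypothetical.
#[allow(dead_code)]
async fn stream_to_file_example<E: DfRecordBatchEncoder>(
    stream: SendableRecordBatchStream,
    store: ObjectStore,
    make_encoder: impl Fn(SharedBuffer) -> E,
) -> Result<usize> {
    // Flush roughly every 8 MiB of encoded data, with up to 4 concurrent uploads.
    stream_to_file(stream, store, "output/part-0.csv", 8 * 1024 * 1024, 4, make_encoder).await
}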