common_datasource/file_format.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

pub mod csv;
pub mod json;
pub mod orc;
pub mod parquet;
#[cfg(test)]
pub mod tests;

pub const DEFAULT_SCHEMA_INFER_MAX_RECORD: usize = 1000;

use std::collections::HashMap;
use std::result;
use std::sync::Arc;
use std::task::Poll;

use arrow::record_batch::RecordBatch;
use arrow_schema::{ArrowError, Schema as ArrowSchema};
use async_trait::async_trait;
use bytes::{Buf, Bytes};
use common_recordbatch::DfSendableRecordBatchStream;
use datafusion::datasource::file_format::file_compression_type::FileCompressionType as DfCompressionType;
use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::object_store::ObjectStoreUrl;
use datafusion::datasource::physical_plan::{
    FileGroup, FileOpenFuture, FileScanConfigBuilder, FileSource, FileStream,
};
use datafusion::error::{DataFusionError, Result as DataFusionResult};
use datafusion::physical_plan::SendableRecordBatchStream;
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
use futures::{StreamExt, TryStreamExt};
use object_store::ObjectStore;
use object_store_opendal::OpendalStore;
use snafu::ResultExt;
use tokio::io::AsyncWriteExt;
use tokio_util::compat::FuturesAsyncWriteCompatExt;

use self::csv::CsvFormat;
use self::json::JsonFormat;
use self::orc::OrcFormat;
use self::parquet::ParquetFormat;
use crate::DEFAULT_WRITE_BUFFER_SIZE;
use crate::buffered_writer::DfRecordBatchEncoder;
use crate::compressed_writer::{CompressedWriter, IntoCompressedWriter};
use crate::compression::CompressionType;
use crate::error::{self, Result};
use crate::share_buffer::SharedBuffer;

pub const FORMAT_COMPRESSION_TYPE: &str = "compression_type";
pub const FORMAT_DELIMITER: &str = "delimiter";
pub const FORMAT_SCHEMA_INFER_MAX_RECORD: &str = "schema_infer_max_record";
pub const FORMAT_HAS_HEADER: &str = "has_header";
pub const FORMAT_TYPE: &str = "format";
pub const FILE_PATTERN: &str = "pattern";
pub const TIMESTAMP_FORMAT: &str = "timestamp_format";
pub const TIME_FORMAT: &str = "time_format";
pub const DATE_FORMAT: &str = "date_format";

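/// File formats supported by the datasource, typically selected through the
/// `format` option (see [FORMAT_TYPE]).
///
/// A minimal sketch of building a [Format] from user-supplied options; the
/// option values below are illustrative only:
///
/// ```ignore
/// use std::collections::HashMap;
///
/// let options = HashMap::from([
///     ("format".to_string(), "csv".to_string()),
///     ("has_header".to_string(), "true".to_string()),
/// ]);
/// let format = Format::try_from(&options)?;
/// assert_eq!(format.suffix(), ".csv");
/// ```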
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Format {
    Csv(CsvFormat),
    Json(JsonFormat),
    Parquet(ParquetFormat),
    Orc(OrcFormat),
}

impl Format {
    pub fn suffix(&self) -> &'static str {
        match self {
            Format::Csv(_) => ".csv",
            Format::Json(_) => ".json",
            Format::Parquet(_) => ".parquet",
            Format::Orc(_) => ".orc",
        }
    }
}

impl TryFrom<&HashMap<String, String>> for Format {
    type Error = error::Error;

    fn try_from(options: &HashMap<String, String>) -> Result<Self> {
        let format = options
            .get(FORMAT_TYPE)
            .map(|format| format.to_ascii_uppercase())
            .unwrap_or_else(|| "PARQUET".to_string());

        match format.as_str() {
            "CSV" => Ok(Self::Csv(CsvFormat::try_from(options)?)),
            "JSON" => Ok(Self::Json(JsonFormat::try_from(options)?)),
            "PARQUET" => Ok(Self::Parquet(ParquetFormat::default())),
            "ORC" => Ok(Self::Orc(OrcFormat)),
            _ => error::UnsupportedFormatSnafu { format: &format }.fail(),
        }
    }
}

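/// A file format that can infer an [ArrowSchema] from a file in an [ObjectStore].
///
/// A hedged usage sketch; `store` and the path are placeholders:
///
/// ```ignore
/// let format = ParquetFormat::default();
/// let schema = format.infer_schema(&store, "path/to/file.parquet").await?;
/// ```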
#[async_trait]
pub trait FileFormat: Send + Sync + std::fmt::Debug {
    async fn infer_schema(&self, store: &ObjectStore, path: &str) -> Result<ArrowSchema>;
}

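/// A streaming decoder that turns raw bytes into Arrow [RecordBatch]es.
///
/// Implementations are driven by repeatedly feeding bytes to [`Self::decode`]
/// and calling [`Self::flush`] once it returns `Ok(0)`. A simplified sketch of
/// that driving loop (`decoder` and `chunk` are placeholders; the real loop
/// lives in [open_with_decoder]):
///
/// ```ignore
/// let mut consumed = 0;
/// loop {
///     let read = decoder.decode(&chunk[consumed..])?;
///     if read == 0 {
///         break;
///     }
///     consumed += read;
/// }
/// if let Some(batch) = decoder.flush()? {
///     // `batch` holds the rows decoded so far.
/// }
/// ```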
pub trait ArrowDecoder: Send + 'static {
    /// Decodes records from `buf`, returning the number of bytes read.
    ///
    /// This method returns `Ok(0)` once `batch_size` records have been parsed since the
    /// last call to [`Self::flush`], or once `buf` is exhausted.
    ///
    /// Any remaining bytes should be included in the next call to [`Self::decode`].
    fn decode(&mut self, buf: &[u8]) -> result::Result<usize, ArrowError>;

    /// Flushes the currently buffered data to a [`RecordBatch`].
    ///
    /// This should only be called after [`Self::decode`] has returned `Ok(0)`;
    /// otherwise it may return an error if called partway through decoding a record.
    ///
    /// Returns `Ok(None)` if there is no buffered data.
    fn flush(&mut self) -> result::Result<Option<RecordBatch>, ArrowError>;
}

impl ArrowDecoder for arrow::csv::reader::Decoder {
    fn decode(&mut self, buf: &[u8]) -> result::Result<usize, ArrowError> {
        self.decode(buf)
    }

    fn flush(&mut self) -> result::Result<Option<RecordBatch>, ArrowError> {
        self.flush()
    }
}

impl ArrowDecoder for arrow::json::reader::Decoder {
    fn decode(&mut self, buf: &[u8]) -> result::Result<usize, ArrowError> {
        self.decode(buf)
    }

    fn flush(&mut self) -> result::Result<Option<RecordBatch>, ArrowError> {
        self.flush()
    }
}

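/// Builds a [FileOpenFuture] that reads `path` from `object_store`, decompresses the
/// byte stream according to `compression_type`, and decodes it into [RecordBatch]es
/// with the [ArrowDecoder] produced by `decoder_factory`.
///
/// A hedged sketch of wiring it up with a CSV decoder; the builder calls and values
/// are illustrative:
///
/// ```ignore
/// let open_future = open_with_decoder(object_store, path, CompressionType::Uncompressed, || {
///     arrow::csv::ReaderBuilder::new(schema.clone())
///         .build_decoder()
///         .map_err(|e| DataFusionError::External(Box::new(e)))
/// })?;
/// ```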
pub fn open_with_decoder<T: ArrowDecoder, F: Fn() -> DataFusionResult<T>>(
    object_store: Arc<ObjectStore>,
    path: String,
    compression_type: CompressionType,
    decoder_factory: F,
) -> DataFusionResult<FileOpenFuture> {
    let mut decoder = decoder_factory()?;
    Ok(Box::pin(async move {
        let reader = object_store
            .reader(&path)
            .await
            .map_err(|e| DataFusionError::External(Box::new(e)))?
            .into_bytes_stream(..)
            .await
            .map_err(|e| DataFusionError::External(Box::new(e)))?;

        let mut upstream = compression_type.convert_stream(reader).fuse();

        let mut buffered = Bytes::new();

        let stream = futures::stream::poll_fn(move |cx| {
            loop {
                // Refill the local buffer from the (possibly decompressed) upstream
                // before attempting to decode more records.
                if buffered.is_empty()
                    && let Some(result) = futures::ready!(upstream.poll_next_unpin(cx))
                {
                    buffered = result?;
                }

                let decoded = decoder.decode(buffered.as_ref())?;

                if decoded == 0 {
                    break;
                } else {
                    buffered.advance(decoded);
                }
            }

            // The decoder has either filled a batch or exhausted the input; emit it.
            Poll::Ready(decoder.flush().transpose())
        });

        Ok(stream.map_err(Into::into).boxed())
    }))
}

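/// Infers a single merged [ArrowSchema] across all `files`, using `file_format`
/// to infer each file's schema and [ArrowSchema::try_merge] to combine them.
///
/// A hedged sketch; `store` and the file list are placeholders:
///
/// ```ignore
/// let files = vec!["a.csv".to_string(), "b.csv".to_string()];
/// let schema = infer_schemas(&store, &files, &CsvFormat::default()).await?;
/// ```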
pub async fn infer_schemas(
    store: &ObjectStore,
    files: &[String],
    file_format: &dyn FileFormat,
) -> Result<ArrowSchema> {
    let mut schemas = Vec::with_capacity(files.len());
    for file in files {
        schemas.push(file_format.infer_schema(store, file).await?)
    }
    ArrowSchema::try_merge(schemas).context(error::MergeSchemaSnafu)
}

/// Writes data to a compressed writer if the data is not empty.
///
/// Does nothing if `data` is empty; otherwise writes all data and returns any error.
async fn write_to_compressed_writer(
    compressed_writer: &mut CompressedWriter,
    data: &[u8],
) -> Result<()> {
    if !data.is_empty() {
        compressed_writer
            .write_all(data)
            .await
            .context(error::AsyncWriteSnafu)?;
    }
    Ok(())
}

/// Streams a [SendableRecordBatchStream] to a file with optional compression support.
/// Data is buffered and flushed according to the given `threshold`.
/// Ensures that writer resources are cleanly released and that an empty file is not
/// created if no rows are written.
///
/// Returns the total number of rows successfully written.
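///
/// A hedged usage sketch; `make_encoder` stands in for any closure producing a
/// [DfRecordBatchEncoder], and the path, threshold, and concurrency values are
/// illustrative:
///
/// ```ignore
/// let rows_written = stream_to_file(
///     stream,
///     store,
///     "output/data.csv.gz",
///     8 * 1024 * 1024,       // flush threshold in bytes
///     4,                     // upload concurrency
///     CompressionType::Gzip,
///     |buffer| make_encoder(buffer),
/// )
/// .await?;
/// ```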
pub async fn stream_to_file<E>(
    mut stream: SendableRecordBatchStream,
    store: ObjectStore,
    path: &str,
    threshold: usize,
    concurrency: usize,
    compression_type: CompressionType,
    encoder_factory: impl Fn(SharedBuffer) -> E,
) -> Result<usize>
where
    E: DfRecordBatchEncoder,
{
    // Create the file writer with OpenDAL's built-in buffering
    let writer = store
        .writer_with(path)
        .concurrent(concurrency)
        .chunk(DEFAULT_WRITE_BUFFER_SIZE.as_bytes() as usize)
        .await
        .with_context(|_| error::WriteObjectSnafu { path })?
        .into_futures_async_write()
        .compat_write();

    // Apply compression if needed
    let mut compressed_writer = writer.into_compressed_writer(compression_type);

    // Create a buffer for the encoder
    let buffer = SharedBuffer::with_capacity(threshold);
    let mut encoder = encoder_factory(buffer.clone());

    let mut rows = 0;

    // Process each record batch
    while let Some(batch) = stream.next().await {
        let batch = batch.context(error::ReadRecordBatchSnafu)?;

        // Write batch using the encoder
        encoder.write(&batch)?;
        rows += batch.num_rows();

        // Drain the shared buffer in `threshold`-sized chunks once enough
        // encoded bytes have accumulated.
        loop {
            let chunk = {
                let mut buffer_guard = buffer.buffer.lock().unwrap();
                if buffer_guard.len() < threshold {
                    break;
                }
                buffer_guard.split_to(threshold)
            };
            write_to_compressed_writer(&mut compressed_writer, &chunk).await?;
        }
    }

    // If no rows have been written, simply close the underlying writer
    // without flushing so that no file is actually created.
    if rows != 0 {
        // Final flush of any remaining data
        let final_data = {
            let mut buffer_guard = buffer.buffer.lock().unwrap();
            buffer_guard.split()
        };
        write_to_compressed_writer(&mut compressed_writer, &final_data).await?;
    }

    // Shutdown compression and close writer
    compressed_writer.shutdown().await?;

    Ok(rows)
}

/// Creates a [FileStream] for reading data from a file with optional column projection
/// and compression support.
///
/// Returns a [SendableRecordBatchStream].
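///
/// A hedged usage sketch; the [FileSource] here is assumed to come from one of the
/// format modules (for example a CSV source), and the filename is illustrative:
///
/// ```ignore
/// let stream = file_to_stream(
///     &store,
///     "input/data.csv",
///     file_source,            // Arc<dyn FileSource> built for the chosen format
///     Some(vec![0, 2]),       // project only the first and third columns
///     CompressionType::Uncompressed,
/// )
/// .await?;
/// ```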
pub async fn file_to_stream(
    store: &ObjectStore,
    filename: &str,
    file_source: Arc<dyn FileSource>,
    projection: Option<Vec<usize>>,
    compression_type: CompressionType,
) -> Result<DfSendableRecordBatchStream> {
    let df_compression: DfCompressionType = compression_type.into();
    let config =
        FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), file_source.clone())
            .with_file_group(FileGroup::new(vec![PartitionedFile::new(
                filename.to_string(),
                0,
            )]))
            .with_projection_indices(projection)
            .with_file_compression_type(df_compression)
            .build();

    let store = Arc::new(OpendalStore::new(store.clone()));
    let file_opener = file_source
        .with_projection(&config)
        .create_file_opener(store, &config, 0);
    let stream = FileStream::new(&config, 0, file_opener, &ExecutionPlanMetricsSet::new())
        .context(error::BuildFileStreamSnafu)?;

    Ok(Box::pin(stream))
}