common_datasource/file_format.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

pub mod csv;
pub mod json;
pub mod orc;
pub mod parquet;
#[cfg(test)]
pub mod tests;

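/// Default maximum number of records to scan when inferring a file's schema.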
pub const DEFAULT_SCHEMA_INFER_MAX_RECORD: usize = 1000;

use std::collections::HashMap;
use std::result;
use std::sync::Arc;
use std::task::Poll;

use arrow::record_batch::RecordBatch;
use arrow_schema::{ArrowError, Schema as ArrowSchema};
use async_trait::async_trait;
use bytes::{Buf, Bytes};
use datafusion::datasource::physical_plan::FileOpenFuture;
use datafusion::error::{DataFusionError, Result as DataFusionResult};
use datafusion::physical_plan::SendableRecordBatchStream;
use futures::StreamExt;
use object_store::ObjectStore;
use snafu::ResultExt;
use tokio_util::compat::FuturesAsyncWriteCompatExt;

use self::csv::CsvFormat;
use self::json::JsonFormat;
use self::orc::OrcFormat;
use self::parquet::ParquetFormat;
use crate::buffered_writer::{DfRecordBatchEncoder, LazyBufferedWriter};
use crate::compression::CompressionType;
use crate::error::{self, Result};
use crate::share_buffer::SharedBuffer;
use crate::DEFAULT_WRITE_BUFFER_SIZE;

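// Keys looked up in the user-supplied options map by [`Format::try_from`] and the
// per-format parsers (e.g. `CsvFormat::try_from`).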
pub const FORMAT_COMPRESSION_TYPE: &str = "compression_type";
pub const FORMAT_DELIMITER: &str = "delimiter";
pub const FORMAT_SCHEMA_INFER_MAX_RECORD: &str = "schema_infer_max_record";
pub const FORMAT_HAS_HEADER: &str = "has_header";
pub const FORMAT_TYPE: &str = "format";
pub const FILE_PATTERN: &str = "pattern";

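/// Supported file formats, each carrying its format-specific options.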
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Format {
    Csv(CsvFormat),
    Json(JsonFormat),
    Parquet(ParquetFormat),
    Orc(OrcFormat),
}

impl Format {
    pub fn suffix(&self) -> &'static str {
        match self {
            Format::Csv(_) => ".csv",
            Format::Json(_) => ".json",
            Format::Parquet(_) => ".parquet",
            Format::Orc(_) => ".orc",
        }
    }
}

impl TryFrom<&HashMap<String, String>> for Format {
    type Error = error::Error;

    fn try_from(options: &HashMap<String, String>) -> Result<Self> {
        let format = options
            .get(FORMAT_TYPE)
            .map(|format| format.to_ascii_uppercase())
            .unwrap_or_else(|| "PARQUET".to_string());

        match format.as_str() {
            "CSV" => Ok(Self::Csv(CsvFormat::try_from(options)?)),
            "JSON" => Ok(Self::Json(JsonFormat::try_from(options)?)),
            "PARQUET" => Ok(Self::Parquet(ParquetFormat::default())),
            "ORC" => Ok(Self::Orc(OrcFormat)),
            _ => error::UnsupportedFormatSnafu { format: &format }.fail(),
        }
    }
}
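
// A minimal usage sketch (hypothetical values): only `FORMAT_TYPE` is interpreted above;
// the remaining keys are forwarded to the per-format parsers.
//
//     let options = HashMap::from([(FORMAT_TYPE.to_string(), "csv".to_string())]);
//     assert!(matches!(Format::try_from(&options), Ok(Format::Csv(_))));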
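/// A file format that can infer an Arrow schema from a file stored in an [`ObjectStore`].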
#[async_trait]
pub trait FileFormat: Send + Sync + std::fmt::Debug {
    async fn infer_schema(&self, store: &ObjectStore, path: &str) -> Result<ArrowSchema>;
}

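/// A push-based decoder that turns raw bytes into [`RecordBatch`]es, abstracting over the
/// Arrow CSV and JSON decoders so both can share the streaming read path in
/// [`open_with_decoder`].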
pub trait ArrowDecoder: Send + 'static {
    /// Decodes records from `buf`, returning the number of bytes read.
    ///
    /// This method returns `Ok(0)` once `batch_size` records have been parsed since the
    /// last call to [`Self::flush`], or once `buf` is exhausted.
    ///
    /// Any remaining bytes should be included in the next call to [`Self::decode`].
    fn decode(&mut self, buf: &[u8]) -> result::Result<usize, ArrowError>;

    /// Flushes the currently buffered data to a [`RecordBatch`].
    ///
    /// This should only be called after [`Self::decode`] has returned `Ok(0)`;
    /// otherwise it may return an error if called part way through decoding a record.
    ///
    /// Returns `Ok(None)` if there is no buffered data.
    fn flush(&mut self) -> result::Result<Option<RecordBatch>, ArrowError>;
}

impl ArrowDecoder for arrow::csv::reader::Decoder {
    fn decode(&mut self, buf: &[u8]) -> result::Result<usize, ArrowError> {
        self.decode(buf)
    }

    fn flush(&mut self) -> result::Result<Option<RecordBatch>, ArrowError> {
        self.flush()
    }
}

impl ArrowDecoder for arrow::json::reader::Decoder {
    fn decode(&mut self, buf: &[u8]) -> result::Result<usize, ArrowError> {
        self.decode(buf)
    }

    fn flush(&mut self) -> result::Result<Option<RecordBatch>, ArrowError> {
        self.flush()
    }
}
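/// Builds a [`FileOpenFuture`] that reads the object at `path`, decompresses it with
/// `compression_type`, and incrementally decodes [`RecordBatch`]es using the decoder
/// produced by `decoder_factory`.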
pub fn open_with_decoder<T: ArrowDecoder, F: Fn() -> DataFusionResult<T>>(
    object_store: Arc<ObjectStore>,
    path: String,
    compression_type: CompressionType,
    decoder_factory: F,
) -> DataFusionResult<FileOpenFuture> {
    let mut decoder = decoder_factory()?;
    Ok(Box::pin(async move {
        // Open the object and expose it as a stream of bytes.
        let reader = object_store
            .reader(&path)
            .await
            .map_err(|e| DataFusionError::External(Box::new(e)))?
            .into_bytes_stream(..)
            .await
            .map_err(|e| DataFusionError::External(Box::new(e)))?;

        // Decompress the byte stream according to the configured compression type.
        let mut upstream = compression_type.convert_stream(reader).fuse();

        let mut buffered = Bytes::new();

        let stream = futures::stream::poll_fn(move |cx| {
            loop {
                // Refill the buffer from upstream whenever it runs empty.
                if buffered.is_empty() {
                    if let Some(result) = futures::ready!(upstream.poll_next_unpin(cx)) {
                        buffered = result?;
                    };
                }

                // Feed the buffered bytes to the decoder; `Ok(0)` means either a full
                // batch is ready or the input is exhausted.
                let decoded = decoder.decode(buffered.as_ref())?;

                if decoded == 0 {
                    break;
                } else {
                    buffered.advance(decoded);
                }
            }

            // Emit the decoded batch, or end the stream once nothing is buffered.
            Poll::Ready(decoder.flush().transpose())
        });

        Ok(stream.boxed())
    }))
}
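/// Infers a schema for each of `files` and merges them into a single [`ArrowSchema`].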
pub async fn infer_schemas(
    store: &ObjectStore,
    files: &[String],
    file_format: &dyn FileFormat,
) -> Result<ArrowSchema> {
    let mut schemas = Vec::with_capacity(files.len());
    for file in files {
        schemas.push(file_format.infer_schema(store, file).await?)
    }
    ArrowSchema::try_merge(schemas).context(error::MergeSchemaSnafu)
}
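/// Writes every batch from `stream` to the object at `path` in `store`, encoding with
/// the encoder produced by `encoder_factory` and using `threshold` as the in-memory
/// buffer size in bytes. Returns the number of rows written.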
pub async fn stream_to_file<T: DfRecordBatchEncoder, U: Fn(SharedBuffer) -> T>(
    mut stream: SendableRecordBatchStream,
    store: ObjectStore,
    path: &str,
    threshold: usize,
    concurrency: usize,
    encoder_factory: U,
) -> Result<usize> {
    let buffer = SharedBuffer::with_capacity(threshold);
    let encoder = encoder_factory(buffer.clone());
    // The underlying object-store writer is created lazily by the provided factory
    // when it is first needed.
    let mut writer = LazyBufferedWriter::new(threshold, buffer, encoder, path, |path| async {
        store
            .writer_with(&path)
            .concurrent(concurrency)
            .chunk(DEFAULT_WRITE_BUFFER_SIZE.as_bytes() as usize)
            .await
            .map(|v| v.into_futures_async_write().compat_write())
            .context(error::WriteObjectSnafu { path })
    });

    let mut rows = 0;

    // Encode every batch from the stream and count the rows written.
    while let Some(batch) = stream.next().await {
        let batch = batch.context(error::ReadRecordBatchSnafu)?;
        writer.write(&batch).await?;
        rows += batch.num_rows();
    }
    writer.close_inner_writer().await?;
    Ok(rows)
}