common_datasource/file_format.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

pub mod csv;
pub mod json;
pub mod orc;
pub mod parquet;
#[cfg(test)]
pub mod tests;

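/// Default upper bound on the number of records sampled when inferring a schema;
/// it can be overridden with the [`FORMAT_SCHEMA_INFER_MAX_RECORD`] option.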
pub const DEFAULT_SCHEMA_INFER_MAX_RECORD: usize = 1000;

use std::collections::HashMap;
use std::result;
use std::sync::Arc;
use std::task::Poll;

use arrow::record_batch::RecordBatch;
use arrow_schema::{ArrowError, Schema as ArrowSchema};
use async_trait::async_trait;
use bytes::{Buf, Bytes};
use datafusion::datasource::physical_plan::FileOpenFuture;
use datafusion::error::{DataFusionError, Result as DataFusionResult};
use datafusion::physical_plan::SendableRecordBatchStream;
use futures::StreamExt;
use object_store::ObjectStore;
use snafu::ResultExt;
use tokio_util::compat::FuturesAsyncWriteCompatExt;

use self::csv::CsvFormat;
use self::json::JsonFormat;
use self::orc::OrcFormat;
use self::parquet::ParquetFormat;
use crate::DEFAULT_WRITE_BUFFER_SIZE;
use crate::buffered_writer::{DfRecordBatchEncoder, LazyBufferedWriter};
use crate::compression::CompressionType;
use crate::error::{self, Result};
use crate::share_buffer::SharedBuffer;

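// Keys recognized in the user-supplied options map when resolving a `Format`
// and its per-format settings.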
pub const FORMAT_COMPRESSION_TYPE: &str = "compression_type";
pub const FORMAT_DELIMITER: &str = "delimiter";
pub const FORMAT_SCHEMA_INFER_MAX_RECORD: &str = "schema_infer_max_record";
pub const FORMAT_HAS_HEADER: &str = "has_header";
pub const FORMAT_TYPE: &str = "format";
pub const FILE_PATTERN: &str = "pattern";
pub const TIMESTAMP_FORMAT: &str = "timestamp_format";
pub const TIME_FORMAT: &str = "time_format";
pub const DATE_FORMAT: &str = "date_format";

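/// File formats supported for reading and writing external data.
///
/// A `Format` is normally resolved from a user-supplied options map via
/// [`TryFrom`]. A minimal sketch (the option values shown are illustrative):
///
/// ```ignore
/// use std::collections::HashMap;
///
/// let options = HashMap::from([("format".to_string(), "csv".to_string())]);
/// let format = Format::try_from(&options)?;
/// assert_eq!(format.suffix(), ".csv");
/// ```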
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Format {
    Csv(CsvFormat),
    Json(JsonFormat),
    Parquet(ParquetFormat),
    Orc(OrcFormat),
}

impl Format {
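    /// Returns the file extension conventionally used for this format.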
    pub fn suffix(&self) -> &'static str {
        match self {
            Format::Csv(_) => ".csv",
            Format::Json(_) => ".json",
            Format::Parquet(_) => ".parquet",
            Format::Orc(_) => ".orc",
        }
    }
}

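/// Resolves a [`Format`] from the user-supplied options map.
///
/// The `format` key selects the format (case-insensitive) and defaults to
/// Parquet when absent; unknown values yield an `UnsupportedFormat` error.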
impl TryFrom<&HashMap<String, String>> for Format {
    type Error = error::Error;

    fn try_from(options: &HashMap<String, String>) -> Result<Self> {
        let format = options
            .get(FORMAT_TYPE)
            .map(|format| format.to_ascii_uppercase())
            .unwrap_or_else(|| "PARQUET".to_string());

        match format.as_str() {
            "CSV" => Ok(Self::Csv(CsvFormat::try_from(options)?)),
            "JSON" => Ok(Self::Json(JsonFormat::try_from(options)?)),
            "PARQUET" => Ok(Self::Parquet(ParquetFormat::default())),
            "ORC" => Ok(Self::Orc(OrcFormat)),
            _ => error::UnsupportedFormatSnafu { format: &format }.fail(),
        }
    }
}

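/// A file format that can infer an Arrow schema from a file in an [`ObjectStore`].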
#[async_trait]
pub trait FileFormat: Send + Sync + std::fmt::Debug {
    async fn infer_schema(&self, store: &ObjectStore, path: &str) -> Result<ArrowSchema>;
}

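/// A push-based decoder that incrementally turns raw bytes into Arrow [`RecordBatch`]es.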
pub trait ArrowDecoder: Send + 'static {
    /// Decode records from `buf`, returning the number of bytes read.
    ///
    /// This method returns `Ok(0)` once `batch_size` objects have been parsed since the
    /// last call to [`Self::flush`], or once `buf` is exhausted.
    ///
    /// Any remaining bytes should be included in the next call to [`Self::decode`].
    fn decode(&mut self, buf: &[u8]) -> result::Result<usize, ArrowError>;

    /// Flushes the currently buffered data to a [`RecordBatch`].
    ///
    /// This should only be called after [`Self::decode`] has returned `Ok(0)`;
    /// otherwise it may return an error if called part way through decoding a record.
    ///
    /// Returns `Ok(None)` if there is no buffered data.
    fn flush(&mut self) -> result::Result<Option<RecordBatch>, ArrowError>;
}

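// The Arrow CSV and JSON decoders already expose this push-based interface;
// the impls below simply forward to them.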
impl ArrowDecoder for arrow::csv::reader::Decoder {
    fn decode(&mut self, buf: &[u8]) -> result::Result<usize, ArrowError> {
        self.decode(buf)
    }

    fn flush(&mut self) -> result::Result<Option<RecordBatch>, ArrowError> {
        self.flush()
    }
}

impl ArrowDecoder for arrow::json::reader::Decoder {
    fn decode(&mut self, buf: &[u8]) -> result::Result<usize, ArrowError> {
        self.decode(buf)
    }

    fn flush(&mut self) -> result::Result<Option<RecordBatch>, ArrowError> {
        self.flush()
    }
}

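/// Builds a [`FileOpenFuture`] that reads the object at `path`, decompresses it
/// according to `compression_type`, and decodes it into a stream of record
/// batches using the decoder produced by `decoder_factory`.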
pub fn open_with_decoder<T: ArrowDecoder, F: Fn() -> DataFusionResult<T>>(
    object_store: Arc<ObjectStore>,
    path: String,
    compression_type: CompressionType,
    decoder_factory: F,
) -> DataFusionResult<FileOpenFuture> {
    let mut decoder = decoder_factory()?;
    Ok(Box::pin(async move {
        let reader = object_store
            .reader(&path)
            .await
            .map_err(|e| DataFusionError::External(Box::new(e)))?
            .into_bytes_stream(..)
            .await
            .map_err(|e| DataFusionError::External(Box::new(e)))?;

        let mut upstream = compression_type.convert_stream(reader).fuse();

        let mut buffered = Bytes::new();

        let stream = futures::stream::poll_fn(move |cx| {
            loop {
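                // Refill the local buffer from upstream once it has been fully consumed.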
                if buffered.is_empty()
                    && let Some(result) = futures::ready!(upstream.poll_next_unpin(cx))
                {
                    buffered = result?;
                }

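                // `Ok(0)` means the decoder either completed a batch or needs more input.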
                let decoded = decoder.decode(buffered.as_ref())?;

                if decoded == 0 {
                    break;
                } else {
                    buffered.advance(decoded);
                }
            }

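            // Emit whatever the decoder has buffered; `None` ends the stream.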
            Poll::Ready(decoder.flush().transpose())
        });

        Ok(stream.boxed())
    }))
}

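/// Infers a single Arrow schema by merging the schemas inferred from each file in `files`.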
pub async fn infer_schemas(
    store: &ObjectStore,
    files: &[String],
    file_format: &dyn FileFormat,
) -> Result<ArrowSchema> {
    let mut schemas = Vec::with_capacity(files.len());
    for file in files {
        schemas.push(file_format.infer_schema(store, file).await?);
    }
    ArrowSchema::try_merge(schemas).context(error::MergeSchemaSnafu)
}

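/// Writes all record batches from `stream` to a single object at `path`,
/// encoding them with the encoder produced by `encoder_factory` and buffering
/// output through a [`LazyBufferedWriter`].
///
/// Returns the number of rows written.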
pub async fn stream_to_file<T: DfRecordBatchEncoder, U: Fn(SharedBuffer) -> T>(
    mut stream: SendableRecordBatchStream,
    store: ObjectStore,
    path: &str,
    threshold: usize,
    concurrency: usize,
    encoder_factory: U,
) -> Result<usize> {
    let buffer = SharedBuffer::with_capacity(threshold);
    let encoder = encoder_factory(buffer.clone());
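    // The object-store writer itself is created lazily by the closure below;
    // encoded output accumulates in `buffer` until the writer flushes it.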
    let mut writer = LazyBufferedWriter::new(threshold, buffer, encoder, path, |path| async {
        store
            .writer_with(&path)
            .concurrent(concurrency)
            .chunk(DEFAULT_WRITE_BUFFER_SIZE.as_bytes() as usize)
            .await
            .map(|v| v.into_futures_async_write().compat_write())
            .context(error::WriteObjectSnafu { path })
    });

    let mut rows = 0;

    while let Some(batch) = stream.next().await {
        let batch = batch.context(error::ReadRecordBatchSnafu)?;
        writer.write(&batch).await?;
        rows += batch.num_rows();
    }
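    // Flush any remaining buffered data and finalize the underlying writer.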
    writer.close_inner_writer().await?;
    Ok(rows)
}