common_datasource/file_format/json.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::io::BufReader;
use std::str::FromStr;

use arrow::json;
use arrow::json::reader::{infer_json_schema_from_iterator, ValueIter};
use arrow::json::writer::LineDelimited;
use arrow::record_batch::RecordBatch;
use arrow_schema::Schema;
use async_trait::async_trait;
use common_runtime;
use datafusion::physical_plan::SendableRecordBatchStream;
use object_store::ObjectStore;
use snafu::ResultExt;
use tokio_util::compat::FuturesAsyncReadCompatExt;
use tokio_util::io::SyncIoBridge;

use crate::buffered_writer::DfRecordBatchEncoder;
use crate::compression::CompressionType;
use crate::error::{self, Result};
use crate::file_format::{self, stream_to_file, FileFormat};
use crate::share_buffer::SharedBuffer;

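/// Options for reading and writing newline-delimited JSON.
///
/// `schema_infer_max_record` caps how many records are sampled during schema
/// inference (`None` means all records are read); `compression_type` is the
/// codec the file is compressed with.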
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct JsonFormat {
    pub schema_infer_max_record: Option<usize>,
    pub compression_type: CompressionType,
}

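// Builds a `JsonFormat` from string key/value options. Only
// `file_format::FORMAT_COMPRESSION_TYPE` and
// `file_format::FORMAT_SCHEMA_INFER_MAX_RECORD` are consulted; other keys
// are ignored. See `test_try_from` below for example values.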
impl TryFrom<&HashMap<String, String>> for JsonFormat {
    type Error = error::Error;

    fn try_from(value: &HashMap<String, String>) -> Result<Self> {
        let mut format = JsonFormat::default();
        if let Some(compression_type) = value.get(file_format::FORMAT_COMPRESSION_TYPE) {
            format.compression_type = CompressionType::from_str(compression_type)?
        };
        if let Some(schema_infer_max_record) =
            value.get(file_format::FORMAT_SCHEMA_INFER_MAX_RECORD)
        {
            format.schema_infer_max_record =
                Some(schema_infer_max_record.parse::<usize>().map_err(|_| {
                    error::ParseFormatSnafu {
                        key: file_format::FORMAT_SCHEMA_INFER_MAX_RECORD,
                        value: schema_infer_max_record,
                    }
                    .build()
                })?);
        };
        Ok(format)
    }
}

impl Default for JsonFormat {
    fn default() -> Self {
        Self {
            schema_infer_max_record: Some(file_format::DEFAULT_SCHEMA_INFER_MAX_RECORD),
            compression_type: CompressionType::Uncompressed,
        }
    }
}

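// `FileFormat` implementation: infers an Arrow schema from a
// newline-delimited JSON object in `store`, decompressing it as needed and
// sampling up to `schema_infer_max_record` values (all of them when `None`).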
#[async_trait]
impl FileFormat for JsonFormat {
    async fn infer_schema(&self, store: &ObjectStore, path: &str) -> Result<Schema> {
        let meta = store
            .stat(path)
            .await
            .context(error::ReadObjectSnafu { path })?;

        let reader = store
            .reader(path)
            .await
            .context(error::ReadObjectSnafu { path })?
            .into_futures_async_read(0..meta.content_length())
            .await
            .context(error::ReadObjectSnafu { path })?
            .compat();

        let decoded = self.compression_type.convert_async_read(reader);

        let schema_infer_max_record = self.schema_infer_max_record;

        // Arrow's JSON schema inference is synchronous, so bridge the async
        // reader into a blocking one and run inference on a blocking task.
        common_runtime::spawn_blocking_global(move || {
            let mut reader = BufReader::new(SyncIoBridge::new(decoded));

            let iter = ValueIter::new(&mut reader, schema_infer_max_record);

            let schema = infer_json_schema_from_iterator(iter).context(error::InferSchemaSnafu)?;

            Ok(schema)
        })
        .await
        .context(error::JoinHandleSnafu)?
    }
}

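/// Encodes a record-batch stream as newline-delimited JSON and writes it to
/// `path` on `store`, buffering output and flushing once it grows past
/// `threshold` bytes; returns the number of rows written.
///
/// A minimal call-site sketch (the `stream` and `store` bindings and the
/// threshold/concurrency values are assumed for illustration):
///
/// ```ignore
/// let rows = stream_to_json(stream, store, "out.json", 8 * 1024 * 1024, 8).await?;
/// ```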
pub async fn stream_to_json(
    stream: SendableRecordBatchStream,
    store: ObjectStore,
    path: &str,
    threshold: usize,
    concurrency: usize,
) -> Result<usize> {
    stream_to_file(stream, store, path, threshold, concurrency, |buffer| {
        json::LineDelimitedWriter::new(buffer)
    })
    .await
}

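// Newline-delimited JSON encoder used by the buffered writer; delegates to
// the inherent `json::Writer::write`, which appends one JSON object per row.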
impl DfRecordBatchEncoder for json::Writer<SharedBuffer, LineDelimited> {
    fn write(&mut self, batch: &RecordBatch) -> Result<()> {
        self.write(batch).context(error::WriteRecordBatchSnafu)
    }
}

#[cfg(test)]
mod tests {
    use common_test_util::find_workspace_path;

    use super::*;
    use crate::file_format::{FileFormat, FORMAT_COMPRESSION_TYPE, FORMAT_SCHEMA_INFER_MAX_RECORD};
    use crate::test_util::{format_schema, test_store};

    fn test_data_root() -> String {
        find_workspace_path("/src/common/datasource/tests/json")
            .display()
            .to_string()
    }

    #[tokio::test]
    async fn infer_schema_basic() {
        let json = JsonFormat::default();
        let store = test_store(&test_data_root());
        let schema = json.infer_schema(&store, "simple.json").await.unwrap();
        let formatted: Vec<_> = format_schema(schema);

        assert_eq!(
            vec![
                "a: Int64: NULL",
                "b: Float64: NULL",
                "c: Boolean: NULL",
                "d: Utf8: NULL",
            ],
            formatted
        );
    }

    #[tokio::test]
    async fn infer_schema_with_limit() {
        let json = JsonFormat {
            schema_infer_max_record: Some(3),
            ..JsonFormat::default()
        };
        let store = test_store(&test_data_root());
        let schema = json
            .infer_schema(&store, "schema_infer_limit.json")
            .await
            .unwrap();
        let formatted: Vec<_> = format_schema(schema);

        assert_eq!(
            vec!["a: Int64: NULL", "b: Float64: NULL", "c: Boolean: NULL"],
            formatted
        );
    }

    #[test]
    fn test_try_from() {
        let map = HashMap::new();
        let format = JsonFormat::try_from(&map).unwrap();

        assert_eq!(format, JsonFormat::default());

        let map = HashMap::from([
            (
                FORMAT_SCHEMA_INFER_MAX_RECORD.to_string(),
                "2000".to_string(),
            ),
            (FORMAT_COMPRESSION_TYPE.to_string(), "zstd".to_string()),
        ]);
        let format = JsonFormat::try_from(&map).unwrap();

        assert_eq!(
            format,
            JsonFormat {
                compression_type: CompressionType::Zstd,
                schema_infer_max_record: Some(2000),
            }
        );
    }
}