// common_datasource/file_format/json.rs

use std::collections::HashMap;
use std::io::BufReader;
use std::str::FromStr;

use arrow::json;
use arrow::json::reader::{infer_json_schema_from_iterator, ValueIter};
use arrow::json::writer::LineDelimited;
use arrow::record_batch::RecordBatch;
use arrow_schema::Schema;
use async_trait::async_trait;
use common_runtime;
use datafusion::physical_plan::SendableRecordBatchStream;
use object_store::ObjectStore;
use snafu::ResultExt;
use tokio_util::compat::FuturesAsyncReadCompatExt;
use tokio_util::io::SyncIoBridge;

use crate::buffered_writer::DfRecordBatchEncoder;
use crate::compression::CompressionType;
use crate::error::{self, Result};
use crate::file_format::{self, stream_to_file, FileFormat};
use crate::share_buffer::SharedBuffer;

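/// Options for the newline-delimited JSON (JSON Lines) file format.
///
/// `schema_infer_max_record` caps how many records are read during schema
/// inference (`None` scans the whole file); `compression_type` describes how
/// the underlying file is compressed.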
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct JsonFormat {
    pub schema_infer_max_record: Option<usize>,
    pub compression_type: CompressionType,
}

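// Builds a `JsonFormat` from string key/value options; only
// `FORMAT_COMPRESSION_TYPE` and `FORMAT_SCHEMA_INFER_MAX_RECORD` are
// recognized, every other key is ignored.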
impl TryFrom<&HashMap<String, String>> for JsonFormat {
    type Error = error::Error;

    fn try_from(value: &HashMap<String, String>) -> Result<Self> {
        let mut format = JsonFormat::default();
        if let Some(compression_type) = value.get(file_format::FORMAT_COMPRESSION_TYPE) {
            format.compression_type = CompressionType::from_str(compression_type)?
        };
        if let Some(schema_infer_max_record) =
            value.get(file_format::FORMAT_SCHEMA_INFER_MAX_RECORD)
        {
            format.schema_infer_max_record =
                Some(schema_infer_max_record.parse::<usize>().map_err(|_| {
                    error::ParseFormatSnafu {
                        key: file_format::FORMAT_SCHEMA_INFER_MAX_RECORD,
                        value: schema_infer_max_record,
                    }
                    .build()
                })?);
        };
        Ok(format)
    }
}

impl Default for JsonFormat {
    fn default() -> Self {
        Self {
            schema_infer_max_record: Some(file_format::DEFAULT_SCHEMA_INFER_MAX_RECORD),
            compression_type: CompressionType::Uncompressed,
        }
    }
}

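// Schema inference: stream the object from storage, undo its compression, and
// feed newline-delimited JSON values to Arrow's schema inference, honouring
// `schema_infer_max_record` as the record limit.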
#[async_trait]
impl FileFormat for JsonFormat {
    async fn infer_schema(&self, store: &ObjectStore, path: &str) -> Result<Schema> {
        let meta = store
            .stat(path)
            .await
            .context(error::ReadObjectSnafu { path })?;

        let reader = store
            .reader(path)
            .await
            .context(error::ReadObjectSnafu { path })?
            .into_futures_async_read(0..meta.content_length())
            .await
            .context(error::ReadObjectSnafu { path })?
            .compat();

        let decoded = self.compression_type.convert_async_read(reader);

        let schema_infer_max_record = self.schema_infer_max_record;

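        // `SyncIoBridge` exposes the async reader as a blocking `Read`, so the
        // inference loop below must run on a blocking task rather than on the
        // async runtime itself.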
        common_runtime::spawn_blocking_global(move || {
            let mut reader = BufReader::new(SyncIoBridge::new(decoded));

            let iter = ValueIter::new(&mut reader, schema_infer_max_record);

            let schema = infer_json_schema_from_iterator(iter).context(error::InferSchemaSnafu)?;

            Ok(schema)
        })
        .await
        .context(error::JoinHandleSnafu)?
    }
}

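/// Encodes `stream` as newline-delimited JSON and writes it to `path` in
/// `store` via [`stream_to_file`], returning the number of rows written.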
pub async fn stream_to_json(
    stream: SendableRecordBatchStream,
    store: ObjectStore,
    path: &str,
    threshold: usize,
    concurrency: usize,
) -> Result<usize> {
    stream_to_file(stream, store, path, threshold, concurrency, |buffer| {
        json::LineDelimitedWriter::new(buffer)
    })
    .await
}

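// `DfRecordBatchEncoder` adapter for Arrow's line-delimited JSON writer. The
// `self.write(batch)` call resolves to the writer's inherent `write` method
// (inherent methods take precedence over trait methods), so it does not recurse.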
impl DfRecordBatchEncoder for json::Writer<SharedBuffer, LineDelimited> {
    fn write(&mut self, batch: &RecordBatch) -> Result<()> {
        self.write(batch).context(error::WriteRecordBatchSnafu)
    }
}

#[cfg(test)]
mod tests {
    use common_test_util::find_workspace_path;

    use super::*;
    use crate::file_format::{FileFormat, FORMAT_COMPRESSION_TYPE, FORMAT_SCHEMA_INFER_MAX_RECORD};
    use crate::test_util::{format_schema, test_store};

    fn test_data_root() -> String {
        find_workspace_path("/src/common/datasource/tests/json")
            .display()
            .to_string()
    }

    #[tokio::test]
    async fn infer_schema_basic() {
        let json = JsonFormat::default();
        let store = test_store(&test_data_root());
        let schema = json.infer_schema(&store, "simple.json").await.unwrap();
        let formatted: Vec<_> = format_schema(schema);

        assert_eq!(
            vec![
                "a: Int64: NULL",
                "b: Float64: NULL",
                "c: Boolean: NULL",
                "d: Utf8: NULL",
            ],
            formatted
        );
    }

    #[tokio::test]
    async fn infer_schema_with_limit() {
        let json = JsonFormat {
            schema_infer_max_record: Some(3),
            ..JsonFormat::default()
        };
        let store = test_store(&test_data_root());
        let schema = json
            .infer_schema(&store, "schema_infer_limit.json")
            .await
            .unwrap();
        let formatted: Vec<_> = format_schema(schema);

        assert_eq!(
            vec!["a: Int64: NULL", "b: Float64: NULL", "c: Boolean: NULL"],
            formatted
        );
    }

    #[test]
    fn test_try_from() {
        let map = HashMap::new();
        let format = JsonFormat::try_from(&map).unwrap();

        assert_eq!(format, JsonFormat::default());

        let map = HashMap::from([
            (
                FORMAT_SCHEMA_INFER_MAX_RECORD.to_string(),
                "2000".to_string(),
            ),
            (FORMAT_COMPRESSION_TYPE.to_string(), "zstd".to_string()),
        ]);
        let format = JsonFormat::try_from(&map).unwrap();

        assert_eq!(
            format,
            JsonFormat {
                compression_type: CompressionType::Zstd,
                schema_infer_max_record: Some(2000),
            }
        );
    }
}