// common_datasource/file_format/json.rs

use std::collections::HashMap;
use std::io::BufReader;
use std::str::FromStr;

use arrow::json;
use arrow::json::WriterBuilder;
use arrow::json::reader::{ValueIter, infer_json_schema_from_iterator};
use arrow::json::writer::LineDelimited;
use arrow::record_batch::RecordBatch;
use arrow_schema::Schema;
use async_trait::async_trait;
use common_runtime;
use datafusion::physical_plan::SendableRecordBatchStream;
use object_store::ObjectStore;
use snafu::ResultExt;
use tokio_util::compat::FuturesAsyncReadCompatExt;
use tokio_util::io::SyncIoBridge;

use crate::buffered_writer::DfRecordBatchEncoder;
use crate::compression::CompressionType;
use crate::error::{self, Result};
use crate::file_format::{self, FileFormat, stream_to_file};
use crate::share_buffer::SharedBuffer;
use crate::util::normalize_infer_schema;

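/// Options for reading and writing newline-delimited JSON: the maximum number of
/// records to scan during schema inference, the compression codec, and optional
/// output formats for timestamp, time, and date columns.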
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct JsonFormat {
    pub schema_infer_max_record: Option<usize>,
    pub compression_type: CompressionType,
    pub timestamp_format: Option<String>,
    pub time_format: Option<String>,
    pub date_format: Option<String>,
}

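/// Builds a [`JsonFormat`] from string key/value options, recognizing the
/// compression type, the schema-inference record limit, and the timestamp, time,
/// and date output formats. A minimal sketch of such an options map, mirroring
/// `test_try_from` below:
///
/// ```ignore
/// let opts = HashMap::from([(file_format::FORMAT_COMPRESSION_TYPE.to_string(), "zstd".to_string())]);
/// let format = JsonFormat::try_from(&opts)?;
/// ```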
impl TryFrom<&HashMap<String, String>> for JsonFormat {
    type Error = error::Error;

    fn try_from(value: &HashMap<String, String>) -> Result<Self> {
        let mut format = JsonFormat::default();
        if let Some(compression_type) = value.get(file_format::FORMAT_COMPRESSION_TYPE) {
            format.compression_type = CompressionType::from_str(compression_type)?
        };
        if let Some(schema_infer_max_record) =
            value.get(file_format::FORMAT_SCHEMA_INFER_MAX_RECORD)
        {
            format.schema_infer_max_record =
                Some(schema_infer_max_record.parse::<usize>().map_err(|_| {
                    error::ParseFormatSnafu {
                        key: file_format::FORMAT_SCHEMA_INFER_MAX_RECORD,
                        value: schema_infer_max_record,
                    }
                    .build()
                })?);
        };
        format.timestamp_format = value.get(file_format::TIMESTAMP_FORMAT).cloned();
        format.time_format = value.get(file_format::TIME_FORMAT).cloned();
        format.date_format = value.get(file_format::DATE_FORMAT).cloned();
        Ok(format)
    }
}

impl Default for JsonFormat {
    fn default() -> Self {
        Self {
            schema_infer_max_record: Some(file_format::DEFAULT_SCHEMA_INFER_MAX_RECORD),
            compression_type: CompressionType::Uncompressed,
            timestamp_format: None,
            time_format: None,
            date_format: None,
        }
    }
}

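// Schema inference reads the object from the store, decompresses it according to
// `compression_type`, and infers an arrow schema from up to
// `schema_infer_max_record` JSON values on a blocking task, normalizing the
// inferred schema before returning it.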
#[async_trait]
impl FileFormat for JsonFormat {
    async fn infer_schema(&self, store: &ObjectStore, path: &str) -> Result<Schema> {
        let meta = store
            .stat(path)
            .await
            .context(error::ReadObjectSnafu { path })?;

        let reader = store
            .reader(path)
            .await
            .context(error::ReadObjectSnafu { path })?
            .into_futures_async_read(0..meta.content_length())
            .await
            .context(error::ReadObjectSnafu { path })?
            .compat();

        let decoded = self.compression_type.convert_async_read(reader);

        let schema_infer_max_record = self.schema_infer_max_record;

        common_runtime::spawn_blocking_global(move || {
            let mut reader = BufReader::new(SyncIoBridge::new(decoded));

            let iter = ValueIter::new(&mut reader, schema_infer_max_record);

            let schema = infer_json_schema_from_iterator(iter).context(error::InferSchemaSnafu)?;

            Ok(normalize_infer_schema(schema))
        })
        .await
        .context(error::JoinHandleSnafu)?
    }
}

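/// Writes a stream of record batches to `path` in `store` as newline-delimited
/// JSON, compressed according to `format.compression_type`, and returns the
/// number of rows written; `threshold` and `concurrency` are forwarded to
/// [`stream_to_file`]. A minimal usage sketch, assuming `stream` and `store`
/// are built elsewhere:
///
/// ```ignore
/// let rows = stream_to_json(stream, store, "out.json", 1024, 1, &JsonFormat::default()).await?;
/// ```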
pub async fn stream_to_json(
    stream: SendableRecordBatchStream,
    store: ObjectStore,
    path: &str,
    threshold: usize,
    concurrency: usize,
    format: &JsonFormat,
) -> Result<usize> {
    stream_to_file(
        stream,
        store,
        path,
        threshold,
        concurrency,
        format.compression_type,
        |buffer| {
            let mut builder = WriterBuilder::new().with_explicit_nulls(true);
            if let Some(timestamp_format) = &format.timestamp_format {
                builder = builder.with_timestamp_format(timestamp_format.to_owned());
            }
            if let Some(time_format) = &format.time_format {
                builder = builder.with_time_format(time_format.to_owned());
            }
            if let Some(date_format) = &format.date_format {
                builder = builder.with_date_format(date_format.to_owned());
            }
            builder.build::<_, LineDelimited>(buffer)
        },
    )
    .await
}

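// The arrow line-delimited JSON writer is used directly as the record batch
// encoder for the buffered writer; `self.write(batch)` resolves to the writer's
// inherent `write` method, not recursively to this trait method.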
impl DfRecordBatchEncoder for json::Writer<SharedBuffer, LineDelimited> {
    fn write(&mut self, batch: &RecordBatch) -> Result<()> {
        self.write(batch).context(error::WriteRecordBatchSnafu)
    }
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use common_recordbatch::adapter::DfRecordBatchStreamAdapter;
    use common_recordbatch::{RecordBatch, RecordBatches};
    use common_test_util::find_workspace_path;
    use datafusion::datasource::physical_plan::{FileSource, JsonSource};
    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::{ColumnSchema, Schema};
    use datatypes::vectors::{Float64Vector, StringVector, UInt32Vector, VectorRef};
    use futures::TryStreamExt;

    use super::*;
    use crate::file_format::{
        FORMAT_COMPRESSION_TYPE, FORMAT_SCHEMA_INFER_MAX_RECORD, FileFormat, file_to_stream,
    };
    use crate::test_util::{format_schema, test_store};

    fn test_data_root() -> String {
        find_workspace_path("/src/common/datasource/tests/json")
            .display()
            .to_string()
    }

    #[tokio::test]
    async fn infer_schema_basic() {
        let json = JsonFormat::default();
        let store = test_store(&test_data_root());
        let schema = json.infer_schema(&store, "simple.json").await.unwrap();
        let formatted: Vec<_> = format_schema(schema);

        assert_eq!(
            vec![
                "a: Int64: NULL",
                "b: Float64: NULL",
                "c: Boolean: NULL",
                "d: Utf8: NULL",
            ],
            formatted
        );
    }

    #[tokio::test]
    async fn normalize_infer_schema() {
        let json = JsonFormat {
            schema_infer_max_record: Some(3),
            ..JsonFormat::default()
        };
        let store = test_store(&test_data_root());
        let schema = json.infer_schema(&store, "max_infer.json").await.unwrap();
        let formatted: Vec<_> = format_schema(schema);

        assert_eq!(vec!["num: Int64: NULL", "str: Utf8: NULL"], formatted);
    }

    #[tokio::test]
    async fn infer_schema_with_limit() {
        let json = JsonFormat {
            schema_infer_max_record: Some(3),
            ..JsonFormat::default()
        };
        let store = test_store(&test_data_root());
        let schema = json
            .infer_schema(&store, "schema_infer_limit.json")
            .await
            .unwrap();
        let formatted: Vec<_> = format_schema(schema);

        assert_eq!(
            vec!["a: Int64: NULL", "b: Float64: NULL", "c: Boolean: NULL"],
            formatted
        );
    }

    #[test]
    fn test_try_from() {
        let map = HashMap::new();
        let format = JsonFormat::try_from(&map).unwrap();

        assert_eq!(format, JsonFormat::default());

        let map = HashMap::from([
            (
                FORMAT_SCHEMA_INFER_MAX_RECORD.to_string(),
                "2000".to_string(),
            ),
            (FORMAT_COMPRESSION_TYPE.to_string(), "zstd".to_string()),
        ]);
        let format = JsonFormat::try_from(&map).unwrap();

        assert_eq!(
            format,
            JsonFormat {
                compression_type: CompressionType::Zstd,
                schema_infer_max_record: Some(2000),
                ..JsonFormat::default()
            }
        );
    }

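    // Round-trip test: write three record batches as compressed newline-delimited
    // JSON for each supported codec, check the file's compression magic bytes, then
    // read the file back through `file_to_stream` and compare the decoded rows.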
    #[tokio::test]
    async fn test_compressed_json() {
        // A simple schema: id (u32), name (string), value (f64).
        let column_schemas = vec![
            ColumnSchema::new("id", ConcreteDataType::uint32_datatype(), false),
            ColumnSchema::new("name", ConcreteDataType::string_datatype(), false),
            ColumnSchema::new("value", ConcreteDataType::float64_datatype(), false),
        ];
        let schema = Arc::new(Schema::new(column_schemas));

        // Three record batches of three rows each.
        let batch1_columns: Vec<VectorRef> = vec![
            Arc::new(UInt32Vector::from_slice(vec![1, 2, 3])),
            Arc::new(StringVector::from(vec!["Alice", "Bob", "Charlie"])),
            Arc::new(Float64Vector::from_slice(vec![10.5, 20.3, 30.7])),
        ];
        let batch1 = RecordBatch::new(schema.clone(), batch1_columns).unwrap();

        let batch2_columns: Vec<VectorRef> = vec![
            Arc::new(UInt32Vector::from_slice(vec![4, 5, 6])),
            Arc::new(StringVector::from(vec!["David", "Eva", "Frank"])),
            Arc::new(Float64Vector::from_slice(vec![40.1, 50.2, 60.3])),
        ];
        let batch2 = RecordBatch::new(schema.clone(), batch2_columns).unwrap();

        let batch3_columns: Vec<VectorRef> = vec![
            Arc::new(UInt32Vector::from_slice(vec![7, 8, 9])),
            Arc::new(StringVector::from(vec!["Grace", "Henry", "Ivy"])),
            Arc::new(Float64Vector::from_slice(vec![70.4, 80.5, 90.6])),
        ];
        let batch3 = RecordBatch::new(schema.clone(), batch3_columns).unwrap();

        let recordbatches = RecordBatches::try_new(schema, vec![batch1, batch2, batch3]).unwrap();

        let compression_types = vec![
            CompressionType::Gzip,
            CompressionType::Bzip2,
            CompressionType::Xz,
            CompressionType::Zstd,
        ];

        let temp_dir = common_test_util::temp_dir::create_temp_dir("test_compressed_json");
        for compression_type in compression_types {
            let format = JsonFormat {
                compression_type,
                ..JsonFormat::default()
            };

            let compressed_file_name =
                format!("test_compressed_json.{}", compression_type.file_extension());
            let compressed_file_path = temp_dir.path().join(&compressed_file_name);
            let compressed_file_path_str = compressed_file_path.to_str().unwrap();

            let store = test_store("/");

            let rows = stream_to_json(
                Box::pin(DfRecordBatchStreamAdapter::new(recordbatches.as_stream())),
                store,
                compressed_file_path_str,
                1024,
                1,
                &format,
            )
            .await
            .unwrap();

            assert_eq!(rows, 9);

            assert!(compressed_file_path.exists());
            let file_size = std::fs::metadata(&compressed_file_path).unwrap().len();
            assert!(file_size > 0);

            // Each codec writes a recognizable magic number at the start of the file.
            let file_content = std::fs::read(&compressed_file_path).unwrap();
            match compression_type {
                CompressionType::Gzip => {
                    assert_eq!(file_content[0], 0x1f, "Gzip file should start with 0x1f");
                    assert_eq!(
                        file_content[1], 0x8b,
                        "Gzip file should have 0x8b as second byte"
                    );
                }
                CompressionType::Bzip2 => {
                    assert_eq!(file_content[0], b'B', "Bzip2 file should start with 'B'");
                    assert_eq!(
                        file_content[1], b'Z',
                        "Bzip2 file should have 'Z' as second byte"
                    );
                }
                CompressionType::Xz => {
                    assert_eq!(file_content[0], 0xFD, "XZ file should start with 0xFD");
                }
                CompressionType::Zstd => {
                    assert_eq!(file_content[0], 0x28, "Zstd file should start with 0x28");
                    assert_eq!(
                        file_content[1], 0xB5,
                        "Zstd file should have 0xB5 as second byte"
                    );
                }
                _ => {}
            }

            // Read the compressed file back and verify the decoded contents.
            let store = test_store("/");
            let schema = Arc::new(
                JsonFormat {
                    compression_type,
                    ..Default::default()
                }
                .infer_schema(&store, compressed_file_path_str)
                .await
                .unwrap(),
            );
            let json_source = JsonSource::new(schema).with_batch_size(8192);

            let stream = file_to_stream(
                &store,
                compressed_file_path_str,
                json_source.clone(),
                None,
                compression_type,
            )
            .await
            .unwrap();

            let batches = stream.try_collect::<Vec<_>>().await.unwrap();
            let pretty_print = arrow::util::pretty::pretty_format_batches(&batches)
                .unwrap()
                .to_string();
            let expected = r#"+----+---------+-------+
| id | name    | value |
+----+---------+-------+
| 1  | Alice   | 10.5  |
| 2  | Bob     | 20.3  |
| 3  | Charlie | 30.7  |
| 4  | David   | 40.1  |
| 5  | Eva     | 50.2  |
| 6  | Frank   | 60.3  |
| 7  | Grace   | 70.4  |
| 8  | Henry   | 80.5  |
| 9  | Ivy     | 90.6  |
+----+---------+-------+"#;
            assert_eq!(expected, pretty_print);
        }
    }
}