common_datasource/file_format/json.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::io::BufReader;
use std::str::FromStr;

use arrow::json;
use arrow::json::WriterBuilder;
use arrow::json::reader::{ValueIter, infer_json_schema_from_iterator};
use arrow::json::writer::LineDelimited;
use arrow::record_batch::RecordBatch;
use arrow_schema::Schema;
use async_trait::async_trait;
use common_runtime;
use datafusion::physical_plan::SendableRecordBatchStream;
use object_store::ObjectStore;
use snafu::ResultExt;
use tokio_util::compat::FuturesAsyncReadCompatExt;
use tokio_util::io::SyncIoBridge;

use crate::buffered_writer::DfRecordBatchEncoder;
use crate::compression::CompressionType;
use crate::error::{self, Result};
use crate::file_format::{self, FileFormat, stream_to_file};
use crate::share_buffer::SharedBuffer;
use crate::util::normalize_infer_schema;

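/// Options for reading and writing newline-delimited JSON (NDJSON) files.
///
/// `schema_infer_max_record` caps how many records are scanned while inferring
/// the schema (`None` scans the whole file), `compression_type` selects the
/// (de)compression codec, and the optional `timestamp_format`, `time_format`,
/// and `date_format` strings are passed through to the Arrow JSON writer to
/// control how temporal values are rendered.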
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct JsonFormat {
    pub schema_infer_max_record: Option<usize>,
    pub compression_type: CompressionType,
    pub timestamp_format: Option<String>,
    pub time_format: Option<String>,
    pub date_format: Option<String>,
}

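/// Builds a [`JsonFormat`] from a map of string options, falling back to the
/// defaults for any key that is absent.
///
/// A minimal usage sketch; the keys are the crate's `file_format::FORMAT_*`
/// constants, and the literal values shown are illustrative only:
///
/// ```ignore
/// let options = HashMap::from([
///     (file_format::FORMAT_COMPRESSION_TYPE.to_string(), "gzip".to_string()),
///     (file_format::FORMAT_SCHEMA_INFER_MAX_RECORD.to_string(), "1000".to_string()),
/// ]);
/// let format = JsonFormat::try_from(&options).unwrap();
/// ```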
impl TryFrom<&HashMap<String, String>> for JsonFormat {
    type Error = error::Error;

    fn try_from(value: &HashMap<String, String>) -> Result<Self> {
        let mut format = JsonFormat::default();
        if let Some(compression_type) = value.get(file_format::FORMAT_COMPRESSION_TYPE) {
            format.compression_type = CompressionType::from_str(compression_type)?
        };
        if let Some(schema_infer_max_record) =
            value.get(file_format::FORMAT_SCHEMA_INFER_MAX_RECORD)
        {
            format.schema_infer_max_record =
                Some(schema_infer_max_record.parse::<usize>().map_err(|_| {
                    error::ParseFormatSnafu {
                        key: file_format::FORMAT_SCHEMA_INFER_MAX_RECORD,
                        value: schema_infer_max_record,
                    }
                    .build()
                })?);
        };
        format.timestamp_format = value.get(file_format::TIMESTAMP_FORMAT).cloned();
        format.time_format = value.get(file_format::TIME_FORMAT).cloned();
        format.date_format = value.get(file_format::DATE_FORMAT).cloned();
        Ok(format)
    }
}

impl Default for JsonFormat {
    fn default() -> Self {
        Self {
            schema_infer_max_record: Some(file_format::DEFAULT_SCHEMA_INFER_MAX_RECORD),
            compression_type: CompressionType::Uncompressed,
            timestamp_format: None,
            time_format: None,
            date_format: None,
        }
    }
}

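/// Schema inference for NDJSON objects.
///
/// The object is streamed from the store, decompressed according to
/// `compression_type`, and an Arrow schema is inferred from at most
/// `schema_infer_max_record` values. Because the Arrow JSON reader is
/// synchronous, the async reader is bridged with [`SyncIoBridge`] and the
/// inference runs on the blocking runtime.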
#[async_trait]
impl FileFormat for JsonFormat {
    async fn infer_schema(&self, store: &ObjectStore, path: &str) -> Result<Schema> {
        let meta = store
            .stat(path)
            .await
            .context(error::ReadObjectSnafu { path })?;

        let reader = store
            .reader(path)
            .await
            .context(error::ReadObjectSnafu { path })?
            .into_futures_async_read(0..meta.content_length())
            .await
            .context(error::ReadObjectSnafu { path })?
            .compat();

        let decoded = self.compression_type.convert_async_read(reader);

        let schema_infer_max_record = self.schema_infer_max_record;

        common_runtime::spawn_blocking_global(move || {
            let mut reader = BufReader::new(SyncIoBridge::new(decoded));

            let iter = ValueIter::new(&mut reader, schema_infer_max_record);

            let schema = infer_json_schema_from_iterator(iter).context(error::InferSchemaSnafu)?;

            Ok(normalize_infer_schema(schema))
        })
        .await
        .context(error::JoinHandleSnafu)?
    }
}

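/// Writes a stream of record batches to `path` as (optionally compressed)
/// newline-delimited JSON and returns the number of rows written. The
/// `threshold` and `concurrency` arguments are forwarded unchanged to
/// [`stream_to_file`], which handles buffering and the upload itself.
///
/// A minimal usage sketch; `stream`, `store`, the path, and the numeric
/// arguments are placeholders supplied by the caller:
///
/// ```ignore
/// let rows = stream_to_json(stream, store, "out/data.json", 1024, 1, &JsonFormat::default())
///     .await?;
/// ```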
pub async fn stream_to_json(
    stream: SendableRecordBatchStream,
    store: ObjectStore,
    path: &str,
    threshold: usize,
    concurrency: usize,
    format: &JsonFormat,
) -> Result<usize> {
    stream_to_file(
        stream,
        store,
        path,
        threshold,
        concurrency,
        format.compression_type,
        |buffer| {
            let mut builder = WriterBuilder::new().with_explicit_nulls(true);
            if let Some(timestamp_format) = &format.timestamp_format {
                builder = builder.with_timestamp_format(timestamp_format.to_owned());
            }
            if let Some(time_format) = &format.time_format {
                builder = builder.with_time_format(time_format.to_owned());
            }
            if let Some(date_format) = &format.date_format {
                builder = builder.with_date_format(date_format.to_owned());
            }
            builder.build::<_, LineDelimited>(buffer)
        },
    )
    .await
}

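/// Lets the line-delimited Arrow JSON writer plug into the buffered-writer
/// machinery: each incoming `RecordBatch` is encoded into the shared
/// in-memory buffer.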
impl DfRecordBatchEncoder for json::Writer<SharedBuffer, LineDelimited> {
    fn write(&mut self, batch: &RecordBatch) -> Result<()> {
        self.write(batch).context(error::WriteRecordBatchSnafu)
    }
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use common_recordbatch::adapter::DfRecordBatchStreamAdapter;
    use common_recordbatch::{RecordBatch, RecordBatches};
    use common_test_util::find_workspace_path;
    use datafusion::datasource::physical_plan::{FileSource, JsonSource};
    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::{ColumnSchema, Schema};
    use datatypes::vectors::{Float64Vector, StringVector, UInt32Vector, VectorRef};
    use futures::TryStreamExt;

    use super::*;
    use crate::file_format::{
        FORMAT_COMPRESSION_TYPE, FORMAT_SCHEMA_INFER_MAX_RECORD, FileFormat, file_to_stream,
    };
    use crate::test_util::{format_schema, test_store};

    fn test_data_root() -> String {
        find_workspace_path("/src/common/datasource/tests/json")
            .display()
            .to_string()
    }

    #[tokio::test]
    async fn infer_schema_basic() {
        let json = JsonFormat::default();
        let store = test_store(&test_data_root());
        let schema = json.infer_schema(&store, "simple.json").await.unwrap();
        let formatted: Vec<_> = format_schema(schema);

        assert_eq!(
            vec![
                "a: Int64: NULL",
                "b: Float64: NULL",
                "c: Boolean: NULL",
                "d: Utf8: NULL",
            ],
            formatted
        );
    }

    #[tokio::test]
    async fn normalize_infer_schema() {
        let json = JsonFormat {
            schema_infer_max_record: Some(3),
            ..JsonFormat::default()
        };
        let store = test_store(&test_data_root());
        let schema = json.infer_schema(&store, "max_infer.json").await.unwrap();
        let formatted: Vec<_> = format_schema(schema);

        assert_eq!(vec!["num: Int64: NULL", "str: Utf8: NULL"], formatted);
    }

    #[tokio::test]
    async fn infer_schema_with_limit() {
        let json = JsonFormat {
            schema_infer_max_record: Some(3),
            ..JsonFormat::default()
        };
        let store = test_store(&test_data_root());
        let schema = json
            .infer_schema(&store, "schema_infer_limit.json")
            .await
            .unwrap();
        let formatted: Vec<_> = format_schema(schema);

        assert_eq!(
            vec!["a: Int64: NULL", "b: Float64: NULL", "c: Boolean: NULL"],
            formatted
        );
    }

    #[test]
    fn test_try_from() {
        let map = HashMap::new();
        let format = JsonFormat::try_from(&map).unwrap();

        assert_eq!(format, JsonFormat::default());

        let map = HashMap::from([
            (
                FORMAT_SCHEMA_INFER_MAX_RECORD.to_string(),
                "2000".to_string(),
            ),
            (FORMAT_COMPRESSION_TYPE.to_string(), "zstd".to_string()),
        ]);
        let format = JsonFormat::try_from(&map).unwrap();

        assert_eq!(
            format,
            JsonFormat {
                compression_type: CompressionType::Zstd,
                schema_infer_max_record: Some(2000),
                ..JsonFormat::default()
            }
        );
    }

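    // End-to-end round trip: write three record batches as compressed NDJSON for
    // several codecs, check each file's compression magic bytes, then read the
    // file back through `file_to_stream` and compare the decoded rows with the
    // original data.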
    #[tokio::test]
    async fn test_compressed_json() {
        // Create test data
        let column_schemas = vec![
            ColumnSchema::new("id", ConcreteDataType::uint32_datatype(), false),
            ColumnSchema::new("name", ConcreteDataType::string_datatype(), false),
            ColumnSchema::new("value", ConcreteDataType::float64_datatype(), false),
        ];
        let schema = Arc::new(Schema::new(column_schemas));

        // Create multiple record batches with different data
        let batch1_columns: Vec<VectorRef> = vec![
            Arc::new(UInt32Vector::from_slice(vec![1, 2, 3])),
            Arc::new(StringVector::from(vec!["Alice", "Bob", "Charlie"])),
            Arc::new(Float64Vector::from_slice(vec![10.5, 20.3, 30.7])),
        ];
        let batch1 = RecordBatch::new(schema.clone(), batch1_columns).unwrap();

        let batch2_columns: Vec<VectorRef> = vec![
            Arc::new(UInt32Vector::from_slice(vec![4, 5, 6])),
            Arc::new(StringVector::from(vec!["David", "Eva", "Frank"])),
            Arc::new(Float64Vector::from_slice(vec![40.1, 50.2, 60.3])),
        ];
        let batch2 = RecordBatch::new(schema.clone(), batch2_columns).unwrap();

        let batch3_columns: Vec<VectorRef> = vec![
            Arc::new(UInt32Vector::from_slice(vec![7, 8, 9])),
            Arc::new(StringVector::from(vec!["Grace", "Henry", "Ivy"])),
            Arc::new(Float64Vector::from_slice(vec![70.4, 80.5, 90.6])),
        ];
        let batch3 = RecordBatch::new(schema.clone(), batch3_columns).unwrap();

        // Combine all batches into a RecordBatches collection
        let recordbatches = RecordBatches::try_new(schema, vec![batch1, batch2, batch3]).unwrap();

        // Test with different compression types
        let compression_types = vec![
            CompressionType::Gzip,
            CompressionType::Bzip2,
            CompressionType::Xz,
            CompressionType::Zstd,
        ];

        // Create a temporary file path
        let temp_dir = common_test_util::temp_dir::create_temp_dir("test_compressed_json");
        for compression_type in compression_types {
            let format = JsonFormat {
                compression_type,
                ..JsonFormat::default()
            };

            let compressed_file_name =
                format!("test_compressed_json.{}", compression_type.file_extension());
            let compressed_file_path = temp_dir.path().join(&compressed_file_name);
            let compressed_file_path_str = compressed_file_path.to_str().unwrap();

            // Create a simple file store for testing
            let store = test_store("/");

            // Export JSON with compression
            let rows = stream_to_json(
                Box::pin(DfRecordBatchStreamAdapter::new(recordbatches.as_stream())),
                store,
                compressed_file_path_str,
                1024,
                1,
                &format,
            )
            .await
            .unwrap();

            assert_eq!(rows, 9);

            // Verify compressed file was created and has content
            assert!(compressed_file_path.exists());
            let file_size = std::fs::metadata(&compressed_file_path).unwrap().len();
            assert!(file_size > 0);

            // Verify the file is actually compressed
            let file_content = std::fs::read(&compressed_file_path).unwrap();
            // Compressed files should not start with '{' (JSON character)
            // They should have compression magic bytes
            match compression_type {
                CompressionType::Gzip => {
                    // Gzip magic bytes: 0x1f 0x8b
                    assert_eq!(file_content[0], 0x1f, "Gzip file should start with 0x1f");
                    assert_eq!(
                        file_content[1], 0x8b,
                        "Gzip file should have 0x8b as second byte"
                    );
                }
                CompressionType::Bzip2 => {
                    // Bzip2 magic bytes: 'BZ'
                    assert_eq!(file_content[0], b'B', "Bzip2 file should start with 'B'");
                    assert_eq!(
                        file_content[1], b'Z',
                        "Bzip2 file should have 'Z' as second byte"
                    );
                }
                CompressionType::Xz => {
                    // XZ magic bytes: 0xFD '7zXZ'
                    assert_eq!(file_content[0], 0xFD, "XZ file should start with 0xFD");
                }
                CompressionType::Zstd => {
                    // Zstd magic bytes: 0x28 0xB5 0x2F 0xFD
                    assert_eq!(file_content[0], 0x28, "Zstd file should start with 0x28");
                    assert_eq!(
                        file_content[1], 0xB5,
                        "Zstd file should have 0xB5 as second byte"
                    );
                }
                _ => {}
            }

            // Verify the compressed file can be decompressed and content matches original data
            let store = test_store("/");
            let schema = Arc::new(
                JsonFormat {
                    compression_type,
                    ..Default::default()
                }
                .infer_schema(&store, compressed_file_path_str)
                .await
                .unwrap(),
            );
            let json_source = JsonSource::new(schema).with_batch_size(8192);

            let stream = file_to_stream(
                &store,
                compressed_file_path_str,
                json_source.clone(),
                None,
                compression_type,
            )
            .await
            .unwrap();

            let batches = stream.try_collect::<Vec<_>>().await.unwrap();
            let pretty_print = arrow::util::pretty::pretty_format_batches(&batches)
                .unwrap()
                .to_string();
            let expected = r#"+----+---------+-------+
| id | name    | value |
+----+---------+-------+
| 1  | Alice   | 10.5  |
| 2  | Bob     | 20.3  |
| 3  | Charlie | 30.7  |
| 4  | David   | 40.1  |
| 5  | Eva     | 50.2  |
| 6  | Frank   | 60.3  |
| 7  | Grace   | 70.4  |
| 8  | Henry   | 80.5  |
| 9  | Ivy     | 90.6  |
+----+---------+-------+"#;
            assert_eq!(expected, pretty_print);
        }
    }
}