common_datasource/file_format/
json.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::io::BufReader;
17use std::str::FromStr;
18
19use arrow::json;
20use arrow::json::reader::{ValueIter, infer_json_schema_from_iterator};
21use arrow::json::writer::LineDelimited;
22use arrow::record_batch::RecordBatch;
23use arrow_schema::Schema;
24use async_trait::async_trait;
25use common_runtime;
26use datafusion::physical_plan::SendableRecordBatchStream;
27use object_store::ObjectStore;
28use snafu::ResultExt;
29use tokio_util::compat::FuturesAsyncReadCompatExt;
30use tokio_util::io::SyncIoBridge;
31
32use crate::buffered_writer::DfRecordBatchEncoder;
33use crate::compression::CompressionType;
34use crate::error::{self, Result};
35use crate::file_format::{self, FileFormat, stream_to_file};
36use crate::share_buffer::SharedBuffer;
37
/// Settings for reading and writing JSON files: how many records to sample when inferring a
/// schema, and which compression codec wraps the data.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct JsonFormat {
    /// Record limit forwarded to arrow's `ValueIter` by `infer_schema`.
    pub schema_infer_max_record: Option<usize>,
    /// Codec used to decompress input in `infer_schema` and to compress output in
    /// `stream_to_json`.
    pub compression_type: CompressionType,
}
43
44impl TryFrom<&HashMap<String, String>> for JsonFormat {
45    type Error = error::Error;
46
47    fn try_from(value: &HashMap<String, String>) -> Result<Self> {
48        let mut format = JsonFormat::default();
49        if let Some(compression_type) = value.get(file_format::FORMAT_COMPRESSION_TYPE) {
50            format.compression_type = CompressionType::from_str(compression_type)?
51        };
52        if let Some(schema_infer_max_record) =
53            value.get(file_format::FORMAT_SCHEMA_INFER_MAX_RECORD)
54        {
55            format.schema_infer_max_record =
56                Some(schema_infer_max_record.parse::<usize>().map_err(|_| {
57                    error::ParseFormatSnafu {
58                        key: file_format::FORMAT_SCHEMA_INFER_MAX_RECORD,
59                        value: schema_infer_max_record,
60                    }
61                    .build()
62                })?);
63        };
64        Ok(format)
65    }
66}
67
68impl Default for JsonFormat {
69    fn default() -> Self {
70        Self {
71            schema_infer_max_record: Some(file_format::DEFAULT_SCHEMA_INFER_MAX_RECORD),
72            compression_type: CompressionType::Uncompressed,
73        }
74    }
75}
76
77#[async_trait]
78impl FileFormat for JsonFormat {
79    async fn infer_schema(&self, store: &ObjectStore, path: &str) -> Result<Schema> {
80        let meta = store
81            .stat(path)
82            .await
83            .context(error::ReadObjectSnafu { path })?;
84
85        let reader = store
86            .reader(path)
87            .await
88            .context(error::ReadObjectSnafu { path })?
89            .into_futures_async_read(0..meta.content_length())
90            .await
91            .context(error::ReadObjectSnafu { path })?
92            .compat();
93
94        let decoded = self.compression_type.convert_async_read(reader);
95
96        let schema_infer_max_record = self.schema_infer_max_record;
97
98        common_runtime::spawn_blocking_global(move || {
99            let mut reader = BufReader::new(SyncIoBridge::new(decoded));
100
101            let iter = ValueIter::new(&mut reader, schema_infer_max_record);
102
103            let schema = infer_json_schema_from_iterator(iter).context(error::InferSchemaSnafu)?;
104
105            Ok(schema)
106        })
107        .await
108        .context(error::JoinHandleSnafu)?
109    }
110}
111
112pub async fn stream_to_json(
113    stream: SendableRecordBatchStream,
114    store: ObjectStore,
115    path: &str,
116    threshold: usize,
117    concurrency: usize,
118    format: &JsonFormat,
119) -> Result<usize> {
120    stream_to_file(
121        stream,
122        store,
123        path,
124        threshold,
125        concurrency,
126        format.compression_type,
127        json::LineDelimitedWriter::new,
128    )
129    .await
130}
131
132impl DfRecordBatchEncoder for json::Writer<SharedBuffer, LineDelimited> {
133    fn write(&mut self, batch: &RecordBatch) -> Result<()> {
134        self.write(batch).context(error::WriteRecordBatchSnafu)
135    }
136}
137
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use common_recordbatch::adapter::DfRecordBatchStreamAdapter;
    use common_recordbatch::{RecordBatch, RecordBatches};
    use common_test_util::find_workspace_path;
    use datafusion::datasource::physical_plan::{FileSource, JsonSource};
    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::{ColumnSchema, Schema};
    use datatypes::vectors::{Float64Vector, StringVector, UInt32Vector, VectorRef};
    use futures::TryStreamExt;

    use super::*;
    use crate::file_format::{
        FORMAT_COMPRESSION_TYPE, FORMAT_SCHEMA_INFER_MAX_RECORD, FileFormat, file_to_stream,
    };
    use crate::test_util::{format_schema, test_store};

    /// Directory holding the JSON fixtures used by these tests.
    fn test_data_root() -> String {
        find_workspace_path("/src/common/datasource/tests/json")
            .display()
            .to_string()
    }

    /// Returns the full magic header a file compressed with `compression_type` must start with.
    ///
    /// Panics for any other compression type (e.g. `Uncompressed`), which these tests never
    /// write.
    fn expected_magic_bytes(compression_type: CompressionType) -> &'static [u8] {
        match compression_type {
            // Gzip member header: ID1 = 0x1f, ID2 = 0x8b.
            CompressionType::Gzip => &[0x1f, 0x8b],
            // Bzip2 stream header: "BZ" followed by 'h' (Huffman-coded bzip2).
            CompressionType::Bzip2 => b"BZh",
            // XZ stream header magic: 0xFD '7' 'z' 'X' 'Z' 0x00.
            CompressionType::Xz => &[0xFD, b'7', b'z', b'X', b'Z', 0x00],
            // Zstd frame magic number 0xFD2FB528, stored little-endian.
            CompressionType::Zstd => &[0x28, 0xB5, 0x2F, 0xFD],
            other => panic!("no magic bytes known for compression type {:?}", other),
        }
    }

    /// Schema inference over a small fixture with the default record limit.
    #[tokio::test]
    async fn infer_schema_basic() {
        let json = JsonFormat::default();
        let store = test_store(&test_data_root());
        let schema = json.infer_schema(&store, "simple.json").await.unwrap();
        let formatted: Vec<_> = format_schema(schema);

        assert_eq!(
            vec![
                "a: Int64: NULL",
                "b: Float64: NULL",
                "c: Boolean: NULL",
                "d: Utf8: NULL",
            ],
            formatted
        );
    }

    /// Only the first `schema_infer_max_record` records contribute to the inferred schema.
    #[tokio::test]
    async fn infer_schema_with_limit() {
        let json = JsonFormat {
            schema_infer_max_record: Some(3),
            ..JsonFormat::default()
        };
        let store = test_store(&test_data_root());
        let schema = json
            .infer_schema(&store, "schema_infer_limit.json")
            .await
            .unwrap();
        let formatted: Vec<_> = format_schema(schema);

        assert_eq!(
            vec!["a: Int64: NULL", "b: Float64: NULL", "c: Boolean: NULL"],
            formatted
        );
    }

    /// An empty option map yields the default format; known keys override it.
    #[test]
    fn test_try_from() {
        let map = HashMap::new();
        let format = JsonFormat::try_from(&map).unwrap();

        assert_eq!(format, JsonFormat::default());

        let map = HashMap::from([
            (
                FORMAT_SCHEMA_INFER_MAX_RECORD.to_string(),
                "2000".to_string(),
            ),
            (FORMAT_COMPRESSION_TYPE.to_string(), "zstd".to_string()),
        ]);
        let format = JsonFormat::try_from(&map).unwrap();

        assert_eq!(
            format,
            JsonFormat {
                compression_type: CompressionType::Zstd,
                schema_infer_max_record: Some(2000),
            }
        );
    }

    /// Round-trips data through every supported compression codec: export with
    /// `stream_to_json`, check the codec's magic header, then read it back and compare.
    #[tokio::test]
    async fn test_compressed_json() {
        // Create test data
        let column_schemas = vec![
            ColumnSchema::new("id", ConcreteDataType::uint32_datatype(), false),
            ColumnSchema::new("name", ConcreteDataType::string_datatype(), false),
            ColumnSchema::new("value", ConcreteDataType::float64_datatype(), false),
        ];
        let schema = Arc::new(Schema::new(column_schemas));

        // Create multiple record batches with different data
        let batch1_columns: Vec<VectorRef> = vec![
            Arc::new(UInt32Vector::from_slice(vec![1, 2, 3])),
            Arc::new(StringVector::from(vec!["Alice", "Bob", "Charlie"])),
            Arc::new(Float64Vector::from_slice(vec![10.5, 20.3, 30.7])),
        ];
        let batch1 = RecordBatch::new(schema.clone(), batch1_columns).unwrap();

        let batch2_columns: Vec<VectorRef> = vec![
            Arc::new(UInt32Vector::from_slice(vec![4, 5, 6])),
            Arc::new(StringVector::from(vec!["David", "Eva", "Frank"])),
            Arc::new(Float64Vector::from_slice(vec![40.1, 50.2, 60.3])),
        ];
        let batch2 = RecordBatch::new(schema.clone(), batch2_columns).unwrap();

        let batch3_columns: Vec<VectorRef> = vec![
            Arc::new(UInt32Vector::from_slice(vec![7, 8, 9])),
            Arc::new(StringVector::from(vec!["Grace", "Henry", "Ivy"])),
            Arc::new(Float64Vector::from_slice(vec![70.4, 80.5, 90.6])),
        ];
        let batch3 = RecordBatch::new(schema.clone(), batch3_columns).unwrap();

        // Combine all batches into a RecordBatches collection
        let recordbatches = RecordBatches::try_new(schema, vec![batch1, batch2, batch3]).unwrap();

        // Test with different compression types
        let compression_types = [
            CompressionType::Gzip,
            CompressionType::Bzip2,
            CompressionType::Xz,
            CompressionType::Zstd,
        ];

        // Create a temporary file path
        let temp_dir = common_test_util::temp_dir::create_temp_dir("test_compressed_json");
        for compression_type in compression_types {
            let format = JsonFormat {
                compression_type,
                ..JsonFormat::default()
            };

            let compressed_file_name =
                format!("test_compressed_json.{}", compression_type.file_extension());
            let compressed_file_path = temp_dir.path().join(&compressed_file_name);
            let compressed_file_path_str = compressed_file_path.to_str().unwrap();

            // Create a simple file store for testing
            let store = test_store("/");

            // Export JSON with compression
            let rows = stream_to_json(
                Box::pin(DfRecordBatchStreamAdapter::new(recordbatches.as_stream())),
                store,
                compressed_file_path_str,
                1024,
                1,
                &format,
            )
            .await
            .unwrap();

            assert_eq!(rows, 9);

            // Verify compressed file was created and has content
            assert!(compressed_file_path.exists());
            let file_size = std::fs::metadata(&compressed_file_path).unwrap().len();
            assert!(file_size > 0);

            // The file must begin with the codec's full magic header, which proves it was
            // really compressed rather than written as plain JSON text.
            let file_content = std::fs::read(&compressed_file_path).unwrap();
            let magic = expected_magic_bytes(compression_type);
            assert!(
                file_content.starts_with(magic),
                "{:?} file should start with {:02x?}, got {:02x?}",
                compression_type,
                magic,
                &file_content[..file_content.len().min(magic.len())]
            );

            // Verify the compressed file can be decompressed and content matches original data
            let store = test_store("/");
            let schema = Arc::new(
                JsonFormat {
                    compression_type,
                    ..Default::default()
                }
                .infer_schema(&store, compressed_file_path_str)
                .await
                .unwrap(),
            );
            let json_source = JsonSource::new()
                .with_schema(schema.clone())
                .with_batch_size(8192);

            let stream = file_to_stream(
                &store,
                compressed_file_path_str,
                schema,
                json_source,
                None,
                compression_type,
            )
            .await
            .unwrap();

            let batches = stream.try_collect::<Vec<_>>().await.unwrap();
            let pretty_print = arrow::util::pretty::pretty_format_batches(&batches)
                .unwrap()
                .to_string();
            let expected = r#"+----+---------+-------+
| id | name    | value |
+----+---------+-------+
| 1  | Alice   | 10.5  |
| 2  | Bob     | 20.3  |
| 3  | Charlie | 30.7  |
| 4  | David   | 40.1  |
| 5  | Eva     | 50.2  |
| 6  | Frank   | 60.3  |
| 7  | Grace   | 70.4  |
| 8  | Henry   | 80.5  |
| 9  | Ivy     | 90.6  |
+----+---------+-------+"#;
            assert_eq!(expected, pretty_print);
        }
    }
}