// common_datasource/file_format/json.rs

use std::collections::HashMap;
use std::io::BufReader;
use std::str::FromStr;

use arrow::json;
use arrow::json::reader::{ValueIter, infer_json_schema_from_iterator};
use arrow::json::writer::LineDelimited;
use arrow::record_batch::RecordBatch;
use arrow_schema::Schema;
use async_trait::async_trait;
use common_runtime;
use datafusion::physical_plan::SendableRecordBatchStream;
use object_store::ObjectStore;
use snafu::ResultExt;
use tokio_util::compat::FuturesAsyncReadCompatExt;
use tokio_util::io::SyncIoBridge;

use crate::buffered_writer::DfRecordBatchEncoder;
use crate::compression::CompressionType;
use crate::error::{self, Result};
use crate::file_format::{self, FileFormat, stream_to_file};
use crate::share_buffer::SharedBuffer;

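/// Options for reading and writing newline-delimited JSON: how many records
/// to sample during schema inference and which compression codec to apply.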
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct JsonFormat {
    /// Maximum number of records to scan when inferring the schema;
    /// `None` scans the entire file.
    pub schema_infer_max_record: Option<usize>,
    /// Compression codec applied to the file contents.
    pub compression_type: CompressionType,
}

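// Builds a `JsonFormat` from string key/value options; unrecognized keys are
// ignored and absent keys keep their default values.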
impl TryFrom<&HashMap<String, String>> for JsonFormat {
    type Error = error::Error;

    fn try_from(value: &HashMap<String, String>) -> Result<Self> {
        let mut format = JsonFormat::default();
        if let Some(compression_type) = value.get(file_format::FORMAT_COMPRESSION_TYPE) {
            format.compression_type = CompressionType::from_str(compression_type)?
        };
        if let Some(schema_infer_max_record) =
            value.get(file_format::FORMAT_SCHEMA_INFER_MAX_RECORD)
        {
            format.schema_infer_max_record =
                Some(schema_infer_max_record.parse::<usize>().map_err(|_| {
                    error::ParseFormatSnafu {
                        key: file_format::FORMAT_SCHEMA_INFER_MAX_RECORD,
                        value: schema_infer_max_record,
                    }
                    .build()
                })?);
        };
        Ok(format)
    }
}

impl Default for JsonFormat {
    fn default() -> Self {
        Self {
            schema_infer_max_record: Some(file_format::DEFAULT_SCHEMA_INFER_MAX_RECORD),
            compression_type: CompressionType::Uncompressed,
        }
    }
}

#[async_trait]
impl FileFormat for JsonFormat {
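    /// Infers the schema by streaming the object from `store`, decompressing
    /// it on the fly, and scanning up to `schema_infer_max_record` records.
    /// The scan runs on a blocking task because Arrow's JSON reader is
    /// synchronous.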
    async fn infer_schema(&self, store: &ObjectStore, path: &str) -> Result<Schema> {
        let meta = store
            .stat(path)
            .await
            .context(error::ReadObjectSnafu { path })?;

        let reader = store
            .reader(path)
            .await
            .context(error::ReadObjectSnafu { path })?
            .into_futures_async_read(0..meta.content_length())
            .await
            .context(error::ReadObjectSnafu { path })?
            .compat();

        let decoded = self.compression_type.convert_async_read(reader);

        let schema_infer_max_record = self.schema_infer_max_record;

        common_runtime::spawn_blocking_global(move || {
            // `SyncIoBridge` exposes the async, decompressed reader as a
            // synchronous `Read` for Arrow's schema inference.
            let mut reader = BufReader::new(SyncIoBridge::new(decoded));

            let iter = ValueIter::new(&mut reader, schema_infer_max_record);

            let schema = infer_json_schema_from_iterator(iter).context(error::InferSchemaSnafu)?;

            Ok(schema)
        })
        .await
        .context(error::JoinHandleSnafu)?
    }
}

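/// Writes a stream of record batches to `path` as newline-delimited JSON,
/// compressed according to `format.compression_type`, and returns the number
/// of rows written. `threshold` and `concurrency` are passed through to the
/// underlying buffered writer in `stream_to_file`.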
pub async fn stream_to_json(
    stream: SendableRecordBatchStream,
    store: ObjectStore,
    path: &str,
    threshold: usize,
    concurrency: usize,
    format: &JsonFormat,
) -> Result<usize> {
    stream_to_file(
        stream,
        store,
        path,
        threshold,
        concurrency,
        format.compression_type,
        json::LineDelimitedWriter::new,
    )
    .await
}

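// Inherent methods take precedence over trait methods during resolution, so
// the call below dispatches to the inherent `json::Writer::write` rather than
// recursing into this trait method.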
impl DfRecordBatchEncoder for json::Writer<SharedBuffer, LineDelimited> {
    fn write(&mut self, batch: &RecordBatch) -> Result<()> {
        self.write(batch).context(error::WriteRecordBatchSnafu)
    }
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use common_recordbatch::adapter::DfRecordBatchStreamAdapter;
    use common_recordbatch::{RecordBatch, RecordBatches};
    use common_test_util::find_workspace_path;
    use datafusion::datasource::physical_plan::{FileSource, JsonSource};
    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::{ColumnSchema, Schema};
    use datatypes::vectors::{Float64Vector, StringVector, UInt32Vector, VectorRef};
    use futures::TryStreamExt;

    use super::*;
    use crate::file_format::{
        FORMAT_COMPRESSION_TYPE, FORMAT_SCHEMA_INFER_MAX_RECORD, FileFormat, file_to_stream,
    };
    use crate::test_util::{format_schema, test_store};

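    /// Directory containing the JSON fixtures used by the inference tests.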
    fn test_data_root() -> String {
        find_workspace_path("/src/common/datasource/tests/json")
            .display()
            .to_string()
    }

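    // Inference over the uncompressed fixture should discover all four
    // columns as nullable.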
    #[tokio::test]
    async fn infer_schema_basic() {
        let json = JsonFormat::default();
        let store = test_store(&test_data_root());
        let schema = json.infer_schema(&store, "simple.json").await.unwrap();
        let formatted: Vec<_> = format_schema(schema);

        assert_eq!(
            vec![
                "a: Int64: NULL",
                "b: Float64: NULL",
                "c: Boolean: NULL",
                "d: Utf8: NULL",
            ],
            formatted
        );
    }

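    // Capping inference at three records restricts the schema to the columns
    // present in those records.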
    #[tokio::test]
    async fn infer_schema_with_limit() {
        let json = JsonFormat {
            schema_infer_max_record: Some(3),
            ..JsonFormat::default()
        };
        let store = test_store(&test_data_root());
        let schema = json
            .infer_schema(&store, "schema_infer_limit.json")
            .await
            .unwrap();
        let formatted: Vec<_> = format_schema(schema);

        assert_eq!(
            vec!["a: Int64: NULL", "b: Float64: NULL", "c: Boolean: NULL"],
            formatted
        );
    }

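    // An empty option map must fall back to the defaults; explicit keys
    // override them.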
    #[test]
    fn test_try_from() {
        let map = HashMap::new();
        let format = JsonFormat::try_from(&map).unwrap();

        assert_eq!(format, JsonFormat::default());

        let map = HashMap::from([
            (
                FORMAT_SCHEMA_INFER_MAX_RECORD.to_string(),
                "2000".to_string(),
            ),
            (FORMAT_COMPRESSION_TYPE.to_string(), "zstd".to_string()),
        ]);
        let format = JsonFormat::try_from(&map).unwrap();

        assert_eq!(
            format,
            JsonFormat {
                compression_type: CompressionType::Zstd,
                schema_infer_max_record: Some(2000),
            }
        );
    }

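    // Round-trips three record batches through every supported codec: write
    // compressed NDJSON, check the codec's magic bytes, then read the file
    // back and compare against the original rows.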
    #[tokio::test]
    async fn test_compressed_json() {
        let column_schemas = vec![
            ColumnSchema::new("id", ConcreteDataType::uint32_datatype(), false),
            ColumnSchema::new("name", ConcreteDataType::string_datatype(), false),
            ColumnSchema::new("value", ConcreteDataType::float64_datatype(), false),
        ];
        let schema = Arc::new(Schema::new(column_schemas));

        // Three batches of three rows each, nine rows in total.
        let batch1_columns: Vec<VectorRef> = vec![
            Arc::new(UInt32Vector::from_slice(vec![1, 2, 3])),
            Arc::new(StringVector::from(vec!["Alice", "Bob", "Charlie"])),
            Arc::new(Float64Vector::from_slice(vec![10.5, 20.3, 30.7])),
        ];
        let batch1 = RecordBatch::new(schema.clone(), batch1_columns).unwrap();

        let batch2_columns: Vec<VectorRef> = vec![
            Arc::new(UInt32Vector::from_slice(vec![4, 5, 6])),
            Arc::new(StringVector::from(vec!["David", "Eva", "Frank"])),
            Arc::new(Float64Vector::from_slice(vec![40.1, 50.2, 60.3])),
        ];
        let batch2 = RecordBatch::new(schema.clone(), batch2_columns).unwrap();

        let batch3_columns: Vec<VectorRef> = vec![
            Arc::new(UInt32Vector::from_slice(vec![7, 8, 9])),
            Arc::new(StringVector::from(vec!["Grace", "Henry", "Ivy"])),
            Arc::new(Float64Vector::from_slice(vec![70.4, 80.5, 90.6])),
        ];
        let batch3 = RecordBatch::new(schema.clone(), batch3_columns).unwrap();

        let recordbatches = RecordBatches::try_new(schema, vec![batch1, batch2, batch3]).unwrap();

        let compression_types = vec![
            CompressionType::Gzip,
            CompressionType::Bzip2,
            CompressionType::Xz,
            CompressionType::Zstd,
        ];

        let temp_dir = common_test_util::temp_dir::create_temp_dir("test_compressed_json");
        for compression_type in compression_types {
            let format = JsonFormat {
                compression_type,
                ..JsonFormat::default()
            };

            let compressed_file_name =
                format!("test_compressed_json.{}", compression_type.file_extension());
            let compressed_file_path = temp_dir.path().join(&compressed_file_name);
            let compressed_file_path_str = compressed_file_path.to_str().unwrap();

            let store = test_store("/");

            // Write the batches as compressed newline-delimited JSON.
            let rows = stream_to_json(
                Box::pin(DfRecordBatchStreamAdapter::new(recordbatches.as_stream())),
                store,
                compressed_file_path_str,
                1024,
                1,
                &format,
            )
            .await
            .unwrap();

            assert_eq!(rows, 9);

            assert!(compressed_file_path.exists());
            let file_size = std::fs::metadata(&compressed_file_path).unwrap().len();
            assert!(file_size > 0);

            let file_content = std::fs::read(&compressed_file_path).unwrap();
            // Check each codec's magic bytes to confirm the output is really
            // compressed, not plain text behind a compressed extension.
            match compression_type {
                CompressionType::Gzip => {
                    assert_eq!(file_content[0], 0x1f, "Gzip file should start with 0x1f");
                    assert_eq!(
                        file_content[1], 0x8b,
                        "Gzip file should have 0x8b as second byte"
                    );
                }
                CompressionType::Bzip2 => {
                    assert_eq!(file_content[0], b'B', "Bzip2 file should start with 'B'");
                    assert_eq!(
                        file_content[1], b'Z',
                        "Bzip2 file should have 'Z' as second byte"
                    );
                }
                CompressionType::Xz => {
                    assert_eq!(file_content[0], 0xFD, "XZ file should start with 0xFD");
                }
                CompressionType::Zstd => {
                    assert_eq!(file_content[0], 0x28, "Zstd file should start with 0x28");
                    assert_eq!(
                        file_content[1], 0xB5,
                        "Zstd file should have 0xB5 as second byte"
                    );
                }
                _ => {}
            }

            // Read the file back through the same format and compare the
            // pretty-printed result against the original rows.
            let store = test_store("/");
            let schema = Arc::new(
                JsonFormat {
                    compression_type,
                    ..Default::default()
                }
                .infer_schema(&store, compressed_file_path_str)
                .await
                .unwrap(),
            );
            let json_source = JsonSource::new()
                .with_schema(schema.clone())
                .with_batch_size(8192);

            let stream = file_to_stream(
                &store,
                compressed_file_path_str,
                schema.clone(),
                json_source.clone(),
                None,
                compression_type,
            )
            .await
            .unwrap();

            let batches = stream.try_collect::<Vec<_>>().await.unwrap();
            let pretty_print = arrow::util::pretty::pretty_format_batches(&batches)
                .unwrap()
                .to_string();
            let expected = r#"+----+---------+-------+
| id | name    | value |
+----+---------+-------+
| 1  | Alice   | 10.5  |
| 2  | Bob     | 20.3  |
| 3  | Charlie | 30.7  |
| 4  | David   | 40.1  |
| 5  | Eva     | 50.2  |
| 6  | Frank   | 60.3  |
| 7  | Grace   | 70.4  |
| 8  | Henry   | 80.5  |
| 9  | Ivy     | 90.6  |
+----+---------+-------+"#;
            assert_eq!(expected, pretty_print);
        }
    }
}