1use std::collections::BTreeMap;
16use std::fmt::{Display, Formatter};
17use std::str::FromStr;
18use std::sync::Arc;
19
20use arrow::datatypes::DataType as ArrowDataType;
21use common_base::bytes::Bytes;
22use serde::{Deserialize, Serialize};
23use snafu::ResultExt;
24
25use crate::data_type::DataType;
26use crate::error::{
27 DeserializeSnafu, InvalidJsonSnafu, InvalidJsonbSnafu, MergeJsonDatatypeSnafu, Result,
28};
29use crate::prelude::ConcreteDataType;
30use crate::scalars::ScalarVectorBuilder;
31use crate::type_id::LogicalTypeId;
32use crate::types::{ListType, StructField, StructType};
33use crate::value::Value;
34use crate::vectors::json::builder::JsonVectorBuilder;
35use crate::vectors::{BinaryVectorBuilder, MutableVector};
36
/// Type name reported by [`JsonType`] when it uses the JSONB format.
pub const JSON_TYPE_NAME: &str = "Json";
/// Name of the single field in the struct type built by [`plain_json_struct_type`].
const JSON_PLAIN_FIELD_NAME: &str = "__json_plain__";
/// Metadata key set to `true` on the field built by [`plain_json_struct_type`].
const JSON_PLAIN_FIELD_METADATA_KEY: &str = "is_plain_json";

/// Field layout of a JSON object: maps each key to the native type of its value.
pub type JsonObjectType = BTreeMap<String, JsonNativeType>;
42
/// Numeric representations used by [`JsonNativeType::Number`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)]
pub enum JsonNumberType {
    /// Converts to the unsigned 64-bit integer datatype.
    U64,
    /// Converts to the signed 64-bit integer datatype.
    I64,
    /// Converts to the 64-bit floating point datatype.
    F64,
}
49
/// Structural description of a JSON value in the native JSON format.
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)]
pub enum JsonNativeType {
    /// JSON `null`; it merges with, and is included by, every other type.
    Null,
    /// JSON boolean.
    Bool,
    /// JSON number, tagged with its numeric representation.
    Number(JsonNumberType),
    /// JSON string.
    String,
    /// JSON array whose items are all described by one item type.
    Array(Box<JsonNativeType>),
    /// JSON object, described by the type of each of its keys.
    Object(JsonObjectType),
}
59
60impl JsonNativeType {
61 pub fn is_null(&self) -> bool {
62 matches!(self, JsonNativeType::Null)
63 }
64
65 pub fn u64() -> Self {
66 Self::Number(JsonNumberType::U64)
67 }
68
69 pub fn i64() -> Self {
70 Self::Number(JsonNumberType::I64)
71 }
72
73 pub fn f64() -> Self {
74 Self::Number(JsonNumberType::F64)
75 }
76}
77
78impl From<&JsonNativeType> for ConcreteDataType {
79 fn from(value: &JsonNativeType) -> Self {
80 match value {
81 JsonNativeType::Null => ConcreteDataType::null_datatype(),
82 JsonNativeType::Bool => ConcreteDataType::boolean_datatype(),
83 JsonNativeType::Number(JsonNumberType::U64) => ConcreteDataType::uint64_datatype(),
84 JsonNativeType::Number(JsonNumberType::I64) => ConcreteDataType::int64_datatype(),
85 JsonNativeType::Number(JsonNumberType::F64) => ConcreteDataType::float64_datatype(),
86 JsonNativeType::String => ConcreteDataType::string_datatype(),
87 JsonNativeType::Array(item_type) => {
88 ConcreteDataType::List(ListType::new(Arc::new(item_type.as_ref().into())))
89 }
90 JsonNativeType::Object(object) => {
91 let fields = object
92 .iter()
93 .map(|(type_name, field_type)| {
94 StructField::new(type_name.clone(), field_type.into(), true)
95 })
96 .collect();
97 ConcreteDataType::Struct(StructType::new(Arc::new(fields)))
98 }
99 }
100 }
101}
102
103impl From<&ConcreteDataType> for JsonNativeType {
104 fn from(value: &ConcreteDataType) -> Self {
105 match value {
106 ConcreteDataType::Null(_) => JsonNativeType::Null,
107 ConcreteDataType::Boolean(_) => JsonNativeType::Bool,
108 ConcreteDataType::UInt64(_)
109 | ConcreteDataType::UInt32(_)
110 | ConcreteDataType::UInt16(_)
111 | ConcreteDataType::UInt8(_) => JsonNativeType::u64(),
112 ConcreteDataType::Int64(_)
113 | ConcreteDataType::Int32(_)
114 | ConcreteDataType::Int16(_)
115 | ConcreteDataType::Int8(_) => JsonNativeType::i64(),
116 ConcreteDataType::Float64(_) | ConcreteDataType::Float32(_) => JsonNativeType::f64(),
117 ConcreteDataType::String(_) => JsonNativeType::String,
118 ConcreteDataType::List(list_type) => {
119 JsonNativeType::Array(Box::new(list_type.item_type().into()))
120 }
121 ConcreteDataType::Struct(struct_type) => JsonNativeType::Object(
122 struct_type
123 .fields()
124 .iter()
125 .map(|field| (field.name().to_string(), field.data_type().into()))
126 .collect(),
127 ),
128 ConcreteDataType::Json(json_type) => json_type.native_type().clone(),
129 _ => unreachable!(),
130 }
131 }
132}
133
134impl Display for JsonNativeType {
135 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
136 match self {
137 JsonNativeType::Null => write!(f, "Null"),
138 JsonNativeType::Bool => write!(f, "Bool"),
139 JsonNativeType::Number(t) => {
140 write!(f, "Number({t:?})")
141 }
142 JsonNativeType::String => write!(f, "String"),
143 JsonNativeType::Array(item_type) => {
144 write!(f, "Array[{}]", item_type)
145 }
146 JsonNativeType::Object(object) => {
147 write!(
148 f,
149 "Object{{{}}}",
150 object
151 .iter()
152 .map(|(k, v)| format!(r#""{k}": {v}"#))
153 .collect::<Vec<_>>()
154 .join(", ")
155 )
156 }
157 }
158 }
159}
160
/// Storage format of a [`JsonType`].
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize, Default)]
pub enum JsonFormat {
    /// JSONB format (the default); its Arrow type is `Binary`.
    #[default]
    Jsonb,
    /// Native format, carrying the [`JsonNativeType`] that describes the values.
    Native(Box<JsonNativeType>),
}
167
/// Data type of JSON values, parameterized by its storage format.
#[derive(Debug, Default, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)]
pub struct JsonType {
    /// How values of this type are stored; see [`JsonFormat`].
    pub format: JsonFormat,
}
174
175impl JsonType {
176 pub fn new(format: JsonFormat) -> Self {
177 Self { format }
178 }
179
180 pub(crate) fn new_native(native: JsonNativeType) -> Self {
181 Self {
182 format: JsonFormat::Native(Box::new(native)),
183 }
184 }
185
186 pub(crate) fn native_type(&self) -> &JsonNativeType {
187 match &self.format {
188 JsonFormat::Jsonb => &JsonNativeType::String,
189 JsonFormat::Native(x) => x.as_ref(),
190 }
191 }
192
193 pub fn null() -> Self {
194 Self {
195 format: JsonFormat::Native(Box::new(JsonNativeType::Null)),
196 }
197 }
198
199 pub(crate) fn as_struct_type(&self) -> StructType {
204 match &self.format {
205 JsonFormat::Jsonb => StructType::default(),
206 JsonFormat::Native(inner) => match ConcreteDataType::from(inner.as_ref()) {
207 ConcreteDataType::Struct(t) => t.clone(),
208 x => plain_json_struct_type(x),
209 },
210 }
211 }
212
213 pub fn merge(&mut self, other: &JsonType) -> Result<()> {
215 match (&self.format, &other.format) {
216 (JsonFormat::Jsonb, JsonFormat::Jsonb) => Ok(()),
217 (JsonFormat::Native(this), JsonFormat::Native(that)) => {
218 let merged = merge(this.as_ref(), that.as_ref())?;
219 self.format = JsonFormat::Native(Box::new(merged));
220 Ok(())
221 }
222 _ => MergeJsonDatatypeSnafu {
223 reason: "json format not match",
224 }
225 .fail(),
226 }
227 }
228
229 pub fn is_mergeable(&self, other: &JsonType) -> bool {
231 match (&self.format, &other.format) {
232 (JsonFormat::Jsonb, JsonFormat::Jsonb) => true,
233 (JsonFormat::Native(this), JsonFormat::Native(that)) => {
234 is_mergeable(this.as_ref(), that.as_ref())
235 }
236 _ => false,
237 }
238 }
239
240 pub fn is_include(&self, other: &JsonType) -> bool {
242 match (&self.format, &other.format) {
243 (JsonFormat::Jsonb, JsonFormat::Jsonb) => true,
244 (JsonFormat::Native(this), JsonFormat::Native(that)) => {
245 is_include(this.as_ref(), that.as_ref())
246 }
247 _ => false,
248 }
249 }
250}
251
252fn is_include(this: &JsonNativeType, that: &JsonNativeType) -> bool {
253 fn is_include_object(this: &JsonObjectType, that: &JsonObjectType) -> bool {
254 for (type_name, that_type) in that {
255 let Some(this_type) = this.get(type_name) else {
256 return false;
257 };
258 if !is_include(this_type, that_type) {
259 return false;
260 }
261 }
262 true
263 }
264
265 match (this, that) {
266 (this, that) if this == that => true,
267 (JsonNativeType::Array(this), JsonNativeType::Array(that)) => {
268 is_include(this.as_ref(), that.as_ref())
269 }
270 (JsonNativeType::Object(this), JsonNativeType::Object(that)) => {
271 is_include_object(this, that)
272 }
273 (_, JsonNativeType::Null) => true,
274 _ => false,
275 }
276}
277
278pub(crate) fn plain_json_struct_type(item_type: ConcreteDataType) -> StructType {
281 let mut field = StructField::new(JSON_PLAIN_FIELD_NAME.to_string(), item_type, true);
282 field.insert_metadata(JSON_PLAIN_FIELD_METADATA_KEY, true);
283 StructType::new(Arc::new(vec![field]))
284}
285
286fn is_mergeable(this: &JsonNativeType, that: &JsonNativeType) -> bool {
287 fn is_mergeable_object(this: &JsonObjectType, that: &JsonObjectType) -> bool {
288 for (type_name, that_type) in that {
289 if let Some(this_type) = this.get(type_name)
290 && !is_mergeable(this_type, that_type)
291 {
292 return false;
293 }
294 }
295 true
296 }
297
298 match (this, that) {
299 (this, that) if this == that => true,
300 (JsonNativeType::Array(this), JsonNativeType::Array(that)) => {
301 is_mergeable(this.as_ref(), that.as_ref())
302 }
303 (JsonNativeType::Object(this), JsonNativeType::Object(that)) => {
304 is_mergeable_object(this, that)
305 }
306 (JsonNativeType::Null, _) | (_, JsonNativeType::Null) => true,
307 _ => false,
308 }
309}
310
311fn merge(this: &JsonNativeType, that: &JsonNativeType) -> Result<JsonNativeType> {
312 fn merge_object(this: &JsonObjectType, that: &JsonObjectType) -> Result<JsonObjectType> {
313 let mut this = this.clone();
314 for (type_name, that_type) in that {
316 if let Some(this_type) = this.get_mut(type_name) {
317 let merged_type = merge(this_type, that_type)?;
318 *this_type = merged_type;
319 } else {
320 this.insert(type_name.clone(), that_type.clone());
321 }
322 }
323 Ok(this)
324 }
325
326 match (this, that) {
327 (this, that) if this == that => Ok(this.clone()),
328 (JsonNativeType::Array(this), JsonNativeType::Array(that)) => {
329 merge(this.as_ref(), that.as_ref()).map(|x| JsonNativeType::Array(Box::new(x)))
330 }
331 (JsonNativeType::Object(this), JsonNativeType::Object(that)) => {
332 merge_object(this, that).map(JsonNativeType::Object)
333 }
334 (JsonNativeType::Null, x) | (x, JsonNativeType::Null) => Ok(x.clone()),
335 _ => MergeJsonDatatypeSnafu {
336 reason: format!("datatypes have conflict, this: {this}, that: {that}"),
337 }
338 .fail(),
339 }
340}
341
342impl DataType for JsonType {
343 fn name(&self) -> String {
344 match &self.format {
345 JsonFormat::Jsonb => JSON_TYPE_NAME.to_string(),
346 JsonFormat::Native(x) => format!("Json<{x}>"),
347 }
348 }
349
350 fn logical_type_id(&self) -> LogicalTypeId {
351 LogicalTypeId::Json
352 }
353
354 fn default_value(&self) -> Value {
355 Bytes::default().into()
356 }
357
358 fn as_arrow_type(&self) -> ArrowDataType {
359 match self.format {
360 JsonFormat::Jsonb => ArrowDataType::Binary,
361 JsonFormat::Native(_) => self.as_struct_type().as_arrow_type(),
362 }
363 }
364
365 fn create_mutable_vector(&self, capacity: usize) -> Box<dyn MutableVector> {
366 match &self.format {
367 JsonFormat::Jsonb => Box::new(BinaryVectorBuilder::with_capacity(capacity)),
368 JsonFormat::Native(x) => Box::new(JsonVectorBuilder::new(*x.clone(), capacity)),
369 }
370 }
371
372 fn try_cast(&self, from: Value) -> Option<Value> {
373 match from {
374 Value::Binary(v) => Some(Value::Binary(v)),
375 _ => None,
376 }
377 }
378}
379
380impl Display for JsonType {
381 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
382 write!(f, "{}", self.name())
383 }
384}
385
386pub fn jsonb_to_string(val: &[u8]) -> Result<String> {
388 match jsonb::from_slice(val) {
389 Ok(jsonb_value) => {
390 let serialized = jsonb_value.to_string();
391 Ok(serialized)
392 }
393 Err(e) => InvalidJsonbSnafu { error: e }.fail(),
394 }
395}
396
397pub fn jsonb_to_serde_json(val: &[u8]) -> Result<serde_json::Value> {
399 let json_string = jsonb_to_string(val)?;
400 serde_json::Value::from_str(json_string.as_str())
401 .context(DeserializeSnafu { json: json_string })
402}
403
404pub fn parse_string_to_jsonb(s: &str) -> Result<Vec<u8>> {
406 jsonb::parse_value(s.as_bytes())
407 .map_err(|_| InvalidJsonSnafu { value: s }.build())
408 .map(|json| json.to_vec())
409}
410
#[cfg(test)]
mod tests {
    use super::*;
    use crate::json::JsonStructureSettings;

    /// Exercises `is_include` over scalar, array and (nested) object types.
    #[test]
    fn test_json_type_include() {
        // Asserts that `is_include(this, that)` yields `expected`, naming the
        // offending pair on failure.
        fn test(this: &JsonNativeType, that: &JsonNativeType, expected: bool) {
            assert_eq!(
                is_include(this, that),
                expected,
                "is_include(this: {this}, that: {that})"
            );
        }

        // Builds an object type from `(key, type)` pairs.
        fn object<const N: usize>(fields: [(&str, JsonNativeType); N]) -> JsonNativeType {
            JsonNativeType::Object(
                fields
                    .into_iter()
                    .map(|(key, field_type)| (key.to_string(), field_type))
                    .collect(),
            )
        }

        // Builds an array type with the given item type.
        fn array(item_type: JsonNativeType) -> JsonNativeType {
            JsonNativeType::Array(Box::new(item_type))
        }

        // Builds `{"b": {"c": <c_type>}}`.
        fn b_c(c_type: JsonNativeType) -> JsonNativeType {
            object([("b", object([("c", c_type)]))])
        }

        // Builds `{"nested": {"a": <a_type>}, "bar": i64}`.
        fn nested_with_bar(a_type: JsonNativeType) -> JsonNativeType {
            object([
                ("nested", object([("a", a_type)])),
                ("bar", JsonNativeType::i64()),
            ])
        }

        test(&JsonNativeType::Null, &JsonNativeType::Null, true);
        test(&JsonNativeType::Null, &JsonNativeType::Bool, false);

        test(&JsonNativeType::Bool, &JsonNativeType::Null, true);
        test(&JsonNativeType::Bool, &JsonNativeType::Bool, true);
        test(&JsonNativeType::Bool, &JsonNativeType::u64(), false);

        test(&JsonNativeType::u64(), &JsonNativeType::Null, true);
        test(&JsonNativeType::u64(), &JsonNativeType::u64(), true);
        test(&JsonNativeType::u64(), &JsonNativeType::String, false);

        let f64_array = array(JsonNativeType::f64());

        test(&JsonNativeType::String, &JsonNativeType::Null, true);
        test(&JsonNativeType::String, &JsonNativeType::String, true);
        test(&JsonNativeType::String, &f64_array, false);

        test(&f64_array, &JsonNativeType::Null, true);
        test(&f64_array, &array(JsonNativeType::Null), true);
        test(&f64_array, &f64_array, true);
        test(&f64_array, &JsonNativeType::String, false);
        test(&f64_array, &object([]), false);

        let simple_json_object = object([("foo", JsonNativeType::String)]);
        test(&simple_json_object, &JsonNativeType::Null, true);
        test(&simple_json_object, &simple_json_object, true);
        test(&simple_json_object, &JsonNativeType::i64(), false);
        test(
            &simple_json_object,
            &object([("bar", JsonNativeType::i64())]),
            false,
        );

        // {"nested": {"a": {"b": {"c": String}}}, "bar": i64}
        let complex_json_object = nested_with_bar(b_c(JsonNativeType::String));
        test(&complex_json_object, &JsonNativeType::Null, true);
        test(&complex_json_object, &JsonNativeType::String, false);
        test(&complex_json_object, &complex_json_object, true);
        test(
            &complex_json_object,
            &object([("bar", JsonNativeType::i64())]),
            true,
        );
        test(
            &complex_json_object,
            &nested_with_bar(JsonNativeType::Null),
            true,
        );
        test(
            &complex_json_object,
            &nested_with_bar(JsonNativeType::String),
            false,
        );
        test(
            &complex_json_object,
            &nested_with_bar(object([("b", JsonNativeType::String)])),
            false,
        );
        test(
            &complex_json_object,
            &nested_with_bar(b_c(JsonNativeType::Null)),
            true,
        );
        test(
            &complex_json_object,
            &nested_with_bar(b_c(JsonNativeType::Bool)),
            false,
        );
        test(
            &complex_json_object,
            &object([(
                "nested",
                object([("a", b_c(JsonNativeType::String))]),
            )]),
            true,
        );
    }

    /// Merges a sequence of JSON values into one type, checking both the
    /// accepted merges and the rejected ones.
    #[test]
    fn test_merge_json_type() -> Result<()> {
        // Encodes `json`, merges its type into `json_type` and checks the
        // outcome against `expected`: the type name on success, the error
        // message on failure.
        fn test(
            json: &str,
            json_type: &mut JsonType,
            expected: std::result::Result<&str, &str>,
        ) -> Result<()> {
            let json: serde_json::Value = serde_json::from_str(json).unwrap();

            let settings = JsonStructureSettings::Structured(None);
            let value = settings.encode(json)?;
            let value_type = value.data_type();
            let Some(other) = value_type.as_json() else {
                unreachable!()
            };

            let result = json_type.merge(other);
            match (result, expected) {
                (Ok(()), Ok(expected)) => {
                    assert_eq!(json_type.name(), expected);
                    assert!(json_type.is_mergeable(other));
                }
                (Err(err), Err(expected)) => {
                    assert_eq!(err.to_string(), expected);
                    assert!(!json_type.is_mergeable(other));
                }
                // Report what happened instead of a bare "unreachable" panic.
                (result, expected) => {
                    panic!("merge returned {result:?}, but the test expected {expected:?}")
                }
            }
            Ok(())
        }

        let json_type = &mut JsonType::new_native(JsonNativeType::Null);

        // A null type accepts an object.
        let json = r#"{
            "hello": "world",
            "list": [1, 2, 3],
            "object": {"a": 1}
        }"#;
        let expected = r#"Json<Object{"hello": String, "list": Array[Number(I64)], "object": Object{"a": Number(I64)}}>"#;
        test(json, json_type, Ok(expected))?;

        // An object type rejects non-object values.
        let jsons = [r#""s""#, "1", "[1]"];
        let expects = [
            r#"Failed to merge JSON datatype: datatypes have conflict, this: Object{"hello": String, "list": Array[Number(I64)], "object": Object{"a": Number(I64)}}, that: String"#,
            r#"Failed to merge JSON datatype: datatypes have conflict, this: Object{"hello": String, "list": Array[Number(I64)], "object": Object{"a": Number(I64)}}, that: Number(I64)"#,
            r#"Failed to merge JSON datatype: datatypes have conflict, this: Object{"hello": String, "list": Array[Number(I64)], "object": Object{"a": Number(I64)}}, that: Array[Number(I64)]"#,
        ];
        for (json, expect) in jsons.into_iter().zip(expects) {
            test(json, json_type, Err(expect))?;
        }

        // An object whose shared key has a conflicting type is rejected.
        let json = r#"{
            "hello": 1,
            "float": 0.123,
            "no": 42
        }"#;
        let expected = r#"Failed to merge JSON datatype: datatypes have conflict, this: String, that: Number(I64)"#;
        test(json, json_type, Err(expected))?;

        // New keys are added to the merged object.
        let json = r#"{
            "hello": "greptime",
            "float": 0.123,
            "int": 42
        }"#;
        let expected = r#"Json<Object{"float": Number(F64), "hello": String, "int": Number(I64), "list": Array[Number(I64)], "object": Object{"a": Number(I64)}}>"#;
        test(json, json_type, Ok(expected))?;

        // Nested objects are merged key by key as well.
        let json = r#"{
            "list": [4],
            "object": {"foo": "bar", "l": ["x"], "o": {"key": "value"}},
            "float": 0.456,
            "int": 0
        }"#;
        let expected = r#"Json<Object{"float": Number(F64), "hello": String, "int": Number(I64), "list": Array[Number(I64)], "object": Object{"a": Number(I64), "foo": String, "l": Array[String], "o": Object{"key": String}}}>"#;
        test(json, json_type, Ok(expected))?;

        Ok(())
    }
}