1use std::collections::BTreeMap;
16use std::str::FromStr;
17use std::sync::Arc;
18
19use arrow::datatypes::DataType as ArrowDataType;
20use arrow_schema::Fields;
21use common_base::bytes::Bytes;
22use serde::{Deserialize, Serialize};
23use snafu::ResultExt;
24
25use crate::data_type::DataType;
26use crate::error::{
27 DeserializeSnafu, InvalidJsonSnafu, InvalidJsonbSnafu, MergeJsonDatatypeSnafu, Result,
28};
29use crate::prelude::ConcreteDataType;
30use crate::scalars::ScalarVectorBuilder;
31use crate::type_id::LogicalTypeId;
32use crate::types::{ListType, StructField, StructType};
33use crate::value::Value;
34use crate::vectors::{BinaryVectorBuilder, MutableVector};
35
/// Type name of the JSON data type; `JsonType::name` returns it verbatim when the
/// format is `JsonFormat::Jsonb`.
pub const JSON_TYPE_NAME: &str = "Json";
37
/// Format used by a [`JsonType`].
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize, Default)]
pub enum JsonFormat {
    /// The default format. `JsonType` maps it to the Arrow `Binary` type and names it
    /// with `JSON_TYPE_NAME`.
    #[default]
    Jsonb,
    /// Carries a boxed [`ConcreteDataType`]. `JsonType::name` renders it as
    /// `Json<{inner}>`, and `JsonType::merge` combines the inner types of two
    /// `Native` formats.
    Native(Box<ConcreteDataType>),
}
44
/// JSON data type, parameterized by the [`JsonFormat`] it uses.
///
/// The derived `Default` uses the default [`JsonFormat`] variant, i.e. `Jsonb`.
#[derive(Debug, Default, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)]
pub struct JsonType {
    /// Format of this JSON type.
    pub format: JsonFormat,
}
51
52impl JsonType {
53 pub fn new(format: JsonFormat) -> Self {
54 Self { format }
55 }
56
57 #[allow(unused)]
59 pub(crate) fn as_struct_type(&self) -> StructType {
64 match &self.format {
65 JsonFormat::Jsonb => StructType::default(),
66 JsonFormat::Native(inner) => match inner.as_ref() {
67 ConcreteDataType::Struct(t) => t.clone(),
68 x => StructType::new(Arc::new(vec![StructField::new(
69 "__plain".to_string(),
70 x.clone(),
71 true,
72 )])),
73 },
74 }
75 }
76
77 #[allow(unused)]
79 pub(crate) fn merge(&mut self, other: &JsonType) -> Result<()> {
81 match (&self.format, &other.format) {
82 (JsonFormat::Jsonb, JsonFormat::Jsonb) => Ok(()),
83 (JsonFormat::Native(this), JsonFormat::Native(that)) => {
84 let merged = merge(this.as_ref(), that.as_ref())?;
85 self.format = JsonFormat::Native(Box::new(merged));
86 Ok(())
87 }
88 _ => MergeJsonDatatypeSnafu {
89 reason: "json format not match",
90 }
91 .fail(),
92 }
93 }
94}
95
96fn merge(this: &ConcreteDataType, that: &ConcreteDataType) -> Result<ConcreteDataType> {
97 match (this, that) {
98 (this, that) if this == that => Ok(this.clone()),
99 (ConcreteDataType::List(this), ConcreteDataType::List(that)) => {
100 merge_list(this, that).map(ConcreteDataType::List)
101 }
102 (ConcreteDataType::Struct(this), ConcreteDataType::Struct(that)) => {
103 merge_struct(this, that).map(ConcreteDataType::Struct)
104 }
105 (ConcreteDataType::Null(_), x) | (x, ConcreteDataType::Null(_)) => Ok(x.clone()),
106 _ => MergeJsonDatatypeSnafu {
107 reason: format!("datatypes have conflict, this: {this}, that: {that}"),
108 }
109 .fail(),
110 }
111}
112
113fn merge_list(this: &ListType, that: &ListType) -> Result<ListType> {
114 let merged = merge(this.item_type(), that.item_type())?;
115 Ok(ListType::new(Arc::new(merged)))
116}
117
118fn merge_struct(this: &StructType, that: &StructType) -> Result<StructType> {
119 let this = Arc::unwrap_or_clone(this.fields());
120 let that = Arc::unwrap_or_clone(that.fields());
121
122 let mut this: BTreeMap<String, StructField> = this
123 .into_iter()
124 .map(|x| (x.name().to_string(), x))
125 .collect();
126 for that_field in that {
128 let field_name = that_field.name().to_string();
129 if let Some(this_field) = this.get(&field_name) {
130 let merged_field = StructField::new(
131 field_name.clone(),
132 merge(this_field.data_type(), that_field.data_type())?,
133 true, );
135 this.insert(field_name, merged_field);
136 } else {
137 this.insert(field_name, that_field);
138 }
139 }
140
141 let fields = this.into_values().collect::<Vec<_>>();
142 Ok(StructType::new(Arc::new(fields)))
143}
144
145impl DataType for JsonType {
146 fn name(&self) -> String {
147 match &self.format {
148 JsonFormat::Jsonb => JSON_TYPE_NAME.to_string(),
149 JsonFormat::Native(x) => format!("Json<{x}>"),
150 }
151 }
152
153 fn logical_type_id(&self) -> LogicalTypeId {
154 LogicalTypeId::Json
155 }
156
157 fn default_value(&self) -> Value {
158 Bytes::default().into()
159 }
160
161 fn as_arrow_type(&self) -> ArrowDataType {
162 match self.format {
163 JsonFormat::Jsonb => ArrowDataType::Binary,
164 JsonFormat::Native(_) => ArrowDataType::Struct(Fields::empty()),
165 }
166 }
167
168 fn create_mutable_vector(&self, capacity: usize) -> Box<dyn MutableVector> {
169 Box::new(BinaryVectorBuilder::with_capacity(capacity))
170 }
171
172 fn try_cast(&self, from: Value) -> Option<Value> {
173 match from {
174 Value::Binary(v) => Some(Value::Binary(v)),
175 _ => None,
176 }
177 }
178}
179
180pub fn jsonb_to_string(val: &[u8]) -> Result<String> {
182 match jsonb::from_slice(val) {
183 Ok(jsonb_value) => {
184 let serialized = jsonb_value.to_string();
185 Ok(serialized)
186 }
187 Err(e) => InvalidJsonbSnafu { error: e }.fail(),
188 }
189}
190
191pub fn jsonb_to_serde_json(val: &[u8]) -> Result<serde_json::Value> {
193 let json_string = jsonb_to_string(val)?;
194 serde_json::Value::from_str(json_string.as_str())
195 .context(DeserializeSnafu { json: json_string })
196}
197
198pub fn parse_string_to_jsonb(s: &str) -> Result<Vec<u8>> {
200 jsonb::parse_value(s.as_bytes())
201 .map_err(|_| InvalidJsonSnafu { value: s }.build())
202 .map(|json| json.to_vec())
203}
204
#[cfg(test)]
mod tests {
    use super::*;
    use crate::json::JsonStructureSettings;

    /// Merges a sequence of JSON values into one `JsonType` and checks, after each
    /// step, either the merged type name or the reported error message.
    #[test]
    fn test_merge_json_type() -> Result<()> {
        /// Encodes `json`, merges the encoded value's JSON type into `json_type`,
        /// and compares the outcome with `expected`: `Ok(name)` is the expected
        /// type name after a successful merge, `Err(message)` the expected error
        /// text.
        fn test(
            json: &str,
            json_type: &mut JsonType,
            expected: std::result::Result<&str, &str>,
        ) -> Result<()> {
            let json: serde_json::Value = serde_json::from_str(json).unwrap();

            let settings = JsonStructureSettings::Structured(None);
            let value = settings.encode(json)?;
            let value_type = value.data_type();
            let Some(other) = value_type.as_json() else {
                panic!("encoded value is not of a JSON data type: {value_type}")
            };

            let result = json_type.merge(other);
            // Mismatches are reachable whenever the test fails, so report what
            // actually happened instead of a bare "unreachable" panic.
            match (result, expected) {
                (Ok(()), Ok(expected)) => assert_eq!(json_type.name(), expected),
                (Err(err), Err(expected)) => assert_eq!(err.to_string(), expected),
                (Ok(()), Err(expected)) => panic!(
                    "merge unexpectedly succeeded with type {}, expected error: {expected}",
                    json_type.name()
                ),
                (Err(err), Ok(expected)) => {
                    panic!("merge unexpectedly failed with error: {err}, expected type: {expected}")
                }
            }
            Ok(())
        }

        // Start from a native JSON type whose inner type is null.
        let json_type = &mut JsonType::new(JsonFormat::Native(Box::new(
            ConcreteDataType::null_datatype(),
        )));

        // Merging an object into the null type adopts the object's struct type.
        let json = r#"{
            "hello": "world",
            "list": [1, 2, 3],
            "object": {"a": 1}
        }"#;
        let expected =
            r#"Json<Struct<"hello": String, "list": List<Int64>, "object": Struct<"a": Int64>>>"#;
        test(json, json_type, Ok(expected))?;

        // A string, a number and a list all conflict with the struct type.
        let jsons = [r#""s""#, "1", "[1]"];
        let expects = [
            r#"Failed to merge JSON datatype: datatypes have conflict, this: Struct<"hello": String, "list": List<Int64>, "object": Struct<"a": Int64>>, that: String"#,
            r#"Failed to merge JSON datatype: datatypes have conflict, this: Struct<"hello": String, "list": List<Int64>, "object": Struct<"a": Int64>>, that: Int64"#,
            r#"Failed to merge JSON datatype: datatypes have conflict, this: Struct<"hello": String, "list": List<Int64>, "object": Struct<"a": Int64>>, that: List<Int64>"#,
        ];
        for (json, expect) in jsons.into_iter().zip(expects) {
            test(json, json_type, Err(expect))?;
        }

        // An object whose "hello" field is a number conflicts with the existing
        // string-typed "hello" field.
        let json = r#"{
            "hello": 1,
            "float": 0.123,
            "no": 42
        }"#;
        let expected =
            r#"Failed to merge JSON datatype: datatypes have conflict, this: String, that: Int64"#;
        test(json, json_type, Err(expected))?;

        // An object with compatible fields adds its new fields. The "no" field from
        // the failed merge above is absent from the expected type.
        let json = r#"{
            "hello": "greptime",
            "float": 0.123,
            "int": 42
        }"#;
        let expected = r#"Json<Struct<"float": Float64, "hello": String, "int": Int64, "list": List<Int64>, "object": Struct<"a": Int64>>>"#;
        test(json, json_type, Ok(expected))?;

        // Nested structs are merged recursively.
        let json = r#"{
            "list": [4],
            "object": {"foo": "bar", "l": ["x"], "o": {"key": "value"}},
            "float": 0.456,
            "int": 0
        }"#;
        let expected = r#"Json<Struct<"float": Float64, "hello": String, "int": Int64, "list": List<Int64>, "object": Struct<"a": Int64, "foo": String, "l": List<String>, "o": Struct<"key": String>>>>"#;
        test(json, json_type, Ok(expected))?;

        Ok(())
    }
}