1use std::collections::{BTreeMap, HashMap};
16use std::str::FromStr;
17use std::sync::Arc;
18
19use arrow::datatypes::DataType as ArrowDataType;
20use arrow_schema::Fields;
21use common_base::bytes::Bytes;
22use serde::{Deserialize, Serialize};
23use snafu::ResultExt;
24
25use crate::data_type::DataType;
26use crate::error::{
27 DeserializeSnafu, InvalidJsonSnafu, InvalidJsonbSnafu, MergeJsonDatatypeSnafu, Result,
28};
29use crate::prelude::ConcreteDataType;
30use crate::scalars::ScalarVectorBuilder;
31use crate::type_id::LogicalTypeId;
32use crate::types::{ListType, StructField, StructType};
33use crate::value::Value;
34use crate::vectors::json::builder::JsonVectorBuilder;
35use crate::vectors::{BinaryVectorBuilder, MutableVector};
36
/// Type name reported for a JSON type stored in the JSONB format.
pub const JSON_TYPE_NAME: &str = "Json";
/// Name of the single wrapper field used when a non-struct native JSON type is
/// presented as a struct (see `JsonType::as_struct_type`).
const JSON_PLAIN_FIELD_NAME: &str = "__plain__";
/// Metadata key set on that wrapper field to flag it as "plain" JSON
/// (checked by `JsonType::is_plain_json`).
const JSON_PLAIN_FIELD_METADATA_KEY: &str = "is_plain_json";
40
41#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize, Default)]
42pub enum JsonFormat {
43 #[default]
44 Jsonb,
45 Native(Box<ConcreteDataType>),
46}
47
48#[derive(Debug, Default, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)]
51pub struct JsonType {
52 pub format: JsonFormat,
53}
54
55impl JsonType {
56 pub fn new(format: JsonFormat) -> Self {
57 Self { format }
58 }
59
60 pub(crate) fn empty() -> Self {
61 Self {
62 format: JsonFormat::Native(Box::new(ConcreteDataType::null_datatype())),
63 }
64 }
65
66 pub(crate) fn as_struct_type(&self) -> StructType {
72 match &self.format {
73 JsonFormat::Jsonb => StructType::default(),
74 JsonFormat::Native(inner) => match inner.as_ref() {
75 ConcreteDataType::Struct(t) => t.clone(),
76 x => {
77 let mut field =
78 StructField::new(JSON_PLAIN_FIELD_NAME.to_string(), x.clone(), true);
79 field.insert_metadata(JSON_PLAIN_FIELD_METADATA_KEY, true);
80 StructType::new(Arc::new(vec![field]))
81 }
82 },
83 }
84 }
85
86 pub(crate) fn is_plain_json(&self) -> bool {
89 let JsonFormat::Native(box ConcreteDataType::Struct(t)) = &self.format else {
90 return true;
91 };
92 let fields = t.fields();
93 let Some((single, [])) = fields.split_first() else {
94 return false;
95 };
96 single.name() == JSON_PLAIN_FIELD_NAME
97 && single.metadata(JSON_PLAIN_FIELD_METADATA_KEY) == Some("true")
98 }
99
100 pub(crate) fn merge(&mut self, other: &JsonType) -> Result<()> {
102 match (&self.format, &other.format) {
103 (JsonFormat::Jsonb, JsonFormat::Jsonb) => Ok(()),
104 (JsonFormat::Native(this), JsonFormat::Native(that)) => {
105 let merged = merge(this.as_ref(), that.as_ref())?;
106 self.format = JsonFormat::Native(Box::new(merged));
107 Ok(())
108 }
109 _ => MergeJsonDatatypeSnafu {
110 reason: "json format not match",
111 }
112 .fail(),
113 }
114 }
115
116 pub(crate) fn is_mergeable(&self, other: &JsonType) -> bool {
117 match (&self.format, &other.format) {
118 (JsonFormat::Jsonb, JsonFormat::Jsonb) => true,
119 (JsonFormat::Native(this), JsonFormat::Native(that)) => {
120 is_mergeable(this.as_ref(), that.as_ref())
121 }
122 _ => false,
123 }
124 }
125}
126
127fn is_mergeable(this: &ConcreteDataType, that: &ConcreteDataType) -> bool {
128 fn is_mergeable_struct(this: &StructType, that: &StructType) -> bool {
129 let this_fields = this.fields();
130 let this_fields = this_fields
131 .iter()
132 .map(|x| (x.name(), x))
133 .collect::<HashMap<_, _>>();
134
135 for that_field in that.fields().iter() {
136 if let Some(this_field) = this_fields.get(that_field.name())
137 && !is_mergeable(this_field.data_type(), that_field.data_type())
138 {
139 return false;
140 }
141 }
142 true
143 }
144
145 match (this, that) {
146 (this, that) if this == that => true,
147 (ConcreteDataType::List(this), ConcreteDataType::List(that)) => {
148 is_mergeable(this.item_type(), that.item_type())
149 }
150 (ConcreteDataType::Struct(this), ConcreteDataType::Struct(that)) => {
151 is_mergeable_struct(this, that)
152 }
153 (ConcreteDataType::Null(_), _) | (_, ConcreteDataType::Null(_)) => true,
154 _ => false,
155 }
156}
157
158fn merge(this: &ConcreteDataType, that: &ConcreteDataType) -> Result<ConcreteDataType> {
159 match (this, that) {
160 (this, that) if this == that => Ok(this.clone()),
161 (ConcreteDataType::List(this), ConcreteDataType::List(that)) => {
162 merge_list(this, that).map(ConcreteDataType::List)
163 }
164 (ConcreteDataType::Struct(this), ConcreteDataType::Struct(that)) => {
165 merge_struct(this, that).map(ConcreteDataType::Struct)
166 }
167 (ConcreteDataType::Null(_), x) | (x, ConcreteDataType::Null(_)) => Ok(x.clone()),
168 _ => MergeJsonDatatypeSnafu {
169 reason: format!("datatypes have conflict, this: {this}, that: {that}"),
170 }
171 .fail(),
172 }
173}
174
175fn merge_list(this: &ListType, that: &ListType) -> Result<ListType> {
176 let merged = merge(this.item_type(), that.item_type())?;
177 Ok(ListType::new(Arc::new(merged)))
178}
179
180fn merge_struct(this: &StructType, that: &StructType) -> Result<StructType> {
181 let this = Arc::unwrap_or_clone(this.fields());
182 let that = Arc::unwrap_or_clone(that.fields());
183
184 let mut this: BTreeMap<String, StructField> = this
185 .into_iter()
186 .map(|x| (x.name().to_string(), x))
187 .collect();
188 for that_field in that {
190 let field_name = that_field.name().to_string();
191 if let Some(this_field) = this.get(&field_name) {
192 let merged_field = StructField::new(
193 field_name.clone(),
194 merge(this_field.data_type(), that_field.data_type())?,
195 true, );
197 this.insert(field_name, merged_field);
198 } else {
199 this.insert(field_name, that_field);
200 }
201 }
202
203 let fields = this.into_values().collect::<Vec<_>>();
204 Ok(StructType::new(Arc::new(fields)))
205}
206
207impl DataType for JsonType {
208 fn name(&self) -> String {
209 match &self.format {
210 JsonFormat::Jsonb => JSON_TYPE_NAME.to_string(),
211 JsonFormat::Native(x) => format!("Json<{x}>"),
212 }
213 }
214
215 fn logical_type_id(&self) -> LogicalTypeId {
216 LogicalTypeId::Json
217 }
218
219 fn default_value(&self) -> Value {
220 Bytes::default().into()
221 }
222
223 fn as_arrow_type(&self) -> ArrowDataType {
224 match self.format {
225 JsonFormat::Jsonb => ArrowDataType::Binary,
226 JsonFormat::Native(_) => ArrowDataType::Struct(Fields::empty()),
227 }
228 }
229
230 fn create_mutable_vector(&self, capacity: usize) -> Box<dyn MutableVector> {
231 match self.format {
232 JsonFormat::Jsonb => Box::new(BinaryVectorBuilder::with_capacity(capacity)),
233 JsonFormat::Native(_) => Box::new(JsonVectorBuilder::with_capacity(capacity)),
234 }
235 }
236
237 fn try_cast(&self, from: Value) -> Option<Value> {
238 match from {
239 Value::Binary(v) => Some(Value::Binary(v)),
240 _ => None,
241 }
242 }
243}
244
245pub fn jsonb_to_string(val: &[u8]) -> Result<String> {
247 match jsonb::from_slice(val) {
248 Ok(jsonb_value) => {
249 let serialized = jsonb_value.to_string();
250 Ok(serialized)
251 }
252 Err(e) => InvalidJsonbSnafu { error: e }.fail(),
253 }
254}
255
256pub fn jsonb_to_serde_json(val: &[u8]) -> Result<serde_json::Value> {
258 let json_string = jsonb_to_string(val)?;
259 serde_json::Value::from_str(json_string.as_str())
260 .context(DeserializeSnafu { json: json_string })
261}
262
263pub fn parse_string_to_jsonb(s: &str) -> Result<Vec<u8>> {
265 jsonb::parse_value(s.as_bytes())
266 .map_err(|_| InvalidJsonSnafu { value: s }.build())
267 .map(|json| json.to_vec())
268}
269
#[cfg(test)]
mod tests {
    use super::*;
    use crate::json::JsonStructureSettings;

    /// Merges a sequence of JSON documents into one evolving native
    /// [`JsonType`]. The type is shared across all cases, so the order of the
    /// cases matters: failed merges must leave it untouched, successful ones
    /// extend it.
    #[test]
    fn test_merge_json_type() -> Result<()> {
        /// Encodes `json`, merges its datatype into `json_type` and checks the
        /// outcome: `Ok(name)` expects a successful merge resulting in a type
        /// with that name, `Err(message)` expects a failure with that message.
        /// `is_mergeable` must agree with the outcome in both cases.
        fn test(
            json: &str,
            json_type: &mut JsonType,
            expected: std::result::Result<&str, &str>,
        ) -> Result<()> {
            let json: serde_json::Value = serde_json::from_str(json).unwrap();

            let settings = JsonStructureSettings::Structured(None);
            let value = settings.encode(json)?;
            let value_type = value.data_type();
            let Some(other) = value_type.as_json() else {
                panic!("the encoded value is expected to have a JSON datatype");
            };

            match (json_type.merge(other), expected) {
                (Ok(()), Ok(expected)) => {
                    assert_eq!(json_type.name(), expected);
                    assert!(json_type.is_mergeable(other));
                }
                (Err(err), Err(expected)) => {
                    assert_eq!(err.to_string(), expected);
                    assert!(!json_type.is_mergeable(other));
                }
                // Report what actually happened instead of a bare `unreachable!()`.
                (actual, expected) => panic!(
                    "merge outcome does not match the expectation, \
                     actual: {actual:?}, expected: {expected:?}"
                ),
            }
            Ok(())
        }

        let json_type = &mut JsonType::new(JsonFormat::Native(Box::new(
            ConcreteDataType::null_datatype(),
        )));

        // The null datatype adopts the structure of the first document.
        let json = r#"{
            "hello": "world",
            "list": [1, 2, 3],
            "object": {"a": 1}
        }"#;
        let expected =
            r#"Json<Struct<"hello": String, "list": List<Int64>, "object": Struct<"a": Int64>>>"#;
        test(json, json_type, Ok(expected))?;

        // Non-object documents conflict with the struct built so far.
        let jsons = [r#""s""#, "1", "[1]"];
        let expects = [
            r#"Failed to merge JSON datatype: datatypes have conflict, this: Struct<"hello": String, "list": List<Int64>, "object": Struct<"a": Int64>>, that: String"#,
            r#"Failed to merge JSON datatype: datatypes have conflict, this: Struct<"hello": String, "list": List<Int64>, "object": Struct<"a": Int64>>, that: Int64"#,
            r#"Failed to merge JSON datatype: datatypes have conflict, this: Struct<"hello": String, "list": List<Int64>, "object": Struct<"a": Int64>>, that: List<Int64>"#,
        ];
        for (json, expect) in jsons.into_iter().zip(expects) {
            test(json, json_type, Err(expect))?;
        }

        // A shared field ("hello") with a different datatype is a conflict.
        let json = r#"{
            "hello": 1,
            "float": 0.123,
            "no": 42
        }"#;
        let expected =
            r#"Failed to merge JSON datatype: datatypes have conflict, this: String, that: Int64"#;
        test(json, json_type, Err(expected))?;

        // New fields are added; the failed merge above left no trace ("no" is absent).
        let json = r#"{
            "hello": "greptime",
            "float": 0.123,
            "int": 42
        }"#;
        let expected = r#"Json<Struct<"float": Float64, "hello": String, "int": Int64, "list": List<Int64>, "object": Struct<"a": Int64>>>"#;
        test(json, json_type, Ok(expected))?;

        // Nested structs are merged recursively.
        let json = r#"{
            "list": [4],
            "object": {"foo": "bar", "l": ["x"], "o": {"key": "value"}},
            "float": 0.456,
            "int": 0
        }"#;
        let expected = r#"Json<Struct<"float": Float64, "hello": String, "int": Int64, "list": List<Int64>, "object": Struct<"a": Int64, "foo": String, "l": List<String>, "o": Struct<"key": String>>>>"#;
        test(json, json_type, Ok(expected))?;

        Ok(())
    }
}