1use std::any::Any;
16use std::collections::HashMap;
17use std::sync::LazyLock;
18
19use crate::data_type::ConcreteDataType;
20use crate::error::{Result, TryFromValueSnafu, UnsupportedOperationSnafu};
21use crate::json::value::JsonValueRef;
22use crate::prelude::{ValueRef, Vector, VectorRef};
23use crate::types::json_type::JsonNativeType;
24use crate::types::{JsonType, json_type};
25use crate::value::StructValueRef;
26use crate::vectors::{MutableVector, StructVectorBuilder};
27
28struct JsonStructsBuilder {
29 json_type: JsonType,
30 inner: StructVectorBuilder,
31}
32
33impl JsonStructsBuilder {
34 fn new(json_type: JsonType, capacity: usize) -> Self {
35 let struct_type = json_type.as_struct_type();
36 let inner = StructVectorBuilder::with_type_and_capacity(struct_type, capacity);
37 Self { json_type, inner }
38 }
39
40 fn len(&self) -> usize {
41 self.inner.len()
42 }
43
44 fn push(&mut self, json: &JsonValueRef) -> Result<()> {
45 let mut value = json.as_value_ref();
46 if !json.is_object() {
47 let fields = json_type::plain_json_struct_type(value.data_type());
48 value = ValueRef::Struct(StructValueRef::RefList {
49 val: vec![value],
50 fields,
51 })
52 }
53 self.inner.try_push_value_ref(&value)
54 }
55
56 fn try_merge(&mut self, other: &mut JsonStructsBuilder) -> Result<()> {
61 debug_assert!(self.json_type.is_mergeable(&other.json_type));
62
63 fn helper(this: &mut StructVectorBuilder, that: &mut StructVectorBuilder) -> Result<()> {
64 let that_len = that.len();
65 if let Some(x) = that.mut_null_buffer().finish() {
66 this.mut_null_buffer().append_buffer(&x)
67 } else {
68 this.mut_null_buffer().append_n_non_nulls(that_len);
69 }
70
71 let that_fields = that.struct_type().fields();
72 let mut that_builders = that_fields
73 .iter()
74 .zip(that.mut_value_builders().iter_mut())
75 .map(|(field, builder)| (field.name(), builder))
76 .collect::<HashMap<_, _>>();
77
78 for (field, this_builder) in this
79 .struct_type()
80 .fields()
81 .iter()
82 .zip(this.mut_value_builders().iter_mut())
83 {
84 if let Some(that_builder) = that_builders.get_mut(field.name()) {
85 if field.data_type().is_struct() {
86 let this = this_builder
87 .as_mut_any()
88 .downcast_mut::<StructVectorBuilder>()
89 .unwrap();
91
92 let that = that_builder
93 .as_mut_any()
94 .downcast_mut::<StructVectorBuilder>()
95 .unwrap();
98 helper(this, that)?;
99 } else {
100 let vector = that_builder.to_vector();
101 this_builder.extend_slice_of(vector.as_ref(), 0, vector.len())?;
102 }
103 } else {
104 this_builder.push_nulls(that_len);
105 }
106 }
107 Ok(())
108 }
109 helper(&mut self.inner, &mut other.inner)
110 }
111
112 fn try_merge_cloned(&mut self, other: &JsonStructsBuilder) -> Result<()> {
114 debug_assert!(self.json_type.is_mergeable(&other.json_type));
115
116 fn helper(this: &mut StructVectorBuilder, that: &StructVectorBuilder) -> Result<()> {
117 let that_len = that.len();
118 if let Some(x) = that.null_buffer().finish_cloned() {
119 this.mut_null_buffer().append_buffer(&x)
120 } else {
121 this.mut_null_buffer().append_n_non_nulls(that_len);
122 }
123
124 let that_fields = that.struct_type().fields();
125 let that_builders = that_fields
126 .iter()
127 .zip(that.value_builders().iter())
128 .map(|(field, builder)| (field.name(), builder))
129 .collect::<HashMap<_, _>>();
130
131 for (field, this_builder) in this
132 .struct_type()
133 .fields()
134 .iter()
135 .zip(this.mut_value_builders().iter_mut())
136 {
137 if let Some(that_builder) = that_builders.get(field.name()) {
138 if field.data_type().is_struct() {
139 let this = this_builder
140 .as_mut_any()
141 .downcast_mut::<StructVectorBuilder>()
142 .unwrap();
144
145 let that = that_builder
146 .as_any()
147 .downcast_ref::<StructVectorBuilder>()
148 .unwrap();
151 helper(this, that)?;
152 } else {
153 let vector = that_builder.to_vector_cloned();
154 this_builder.extend_slice_of(vector.as_ref(), 0, vector.len())?;
155 }
156 } else {
157 this_builder.push_nulls(that_len);
158 }
159 }
160 Ok(())
161 }
162 helper(&mut self.inner, &other.inner)
163 }
164}
165
166pub(crate) struct JsonVectorBuilder {
179 merged_type: JsonType,
180 capacity: usize,
181 builders: Vec<JsonStructsBuilder>,
182}
183
184impl JsonVectorBuilder {
185 pub(crate) fn new(json_type: JsonNativeType, capacity: usize) -> Self {
186 Self {
187 merged_type: JsonType::new_native(json_type),
188 capacity,
189 builders: vec![],
190 }
191 }
192
193 fn try_create_new_builder(&mut self, json_type: &JsonType) -> Result<&mut JsonStructsBuilder> {
194 self.merged_type.merge(json_type)?;
195
196 let builder = JsonStructsBuilder::new(json_type.clone(), self.capacity);
197 self.builders.push(builder);
198
199 let len = self.builders.len();
200 Ok(&mut self.builders[len - 1])
201 }
202}
203
204impl MutableVector for JsonVectorBuilder {
205 fn data_type(&self) -> ConcreteDataType {
206 ConcreteDataType::Json(self.merged_type.clone())
207 }
208
209 fn len(&self) -> usize {
210 self.builders.iter().map(|x| x.len()).sum()
211 }
212
213 fn as_any(&self) -> &dyn Any {
214 self
215 }
216
217 fn as_mut_any(&mut self) -> &mut dyn Any {
218 self
219 }
220
221 fn to_vector(&mut self) -> VectorRef {
222 if self.builders.len() == 1 {
224 return self.builders[0].inner.to_vector();
225 }
226
227 let mut unified_jsons = JsonStructsBuilder::new(self.merged_type.clone(), self.capacity);
228 for builder in self.builders.iter_mut() {
229 unified_jsons
230 .try_merge(builder)
231 .unwrap_or_else(|e| panic!("failed to merge json builders, error: {e}"));
234 }
235 unified_jsons.inner.to_vector()
236 }
237
238 fn to_vector_cloned(&self) -> VectorRef {
239 if self.builders.len() == 1 {
241 return self.builders[0].inner.to_vector_cloned();
242 }
243
244 let mut unified_jsons = JsonStructsBuilder::new(self.merged_type.clone(), self.capacity);
245 for builder in self.builders.iter() {
246 unified_jsons
247 .try_merge_cloned(builder)
248 .unwrap_or_else(|e| panic!("failed to merge json builders, error: {e}"));
251 }
252 unified_jsons.inner.to_vector_cloned()
253 }
254
255 fn try_push_value_ref(&mut self, value: &ValueRef) -> Result<()> {
256 let ValueRef::Json(value) = value else {
257 return TryFromValueSnafu {
258 reason: format!("expected json value, got {value:?}"),
259 }
260 .fail();
261 };
262 let json_type = value.json_type();
263
264 let builder = match self.builders.last_mut() {
265 Some(last) => {
266 if &last.json_type != json_type {
268 self.try_create_new_builder(json_type)?
269 } else {
270 last
271 }
272 }
273 None => self.try_create_new_builder(json_type)?,
274 };
275
276 builder.push(value.as_ref())
277 }
278
279 fn push_null(&mut self) {
280 static NULL_JSON: LazyLock<ValueRef> =
281 LazyLock::new(|| ValueRef::Json(Box::new(JsonValueRef::null())));
282 self.try_push_value_ref(&NULL_JSON)
283 .unwrap_or_else(|e| panic!("failed to push null json value, error: {e}"));
286 }
287
288 fn extend_slice_of(&mut self, _: &dyn Vector, _: usize, _: usize) -> Result<()> {
289 UnsupportedOperationSnafu {
290 op: "extend_slice_of",
291 vector_type: "JsonVector",
292 }
293 .fail()
294 }
295}
296
#[cfg(test)]
mod tests {
    use super::*;
    use crate::data_type::DataType;
    use crate::json::JsonStructureSettings;
    use crate::vectors::helper::pretty_print;

    /// Parses `json`, encodes it with structured settings, pushes it into `builder`
    /// and asserts that the push result (error rendered as a string) equals `expected`.
    fn push(json: &str, builder: &mut JsonVectorBuilder, expected: std::result::Result<(), &str>) {
        let settings = JsonStructureSettings::Structured(None);
        let json: serde_json::Value = serde_json::from_str(json).unwrap();
        let value = settings.encode(json).unwrap();

        let value = value.as_value_ref();
        let result = builder
            .try_push_value_ref(&value)
            .map_err(|e| e.to_string());
        let expected = expected.map_err(|e| e.to_string());
        assert_eq!(result, expected);
    }

    /// Plain (non-object) values: same-typed values are accepted, conflicting
    /// types are rejected and leave the builder's contents unchanged.
    #[test]
    fn test_push_plain_jsons() -> Result<()> {
        let jsons = vec!["1", "2", r#""s""#, "[true]"];
        let results = vec![
            Ok(()),
            Ok(()),
            Err(
                "Failed to merge JSON datatype: datatypes have conflict, this: Number(I64), that: String",
            ),
            Err(
                "Failed to merge JSON datatype: datatypes have conflict, this: Number(I64), that: Array[Bool]",
            ),
        ];
        let mut builder = JsonVectorBuilder::new(JsonNativeType::Null, 1);
        for (json, result) in jsons.into_iter().zip(results.into_iter()) {
            push(json, &mut builder, result);
        }
        let vector = builder.to_vector();
        // Table rows are padded to the border width; `trim` only strips the
        // leading newline of the raw string.
        let expected = r#"
+---------------------+
| StructVector        |
+---------------------+
| {__json_plain__: 1} |
| {__json_plain__: 2} |
+---------------------+"#;
        assert_eq!(pretty_print(vector), expected.trim());
        Ok(())
    }

    /// Objects of varying shapes: each change of type starts a new inner builder,
    /// and the final vector exposes the merged type with nulls for absent fields.
    #[test]
    fn test_push_json_objects() -> Result<()> {
        let jsons = vec![
            r#"{
                "s": "a",
                "list": [1, 2, 3]
            }"#,
            r#"{
                "list": [4],
                "s": "b"
            }"#,
            r#"{
                "s": "c",
                "float": 0.9
            }"#,
            r#"{
                "float": 0.8,
                "s": "d"
            }"#,
            r#"{
                "float": 0.7,
                "int": -1
            }"#,
            r#"{
                "int": 0,
                "float": 0.6
            }"#,
            r#"{
                "int": 1,
                "object": {"hello": "world", "timestamp": 1761523200000}
            }"#,
            r#"{
                "object": {"hello": "greptime", "timestamp": 1761523201000},
                "int": 2
            }"#,
            r#"{
                "object": {"timestamp": 1761523202000},
                "nested": {"a": {"b": {"b": {"a": "abba"}}}}
            }"#,
            r#"{
                "nested": {"a": {"b": {"a": {"b": "abab"}}}},
                "object": {"timestamp": 1761523203000}
            }"#,
        ];
        let mut builder = JsonVectorBuilder::new(JsonNativeType::Null, 1);
        for json in jsons {
            push(json, &mut builder, Ok(()));
        }
        assert_eq!(builder.len(), 10);

        // One inner builder per run of identically typed values.
        assert_eq!(builder.builders.len(), 6);
        let expect_types = [
            r#"Json<Object{"list": Array[Number(I64)], "s": String}>"#,
            r#"Json<Object{"float": Number(F64), "s": String}>"#,
            r#"Json<Object{"float": Number(F64), "int": Number(I64)}>"#,
            r#"Json<Object{"int": Number(I64), "object": Object{"hello": String, "timestamp": Number(I64)}}>"#,
            r#"Json<Object{"nested": Object{"a": Object{"b": Object{"b": Object{"a": String}}}}, "object": Object{"timestamp": Number(I64)}}>"#,
            r#"Json<Object{"nested": Object{"a": Object{"b": Object{"a": Object{"b": String}}}}, "object": Object{"timestamp": Number(I64)}}>"#,
        ];
        let expect_vectors = [
            r#"
+-------------------------+
| StructVector            |
+-------------------------+
| {list: [1, 2, 3], s: a} |
| {list: [4], s: b}       |
+-------------------------+"#,
            r#"
+--------------------+
| StructVector       |
+--------------------+
| {float: 0.9, s: c} |
| {float: 0.8, s: d} |
+--------------------+"#,
            r#"
+-----------------------+
| StructVector          |
+-----------------------+
| {float: 0.7, int: -1} |
| {float: 0.6, int: 0}  |
+-----------------------+"#,
            r#"
+---------------------------------------------------------------+
| StructVector                                                  |
+---------------------------------------------------------------+
| {int: 1, object: {hello: world, timestamp: 1761523200000}}    |
| {int: 2, object: {hello: greptime, timestamp: 1761523201000}} |
+---------------------------------------------------------------+"#,
            r#"
+------------------------------------------------------------------------+
| StructVector                                                           |
+------------------------------------------------------------------------+
| {nested: {a: {b: {b: {a: abba}}}}, object: {timestamp: 1761523202000}} |
+------------------------------------------------------------------------+"#,
            r#"
+------------------------------------------------------------------------+
| StructVector                                                           |
+------------------------------------------------------------------------+
| {nested: {a: {b: {a: {b: abab}}}}, object: {timestamp: 1761523203000}} |
+------------------------------------------------------------------------+"#,
        ];
        for (builder, (expect_type, expect_vector)) in builder
            .builders
            .iter()
            .zip(expect_types.into_iter().zip(expect_vectors.into_iter()))
        {
            assert_eq!(builder.json_type.name(), expect_type);
            let vector = builder.inner.to_vector_cloned();
            assert_eq!(pretty_print(vector), expect_vector.trim());
        }

        let expected = r#"Json<Object{"float": Number(F64), "int": Number(I64), "list": Array[Number(I64)], "nested": Object{"a": Object{"b": Object{"a": Object{"b": String}, "b": Object{"a": String}}}}, "object": Object{"hello": String, "timestamp": Number(I64)}, "s": String}>"#;
        assert_eq!(builder.data_type().to_string(), expected);

        // The merged vector carries every field; absent fields render as empty.
        let expected = r#"
+-------------------------------------------------------------------------------------------------------------------+
| StructVector                                                                                                      |
+-------------------------------------------------------------------------------------------------------------------+
| {float: , int: , list: [1, 2, 3], nested: , object: , s: a}                                                       |
| {float: , int: , list: [4], nested: , object: , s: b}                                                             |
| {float: 0.9, int: , list: , nested: , object: , s: c}                                                             |
| {float: 0.8, int: , list: , nested: , object: , s: d}                                                             |
| {float: 0.7, int: -1, list: , nested: , object: , s: }                                                            |
| {float: 0.6, int: 0, list: , nested: , object: , s: }                                                             |
| {float: , int: 1, list: , nested: , object: {hello: world, timestamp: 1761523200000}, s: }                        |
| {float: , int: 2, list: , nested: , object: {hello: greptime, timestamp: 1761523201000}, s: }                     |
| {float: , int: , list: , nested: {a: {b: {a: , b: {a: abba}}}}, object: {hello: , timestamp: 1761523202000}, s: } |
| {float: , int: , list: , nested: {a: {b: {a: {b: abab}, b: }}}, object: {hello: , timestamp: 1761523203000}, s: } |
+-------------------------------------------------------------------------------------------------------------------+"#;
        // The cloned path must leave the builder intact so that the consuming
        // path afterwards yields the same table.
        let vector = builder.to_vector_cloned();
        assert_eq!(pretty_print(vector), expected.trim());
        let vector = builder.to_vector();
        assert_eq!(pretty_print(vector), expected.trim());
        Ok(())
    }
}