1use std::any::Any;
16use std::collections::HashMap;
17
18use snafu::OptionExt;
19
20use crate::data_type::ConcreteDataType;
21use crate::error::{Result, TryFromValueSnafu, UnsupportedOperationSnafu};
22use crate::prelude::{ValueRef, Vector, VectorRef};
23use crate::types::JsonType;
24use crate::value::StructValueRef;
25use crate::vectors::{MutableVector, StructVectorBuilder};
26
27struct JsonStructsBuilder {
28 json_type: JsonType,
29 inner: StructVectorBuilder,
30}
31
32impl JsonStructsBuilder {
33 fn new(json_type: JsonType, capacity: usize) -> Self {
34 let struct_type = json_type.as_struct_type();
35 let inner = StructVectorBuilder::with_type_and_capacity(struct_type, capacity);
36 Self { json_type, inner }
37 }
38
39 fn len(&self) -> usize {
40 self.inner.len()
41 }
42
43 fn push(&mut self, value: &ValueRef) -> Result<()> {
44 if self.json_type.is_plain_json() {
45 let value = ValueRef::Struct(StructValueRef::RefList {
46 val: vec![value.clone()],
47 fields: self.json_type.as_struct_type(),
48 });
49 self.inner.try_push_value_ref(&value)
50 } else {
51 self.inner.try_push_value_ref(value)
52 }
53 }
54
55 fn try_merge(&mut self, other: &mut JsonStructsBuilder) -> Result<()> {
60 debug_assert!(self.json_type.is_mergeable(&other.json_type));
61
62 fn helper(this: &mut StructVectorBuilder, that: &mut StructVectorBuilder) -> Result<()> {
63 let that_len = that.len();
64 if let Some(x) = that.mut_null_buffer().finish() {
65 this.mut_null_buffer().append_buffer(&x)
66 } else {
67 this.mut_null_buffer().append_n_non_nulls(that_len);
68 }
69
70 let that_fields = that.struct_type().fields();
71 let mut that_builders = that_fields
72 .iter()
73 .zip(that.mut_value_builders().iter_mut())
74 .map(|(field, builder)| (field.name(), builder))
75 .collect::<HashMap<_, _>>();
76
77 for (field, this_builder) in this
78 .struct_type()
79 .fields()
80 .iter()
81 .zip(this.mut_value_builders().iter_mut())
82 {
83 if let Some(that_builder) = that_builders.get_mut(field.name()) {
84 if field.data_type().is_struct() {
85 let this = this_builder
86 .as_mut_any()
87 .downcast_mut::<StructVectorBuilder>()
88 .unwrap();
90
91 let that = that_builder
92 .as_mut_any()
93 .downcast_mut::<StructVectorBuilder>()
94 .unwrap();
97 helper(this, that)?;
98 } else {
99 let vector = that_builder.to_vector();
100 this_builder.extend_slice_of(vector.as_ref(), 0, vector.len())?;
101 }
102 } else {
103 this_builder.push_nulls(that_len);
104 }
105 }
106 Ok(())
107 }
108 helper(&mut self.inner, &mut other.inner)
109 }
110
111 fn try_merge_cloned(&mut self, other: &JsonStructsBuilder) -> Result<()> {
113 debug_assert!(self.json_type.is_mergeable(&other.json_type));
114
115 fn helper(this: &mut StructVectorBuilder, that: &StructVectorBuilder) -> Result<()> {
116 let that_len = that.len();
117 if let Some(x) = that.null_buffer().finish_cloned() {
118 this.mut_null_buffer().append_buffer(&x)
119 } else {
120 this.mut_null_buffer().append_n_non_nulls(that_len);
121 }
122
123 let that_fields = that.struct_type().fields();
124 let that_builders = that_fields
125 .iter()
126 .zip(that.value_builders().iter())
127 .map(|(field, builder)| (field.name(), builder))
128 .collect::<HashMap<_, _>>();
129
130 for (field, this_builder) in this
131 .struct_type()
132 .fields()
133 .iter()
134 .zip(this.mut_value_builders().iter_mut())
135 {
136 if let Some(that_builder) = that_builders.get(field.name()) {
137 if field.data_type().is_struct() {
138 let this = this_builder
139 .as_mut_any()
140 .downcast_mut::<StructVectorBuilder>()
141 .unwrap();
143
144 let that = that_builder
145 .as_any()
146 .downcast_ref::<StructVectorBuilder>()
147 .unwrap();
150 helper(this, that)?;
151 } else {
152 let vector = that_builder.to_vector_cloned();
153 this_builder.extend_slice_of(vector.as_ref(), 0, vector.len())?;
154 }
155 } else {
156 this_builder.push_nulls(that_len);
157 }
158 }
159 Ok(())
160 }
161 helper(&mut self.inner, &other.inner)
162 }
163}
164
165pub(crate) struct JsonVectorBuilder {
178 merged_type: JsonType,
179 capacity: usize,
180 builders: Vec<JsonStructsBuilder>,
181}
182
183impl JsonVectorBuilder {
184 pub(crate) fn with_capacity(capacity: usize) -> Self {
185 Self {
186 merged_type: JsonType::empty(),
187 capacity,
188 builders: vec![],
189 }
190 }
191
192 fn try_create_new_builder(&mut self, json_type: &JsonType) -> Result<&mut JsonStructsBuilder> {
193 self.merged_type.merge(json_type)?;
194
195 let builder = JsonStructsBuilder::new(json_type.clone(), self.capacity);
196 self.builders.push(builder);
197
198 let len = self.builders.len();
199 Ok(&mut self.builders[len - 1])
200 }
201}
202
203impl MutableVector for JsonVectorBuilder {
204 fn data_type(&self) -> ConcreteDataType {
205 ConcreteDataType::Json(self.merged_type.clone())
206 }
207
208 fn len(&self) -> usize {
209 self.builders.iter().map(|x| x.len()).sum()
210 }
211
212 fn as_any(&self) -> &dyn Any {
213 self
214 }
215
216 fn as_mut_any(&mut self) -> &mut dyn Any {
217 self
218 }
219
220 fn to_vector(&mut self) -> VectorRef {
221 if self.builders.len() == 1 {
223 return self.builders[0].inner.to_vector();
224 }
225
226 let mut unified_jsons = JsonStructsBuilder::new(self.merged_type.clone(), self.capacity);
227 for builder in self.builders.iter_mut() {
228 unified_jsons
229 .try_merge(builder)
230 .unwrap_or_else(|e| panic!("failed to merge json builders, error: {e}"));
233 }
234 unified_jsons.inner.to_vector()
235 }
236
237 fn to_vector_cloned(&self) -> VectorRef {
238 if self.builders.len() == 1 {
240 return self.builders[0].inner.to_vector_cloned();
241 }
242
243 let mut unified_jsons = JsonStructsBuilder::new(self.merged_type.clone(), self.capacity);
244 for builder in self.builders.iter() {
245 unified_jsons
246 .try_merge_cloned(builder)
247 .unwrap_or_else(|e| panic!("failed to merge json builders, error: {e}"));
250 }
251 unified_jsons.inner.to_vector_cloned()
252 }
253
254 fn try_push_value_ref(&mut self, value: &ValueRef) -> Result<()> {
255 let data_type = value.data_type();
256 let json_type = data_type.as_json().with_context(|| TryFromValueSnafu {
257 reason: format!("expected json value, got {value:?}"),
258 })?;
259
260 let builder = match self.builders.last_mut() {
261 Some(last) => {
262 if &last.json_type != json_type {
263 self.try_create_new_builder(json_type)?
264 } else {
265 last
266 }
267 }
268 None => self.try_create_new_builder(json_type)?,
269 };
270
271 let ValueRef::Json(value) = value else {
272 unreachable!()
274 };
275 builder.push(value)
276 }
277
278 fn push_null(&mut self) {
279 let null_json_value = ValueRef::Json(Box::new(ValueRef::Null));
280 self.try_push_value_ref(&null_json_value)
281 .unwrap_or_else(|e| {
284 panic!("failed to push null json value: {null_json_value:?}, error: {e}")
285 });
286 }
287
288 fn extend_slice_of(&mut self, _: &dyn Vector, _: usize, _: usize) -> Result<()> {
289 UnsupportedOperationSnafu {
290 op: "extend_slice_of",
291 vector_type: "JsonVector",
292 }
293 .fail()
294 }
295}
296
#[cfg(test)]
mod tests {
    use super::*;
    use crate::data_type::DataType;
    use crate::json::JsonStructureSettings;
    use crate::vectors::helper::pretty_print;

    /// Encodes `json` with `JsonStructureSettings::Structured(None)`, pushes it into
    /// `builder`, and checks the outcome against `expected`. Either both are `Ok(())`, or
    /// both are `Err` and the error's `to_string()` equals the expected message. Any
    /// other combination hits `unreachable!()`.
    fn push(json: &str, builder: &mut JsonVectorBuilder, expected: std::result::Result<(), &str>) {
        let settings = JsonStructureSettings::Structured(None);
        let json: serde_json::Value = serde_json::from_str(json).unwrap();
        let value = settings.encode(json).unwrap();

        let value = value.as_value_ref();
        let result = builder.try_push_value_ref(&value);
        match (result, expected) {
            (Ok(()), Ok(())) => (),
            (Err(e), Err(expected)) => assert_eq!(e.to_string(), expected),
            _ => unreachable!(),
        }
    }

    /// Plain (non-object) JSON values. The two integers are accepted. The string and
    /// the list conflict with the already-merged Int64 type and are rejected, so only
    /// the integers appear in the built vector.
    #[test]
    fn test_push_plain_jsons() -> Result<()> {
        let jsons = vec!["1", "2", r#""s""#, "[true]"];
        let results = vec![
            Ok(()),
            Ok(()),
            Err(
                "Failed to merge JSON datatype: datatypes have conflict, this: Int64, that: String",
            ),
            Err(
                "Failed to merge JSON datatype: datatypes have conflict, this: Int64, that: List<Boolean>",
            ),
        ];
        let mut builder = JsonVectorBuilder::with_capacity(1);
        for (json, result) in jsons.into_iter().zip(results.into_iter()) {
            push(json, &mut builder, result);
        }
        let vector = builder.to_vector();
        let expected = r#"
+----------------+
| StructVector |
+----------------+
| {__plain__: 1} |
| {__plain__: 2} |
+----------------+"#;
        assert_eq!(pretty_print(vector), expected.trim());
        Ok(())
    }

    /// JSON objects with varying keys. Consecutive rows with the same JSON type share a
    /// per-type builder, and key order does not matter (rows 1 and 2 land together).
    /// That gives 6 builders for 10 rows. The test checks each builder's type and
    /// contents, then the merged type. Finally, `to_vector_cloned` and `to_vector` must
    /// both yield the unified vector, with empty cells for fields a row lacked.
    #[test]
    fn test_push_json_objects() -> Result<()> {
        let jsons = vec![
            r#"{
                "s": "a",
                "list": [1, 2, 3]
            }"#,
            r#"{
                "list": [4],
                "s": "b"
            }"#,
            r#"{
                "s": "c",
                "float": 0.9
            }"#,
            r#"{
                "float": 0.8,
                "s": "d"
            }"#,
            r#"{
                "float": 0.7,
                "int": -1
            }"#,
            r#"{
                "int": 0,
                "float": 0.6
            }"#,
            r#"{
                "int": 1,
                "object": {"hello": "world", "timestamp": 1761523200000}
            }"#,
            r#"{
                "object": {"hello": "greptime", "timestamp": 1761523201000},
                "int": 2
            }"#,
            r#"{
                "object": {"timestamp": 1761523202000},
                "nested": {"a": {"b": {"b": {"a": "abba"}}}}
            }"#,
            r#"{
                "nested": {"a": {"b": {"a": {"b": "abab"}}}},
                "object": {"timestamp": 1761523203000}
            }"#,
        ];
        let mut builder = JsonVectorBuilder::with_capacity(1);
        for json in jsons {
            push(json, &mut builder, Ok(()));
        }
        assert_eq!(builder.len(), 10);

        // Each pair of consecutive rows shares a JSON type, except the last two, whose
        // nested structures differ.
        assert_eq!(builder.builders.len(), 6);
        let expect_types = [
            r#"Json<Struct<"list": List<Int64>, "s": String>>"#,
            r#"Json<Struct<"float": Float64, "s": String>>"#,
            r#"Json<Struct<"float": Float64, "int": Int64>>"#,
            r#"Json<Struct<"int": Int64, "object": Struct<"hello": String, "timestamp": Int64>>>"#,
            r#"Json<Struct<"nested": Struct<"a": Struct<"b": Struct<"b": Struct<"a": String>>>>, "object": Struct<"timestamp": Int64>>>"#,
            r#"Json<Struct<"nested": Struct<"a": Struct<"b": Struct<"a": Struct<"b": String>>>>, "object": Struct<"timestamp": Int64>>>"#,
        ];
        let expect_vectors = [
            r#"
+-------------------------+
| StructVector |
+-------------------------+
| {list: [1, 2, 3], s: a} |
| {list: [4], s: b} |
+-------------------------+"#,
            r#"
+--------------------+
| StructVector |
+--------------------+
| {float: 0.9, s: c} |
| {float: 0.8, s: d} |
+--------------------+"#,
            r#"
+-----------------------+
| StructVector |
+-----------------------+
| {float: 0.7, int: -1} |
| {float: 0.6, int: 0} |
+-----------------------+"#,
            r#"
+---------------------------------------------------------------+
| StructVector |
+---------------------------------------------------------------+
| {int: 1, object: {hello: world, timestamp: 1761523200000}} |
| {int: 2, object: {hello: greptime, timestamp: 1761523201000}} |
+---------------------------------------------------------------+"#,
            r#"
+------------------------------------------------------------------------+
| StructVector |
+------------------------------------------------------------------------+
| {nested: {a: {b: {b: {a: abba}}}}, object: {timestamp: 1761523202000}} |
+------------------------------------------------------------------------+"#,
            r#"
+------------------------------------------------------------------------+
| StructVector |
+------------------------------------------------------------------------+
| {nested: {a: {b: {a: {b: abab}}}}, object: {timestamp: 1761523203000}} |
+------------------------------------------------------------------------+"#,
        ];
        for (builder, (expect_type, expect_vector)) in builder
            .builders
            .iter()
            .zip(expect_types.into_iter().zip(expect_vectors.into_iter()))
        {
            assert_eq!(builder.json_type.name(), expect_type);
            let vector = builder.inner.to_vector_cloned();
            assert_eq!(pretty_print(vector), expect_vector.trim());
        }

        // The merged type contains every field seen; nested structs are merged recursively.
        let expected = r#"Json<Struct<"float": Float64, "int": Int64, "list": List<Int64>, "nested": Struct<"a": Struct<"b": Struct<"a": Struct<"b": String>, "b": Struct<"a": String>>>>, "object": Struct<"hello": String, "timestamp": Int64>, "s": String>>"#;
        assert_eq!(builder.data_type().to_string(), expected);

        let expected = r#"
+-------------------------------------------------------------------------------------------------------------------+
| StructVector |
+-------------------------------------------------------------------------------------------------------------------+
| {float: , int: , list: [1, 2, 3], nested: , object: , s: a} |
| {float: , int: , list: [4], nested: , object: , s: b} |
| {float: 0.9, int: , list: , nested: , object: , s: c} |
| {float: 0.8, int: , list: , nested: , object: , s: d} |
| {float: 0.7, int: -1, list: , nested: , object: , s: } |
| {float: 0.6, int: 0, list: , nested: , object: , s: } |
| {float: , int: 1, list: , nested: , object: {hello: world, timestamp: 1761523200000}, s: } |
| {float: , int: 2, list: , nested: , object: {hello: greptime, timestamp: 1761523201000}, s: } |
| {float: , int: , list: , nested: {a: {b: {a: , b: {a: abba}}}}, object: {hello: , timestamp: 1761523202000}, s: } |
| {float: , int: , list: , nested: {a: {b: {a: {b: abab}, b: }}}, object: {hello: , timestamp: 1761523203000}, s: } |
+-------------------------------------------------------------------------------------------------------------------+"#;
        // `to_vector_cloned` takes `&self`, so the rows are still buffered for `to_vector`.
        let vector = builder.to_vector_cloned();
        assert_eq!(pretty_print(vector), expected.trim());
        let vector = builder.to_vector();
        assert_eq!(pretty_print(vector), expected.trim());
        Ok(())
    }
}