1pub mod coerce;
16
17use std::collections::HashSet;
18use std::sync::Arc;
19
20use ahash::{HashMap, HashMapExt};
21use api::helper::proto_value_type;
22use api::v1::column_data_type_extension::TypeExt;
23use api::v1::value::ValueData;
24use api::v1::{ColumnDataType, ColumnDataTypeExtension, JsonTypeExtension, SemanticType};
25use coerce::{coerce_columns, coerce_value};
26use common_query::prelude::{GREPTIME_TIMESTAMP, GREPTIME_VALUE};
27use greptime_proto::v1::{ColumnSchema, Row, Rows, Value as GreptimeValue};
28use itertools::Itertools;
29use once_cell::sync::OnceCell;
30use serde_json::Number;
31use session::context::Channel;
32
33use crate::error::{
34 IdentifyPipelineColumnTypeMismatchSnafu, ReachedMaxNestedLevelsSnafu, Result,
35 TransformColumnNameMustBeUniqueSnafu, TransformMultipleTimestampIndexSnafu,
36 TransformTimestampIndexCountSnafu, UnsupportedNumberTypeSnafu,
37};
38use crate::etl::ctx_req::ContextOpt;
39use crate::etl::field::{Field, Fields};
40use crate::etl::transform::index::Index;
41use crate::etl::transform::{Transform, Transforms};
42use crate::etl::value::{Timestamp, Value};
43use crate::etl::PipelineMap;
44use crate::PipelineContext;
45
/// Name of the timestamp column used when the pipeline does not supply one.
///
/// [`GreptimeTransformer::new`] appends a transform for this column when no
/// transform declares a time index, and the identity pipeline uses it as the
/// timestamp column name when no custom timestamp is configured.
const DEFAULT_GREPTIME_TIMESTAMP_COLUMN: &str = "greptime_timestamp";
/// Nesting limit that [`identity_pipeline`] passes to [`flatten_object`];
/// objects nested deeper than this make flattening fail.
const DEFAULT_MAX_NESTED_LEVELS_FOR_JSON_FLATTENING: usize = 10;
48
/// Converts a pipeline map into a GreptimeDB [`Row`] by applying a fixed list
/// of transforms.
#[derive(Debug, Clone)]
pub struct GreptimeTransformer {
    /// Transforms applied, in order, by `transform_mut`.
    transforms: Transforms,
    /// Column schemas derived from `transforms` via `coerce_columns` when the
    /// transformer is constructed; `transform_mut` emits one value per entry.
    schema: Vec<ColumnSchema>,
}
56
/// Options passed alongside a pipeline as a `key=value&key=value` string.
#[derive(Debug, Clone, Default)]
pub struct GreptimePipelineParams {
    /// Raw key/value pairs as parsed by `from_params`.
    options: HashMap<String, String>,

    /// Cached value of the `flatten_json_object` option; filled on the first
    /// call to the `flatten_json_object()` accessor.
    pub flatten_json_object: OnceCell<bool>,
}
67
68impl GreptimePipelineParams {
69 pub fn from_params(params: Option<&str>) -> Self {
73 let options = params
74 .unwrap_or_default()
75 .split('&')
76 .filter_map(|s| s.split_once('='))
77 .map(|(k, v)| (k.to_string(), v.to_string()))
78 .collect::<HashMap<String, String>>();
79
80 Self {
81 options,
82 flatten_json_object: OnceCell::new(),
83 }
84 }
85
86 pub fn flatten_json_object(&self) -> bool {
88 *self.flatten_json_object.get_or_init(|| {
89 self.options
90 .get("flatten_json_object")
91 .map(|v| v == "true")
92 .unwrap_or(false)
93 })
94 }
95}
96
97impl GreptimeTransformer {
98 fn add_greptime_timestamp_column(transforms: &mut Transforms) {
100 let type_ = Value::Timestamp(Timestamp::Nanosecond(0));
101 let default = None;
102
103 let transform = Transform {
104 fields: Fields::one(Field::new(
105 DEFAULT_GREPTIME_TIMESTAMP_COLUMN.to_string(),
106 None,
107 )),
108 type_,
109 default,
110 index: Some(Index::Time),
111 on_failure: Some(crate::etl::transform::OnFailure::Default),
112 tag: false,
113 };
114 transforms.push(transform);
115 }
116
117 fn init_schemas(transforms: &Transforms) -> Result<Vec<ColumnSchema>> {
119 let mut schema = vec![];
120 for transform in transforms.iter() {
121 schema.extend(coerce_columns(transform)?);
122 }
123 Ok(schema)
124 }
125}
126
127impl GreptimeTransformer {
128 pub fn new(mut transforms: Transforms) -> Result<Self> {
129 let mut column_names_set = HashSet::new();
131 let mut timestamp_columns = vec![];
132
133 for transform in transforms.iter() {
134 let target_fields_set = transform
135 .fields
136 .iter()
137 .map(|f| f.target_or_input_field())
138 .collect::<HashSet<_>>();
139
140 let intersections: Vec<_> = column_names_set.intersection(&target_fields_set).collect();
141 if !intersections.is_empty() {
142 let duplicates = intersections.iter().join(",");
143 return TransformColumnNameMustBeUniqueSnafu { duplicates }.fail();
144 }
145
146 column_names_set.extend(target_fields_set);
147
148 if let Some(idx) = transform.index {
149 if idx == Index::Time {
150 match transform.fields.len() {
151 1 => {
153 timestamp_columns.push(transform.fields.first().unwrap().input_field())
154 }
155 _ => {
156 return TransformMultipleTimestampIndexSnafu {
157 columns: transform
158 .fields
159 .iter()
160 .map(|x| x.input_field())
161 .join(", "),
162 }
163 .fail();
164 }
165 }
166 }
167 }
168 }
169
170 match timestamp_columns.len() {
171 0 => {
172 GreptimeTransformer::add_greptime_timestamp_column(&mut transforms);
173
174 let schema = GreptimeTransformer::init_schemas(&transforms)?;
175 Ok(GreptimeTransformer { transforms, schema })
176 }
177 1 => {
178 let schema = GreptimeTransformer::init_schemas(&transforms)?;
179 Ok(GreptimeTransformer { transforms, schema })
180 }
181 _ => {
182 let columns: String = timestamp_columns.iter().map(|s| s.to_string()).join(", ");
183 let count = timestamp_columns.len();
184 TransformTimestampIndexCountSnafu { count, columns }.fail()
185 }
186 }
187 }
188
189 pub fn transform_mut(&self, pipeline_map: &mut PipelineMap) -> Result<(ContextOpt, Row)> {
190 let opt = ContextOpt::from_pipeline_map_to_opt(pipeline_map);
191
192 let mut values = vec![GreptimeValue { value_data: None }; self.schema.len()];
193 let mut output_index = 0;
194 for transform in self.transforms.iter() {
195 for field in transform.fields.iter() {
196 let index = field.input_field();
197 match pipeline_map.get(index) {
198 Some(v) => {
199 let value_data = coerce_value(v, transform)?;
200 values[output_index] = GreptimeValue { value_data };
202 }
203 None => {
204 let value_data = match transform.on_failure {
205 Some(crate::etl::transform::OnFailure::Default) => {
206 match transform.get_default() {
207 Some(default) => coerce_value(default, transform)?,
208 None => match transform.get_default_value_when_data_is_none() {
209 Some(default) => coerce_value(&default, transform)?,
210 None => None,
211 },
212 }
213 }
214 Some(crate::etl::transform::OnFailure::Ignore) => None,
215 None => None,
216 };
217 values[output_index] = GreptimeValue { value_data };
218 }
219 }
220 output_index += 1;
221 }
222 }
223 Ok((opt, Row { values }))
224 }
225
226 pub fn transforms(&self) -> &Transforms {
227 &self.transforms
228 }
229
230 pub fn schemas(&self) -> &Vec<greptime_proto::v1::ColumnSchema> {
231 &self.schema
232 }
233
234 pub fn transforms_mut(&mut self) -> &mut Transforms {
235 &mut self.transforms
236 }
237}
238
/// Schema accumulated while the identity pipeline scans its input rows.
#[derive(Debug, Default)]
pub struct SchemaInfo {
    /// Column schemas in output order.
    pub schema: Vec<ColumnSchema>,
    /// Maps a column name to its position in `schema`.
    pub index: HashMap<String, usize>,
}
249
250impl SchemaInfo {
251 pub fn with_capacity(capacity: usize) -> Self {
252 Self {
253 schema: Vec::with_capacity(capacity),
254 index: HashMap::with_capacity(capacity),
255 }
256 }
257}
258
259fn resolve_schema(
260 index: Option<usize>,
261 value_data: ValueData,
262 column_schema: ColumnSchema,
263 row: &mut Vec<GreptimeValue>,
264 schema_info: &mut SchemaInfo,
265) -> Result<()> {
266 if let Some(index) = index {
267 let api_value = GreptimeValue {
268 value_data: Some(value_data),
269 };
270 let value_column_data_type = proto_value_type(&api_value).unwrap();
272 let schema_column_data_type = schema_info.schema.get(index).unwrap().datatype();
274 if value_column_data_type != schema_column_data_type {
275 IdentifyPipelineColumnTypeMismatchSnafu {
276 column: column_schema.column_name,
277 expected: schema_column_data_type.as_str_name(),
278 actual: value_column_data_type.as_str_name(),
279 }
280 .fail()
281 } else {
282 row[index] = api_value;
283 Ok(())
284 }
285 } else {
286 let key = column_schema.column_name.clone();
287 schema_info.schema.push(column_schema);
288 schema_info.index.insert(key, schema_info.schema.len() - 1);
289 let api_value = GreptimeValue {
290 value_data: Some(value_data),
291 };
292 row.push(api_value);
293 Ok(())
294 }
295}
296
297fn resolve_number_schema(
298 n: Number,
299 column_name: String,
300 index: Option<usize>,
301 row: &mut Vec<GreptimeValue>,
302 schema_info: &mut SchemaInfo,
303) -> Result<()> {
304 let (value, datatype, semantic_type) = if n.is_i64() {
305 (
306 ValueData::I64Value(n.as_i64().unwrap()),
307 ColumnDataType::Int64 as i32,
308 SemanticType::Field as i32,
309 )
310 } else if n.is_u64() {
311 (
312 ValueData::U64Value(n.as_u64().unwrap()),
313 ColumnDataType::Uint64 as i32,
314 SemanticType::Field as i32,
315 )
316 } else if n.is_f64() {
317 (
318 ValueData::F64Value(n.as_f64().unwrap()),
319 ColumnDataType::Float64 as i32,
320 SemanticType::Field as i32,
321 )
322 } else {
323 return UnsupportedNumberTypeSnafu { value: n }.fail();
324 };
325 resolve_schema(
326 index,
327 value,
328 ColumnSchema {
329 column_name,
330 datatype,
331 semantic_type,
332 datatype_extension: None,
333 options: None,
334 },
335 row,
336 schema_info,
337 )
338}
339
340fn calc_ts(p_ctx: &PipelineContext, values: &PipelineMap) -> Result<Option<ValueData>> {
341 match p_ctx.channel {
342 Channel::Prometheus => Ok(Some(ValueData::TimestampMillisecondValue(
343 values
344 .get(GREPTIME_TIMESTAMP)
345 .and_then(|v| v.as_i64())
346 .unwrap_or_default(),
347 ))),
348 _ => {
349 let custom_ts = p_ctx.pipeline_definition.get_custom_ts();
350 match custom_ts {
351 Some(ts) => {
352 let ts_field = values.get(ts.get_column_name());
353 Some(ts.get_timestamp(ts_field)).transpose()
354 }
355 None => Ok(Some(ValueData::TimestampNanosecondValue(
356 chrono::Utc::now().timestamp_nanos_opt().unwrap_or_default(),
357 ))),
358 }
359 }
360 }
361}
362
363fn values_to_row(
364 schema_info: &mut SchemaInfo,
365 values: PipelineMap,
366 pipeline_ctx: &PipelineContext<'_>,
367) -> Result<Row> {
368 let mut row: Vec<GreptimeValue> = Vec::with_capacity(schema_info.schema.len());
369 let custom_ts = pipeline_ctx.pipeline_definition.get_custom_ts();
370
371 let ts = calc_ts(pipeline_ctx, &values)?;
373
374 row.push(GreptimeValue { value_data: ts });
375
376 for _ in 1..schema_info.schema.len() {
377 row.push(GreptimeValue { value_data: None });
378 }
379
380 let ts_column_name = custom_ts
382 .as_ref()
383 .map_or(DEFAULT_GREPTIME_TIMESTAMP_COLUMN, |ts| ts.get_column_name());
384
385 for (column_name, value) in values {
386 if column_name == ts_column_name {
387 continue;
388 }
389
390 resolve_value(value, column_name, &mut row, schema_info, pipeline_ctx)?;
391 }
392 Ok(Row { values: row })
393}
394
395fn decide_semantic(p_ctx: &PipelineContext, column_name: &str) -> i32 {
396 if p_ctx.channel == Channel::Prometheus && column_name != GREPTIME_VALUE {
397 SemanticType::Tag as i32
398 } else {
399 SemanticType::Field as i32
400 }
401}
402
403fn resolve_value(
404 value: Value,
405 column_name: String,
406 row: &mut Vec<GreptimeValue>,
407 schema_info: &mut SchemaInfo,
408 p_ctx: &PipelineContext,
409) -> Result<()> {
410 let index = schema_info.index.get(&column_name).copied();
411 let mut resolve_simple_type =
412 |value_data: ValueData, column_name: String, data_type: ColumnDataType| {
413 let semantic_type = decide_semantic(p_ctx, &column_name);
414 resolve_schema(
415 index,
416 value_data,
417 ColumnSchema {
418 column_name,
419 datatype: data_type as i32,
420 semantic_type,
421 datatype_extension: None,
422 options: None,
423 },
424 row,
425 schema_info,
426 )
427 };
428
429 match value {
430 Value::Null => {}
431
432 Value::Int8(_) | Value::Int16(_) | Value::Int32(_) | Value::Int64(_) => {
433 let v = value.as_i64().unwrap();
435 resolve_simple_type(ValueData::I64Value(v), column_name, ColumnDataType::Int64)?;
436 }
437
438 Value::Uint8(_) | Value::Uint16(_) | Value::Uint32(_) | Value::Uint64(_) => {
439 let v = value.as_u64().unwrap();
441 resolve_simple_type(ValueData::U64Value(v), column_name, ColumnDataType::Uint64)?;
442 }
443
444 Value::Float32(_) | Value::Float64(_) => {
445 let v = value.as_f64().unwrap();
447 resolve_simple_type(ValueData::F64Value(v), column_name, ColumnDataType::Float64)?;
448 }
449
450 Value::Boolean(v) => {
451 resolve_simple_type(
452 ValueData::BoolValue(v),
453 column_name,
454 ColumnDataType::Boolean,
455 )?;
456 }
457
458 Value::String(v) => {
459 resolve_simple_type(
460 ValueData::StringValue(v),
461 column_name,
462 ColumnDataType::String,
463 )?;
464 }
465
466 Value::Timestamp(Timestamp::Nanosecond(ns)) => {
467 resolve_simple_type(
468 ValueData::TimestampNanosecondValue(ns),
469 column_name,
470 ColumnDataType::TimestampNanosecond,
471 )?;
472 }
473
474 Value::Timestamp(Timestamp::Microsecond(us)) => {
475 resolve_simple_type(
476 ValueData::TimestampMicrosecondValue(us),
477 column_name,
478 ColumnDataType::TimestampMicrosecond,
479 )?;
480 }
481
482 Value::Timestamp(Timestamp::Millisecond(ms)) => {
483 resolve_simple_type(
484 ValueData::TimestampMillisecondValue(ms),
485 column_name,
486 ColumnDataType::TimestampMillisecond,
487 )?;
488 }
489
490 Value::Timestamp(Timestamp::Second(s)) => {
491 resolve_simple_type(
492 ValueData::TimestampSecondValue(s),
493 column_name,
494 ColumnDataType::TimestampSecond,
495 )?;
496 }
497
498 Value::Array(_) | Value::Map(_) => {
499 let data: jsonb::Value = value.into();
500 resolve_schema(
501 index,
502 ValueData::BinaryValue(data.to_vec()),
503 ColumnSchema {
504 column_name,
505 datatype: ColumnDataType::Binary as i32,
506 semantic_type: SemanticType::Field as i32,
507 datatype_extension: Some(ColumnDataTypeExtension {
508 type_ext: Some(TypeExt::JsonType(JsonTypeExtension::JsonBinary.into())),
509 }),
510 options: None,
511 },
512 row,
513 schema_info,
514 )?;
515 }
516 }
517 Ok(())
518}
519
520fn identity_pipeline_inner(
521 pipeline_maps: Vec<PipelineMap>,
522 pipeline_ctx: &PipelineContext<'_>,
523) -> Result<(SchemaInfo, HashMap<ContextOpt, Vec<Row>>)> {
524 let mut schema_info = SchemaInfo::default();
525 let custom_ts = pipeline_ctx.pipeline_definition.get_custom_ts();
526
527 schema_info.schema.push(ColumnSchema {
529 column_name: custom_ts
530 .map(|ts| ts.get_column_name().clone())
531 .unwrap_or_else(|| DEFAULT_GREPTIME_TIMESTAMP_COLUMN.to_string()),
532 datatype: custom_ts.map(|c| c.get_datatype()).unwrap_or_else(|| {
533 if pipeline_ctx.channel == Channel::Prometheus {
534 ColumnDataType::TimestampMillisecond
535 } else {
536 ColumnDataType::TimestampNanosecond
537 }
538 }) as i32,
539 semantic_type: SemanticType::Timestamp as i32,
540 datatype_extension: None,
541 options: None,
542 });
543
544 let mut opt_map = HashMap::new();
545 let len = pipeline_maps.len();
546
547 for mut pipeline_map in pipeline_maps {
548 let opt = ContextOpt::from_pipeline_map_to_opt(&mut pipeline_map);
549 let row = values_to_row(&mut schema_info, pipeline_map, pipeline_ctx)?;
550
551 opt_map
552 .entry(opt)
553 .or_insert_with(|| Vec::with_capacity(len))
554 .push(row);
555 }
556
557 let column_count = schema_info.schema.len();
558 for (_, row) in opt_map.iter_mut() {
559 for row in row.iter_mut() {
560 let diff = column_count - row.values.len();
561 for _ in 0..diff {
562 row.values.push(GreptimeValue { value_data: None });
563 }
564 }
565 }
566
567 Ok((schema_info, opt_map))
568}
569
570pub fn identity_pipeline(
579 array: Vec<PipelineMap>,
580 table: Option<Arc<table::Table>>,
581 pipeline_ctx: &PipelineContext<'_>,
582) -> Result<HashMap<ContextOpt, Rows>> {
583 let input = if pipeline_ctx.pipeline_param.flatten_json_object() {
584 array
585 .into_iter()
586 .map(|item| flatten_object(item, DEFAULT_MAX_NESTED_LEVELS_FOR_JSON_FLATTENING))
587 .collect::<Result<Vec<PipelineMap>>>()?
588 } else {
589 array
590 };
591
592 identity_pipeline_inner(input, pipeline_ctx).map(|(mut schema, opt_map)| {
593 if let Some(table) = table {
594 let table_info = table.table_info();
595 for tag_name in table_info.meta.row_key_column_names() {
596 if let Some(index) = schema.index.get(tag_name) {
597 schema.schema[*index].semantic_type = SemanticType::Tag as i32;
598 }
599 }
600 }
601
602 opt_map
603 .into_iter()
604 .map(|(opt, rows)| {
605 (
606 opt,
607 Rows {
608 schema: schema.schema.clone(),
609 rows,
610 },
611 )
612 })
613 .collect::<HashMap<ContextOpt, Rows>>()
614 })
615}
616
617pub fn flatten_object(object: PipelineMap, max_nested_levels: usize) -> Result<PipelineMap> {
622 let mut flattened = PipelineMap::new();
623
624 if !object.is_empty() {
625 do_flatten_object(&mut flattened, None, object, 1, max_nested_levels)?;
627 }
628
629 Ok(flattened)
630}
631
632fn do_flatten_object(
633 dest: &mut PipelineMap,
634 base: Option<&str>,
635 object: PipelineMap,
636 current_level: usize,
637 max_nested_levels: usize,
638) -> Result<()> {
639 if current_level > max_nested_levels {
641 return ReachedMaxNestedLevelsSnafu { max_nested_levels }.fail();
642 }
643
644 for (key, value) in object {
645 let new_key = base.map_or_else(|| key.clone(), |base_key| format!("{base_key}.{key}"));
646
647 match value {
648 Value::Map(object) => {
649 do_flatten_object(
650 dest,
651 Some(&new_key),
652 object.values,
653 current_level + 1,
654 max_nested_levels,
655 )?;
656 }
657 _ => {
659 dest.insert(new_key, value);
660 }
661 }
662 }
663
664 Ok(())
665}
666
#[cfg(test)]
mod tests {
    use api::v1::SemanticType;

    use super::*;
    use crate::etl::{json_array_to_map, json_to_map};
    use crate::{identity_pipeline, PipelineDefinition};

    /// Exercises schema inference in the identity pipeline: datatype conflicts
    /// between rows are rejected, consistent rows share one padded schema, and
    /// selected columns can be promoted to tags.
    #[test]
    fn test_identify_pipeline() {
        let params = GreptimePipelineParams::default();
        let pipeline_ctx = PipelineContext::new(
            &PipelineDefinition::GreptimeIdentityPipeline(None),
            &params,
            Channel::Unknown,
        );
        // `score` is a float in the first row and a string in the second.
        {
            let array = vec![
                serde_json::json!({
                    "woshinull": null,
                    "name": "Alice",
                    "age": 20,
                    "is_student": true,
                    "score": 99.5,
                    "hobbies": "reading",
                    "address": "Beijing",
                }),
                serde_json::json!({
                    "name": "Bob",
                    "age": 21,
                    "is_student": false,
                    "score": "88.5",
                    "hobbies": "swimming",
                    "address": "Shanghai",
                    "gaga": "gaga"
                }),
            ];
            let array = json_array_to_map(array).unwrap();
            let rows = identity_pipeline(array, None, &pipeline_ctx);
            assert!(rows.is_err());
            assert_eq!(
                rows.err().unwrap().to_string(),
                "Column datatype mismatch. For column: score, expected datatype: FLOAT64, actual datatype: STRING".to_string(),
            );
        }
        // `score` is a float in the first row and an integer in the second.
        {
            let array = vec![
                serde_json::json!({
                    "woshinull": null,
                    "name": "Alice",
                    "age": 20,
                    "is_student": true,
                    "score": 99.5,
                    "hobbies": "reading",
                    "address": "Beijing",
                }),
                serde_json::json!({
                    "name": "Bob",
                    "age": 21,
                    "is_student": false,
                    "score": 88,
                    "hobbies": "swimming",
                    "address": "Shanghai",
                    "gaga": "gaga"
                }),
            ];
            let rows = identity_pipeline(json_array_to_map(array).unwrap(), None, &pipeline_ctx);
            assert!(rows.is_err());
            assert_eq!(
                rows.err().unwrap().to_string(),
                "Column datatype mismatch. For column: score, expected datatype: FLOAT64, actual datatype: INT64".to_string(),
            );
        }
        // Consistent datatypes: both rows are padded to the full schema width.
        {
            let array = vec![
                serde_json::json!({
                    "woshinull": null,
                    "name": "Alice",
                    "age": 20,
                    "is_student": true,
                    "score": 99.5,
                    "hobbies": "reading",
                    "address": "Beijing",
                }),
                serde_json::json!({
                    "name": "Bob",
                    "age": 21,
                    "is_student": false,
                    "score": 88.5,
                    "hobbies": "swimming",
                    "address": "Shanghai",
                    "gaga": "gaga"
                }),
            ];
            let rows = identity_pipeline(json_array_to_map(array).unwrap(), None, &pipeline_ctx);
            assert!(rows.is_ok());
            let mut rows = rows.unwrap();
            assert_eq!(rows.len(), 1);
            let rows = rows.remove(&ContextOpt::default()).unwrap();
            assert_eq!(rows.schema.len(), 8);
            assert_eq!(rows.rows.len(), 2);
            assert_eq!(8, rows.rows[0].values.len());
            assert_eq!(8, rows.rows[1].values.len());
        }
        // Promote `name` and `address` to tags after inference.
        {
            let array = vec![
                serde_json::json!({
                    "woshinull": null,
                    "name": "Alice",
                    "age": 20,
                    "is_student": true,
                    "score": 99.5,
                    "hobbies": "reading",
                    "address": "Beijing",
                }),
                serde_json::json!({
                    "name": "Bob",
                    "age": 21,
                    "is_student": false,
                    "score": 88.5,
                    "hobbies": "swimming",
                    "address": "Shanghai",
                    "gaga": "gaga"
                }),
            ];
            let tag_column_names = ["name".to_string(), "address".to_string()];

            let rows = identity_pipeline_inner(json_array_to_map(array).unwrap(), &pipeline_ctx)
                .map(|(mut schema, mut rows)| {
                    for name in tag_column_names {
                        if let Some(index) = schema.index.get(&name) {
                            schema.schema[*index].semantic_type = SemanticType::Tag as i32;
                        }
                    }

                    assert_eq!(rows.len(), 1);
                    let rows = rows.remove(&ContextOpt::default()).unwrap();

                    Rows {
                        schema: schema.schema,
                        rows,
                    }
                });

            assert!(rows.is_ok());
            let rows = rows.unwrap();
            assert_eq!(rows.schema.len(), 8);
            assert_eq!(rows.rows.len(), 2);
            assert_eq!(8, rows.rows[0].values.len());
            assert_eq!(8, rows.rows[1].values.len());
            assert_eq!(
                rows.schema
                    .iter()
                    .find(|x| x.column_name == "name")
                    .unwrap()
                    .semantic_type,
                SemanticType::Tag as i32
            );
            assert_eq!(
                rows.schema
                    .iter()
                    .find(|x| x.column_name == "address")
                    .unwrap()
                    .semantic_type,
                SemanticType::Tag as i32
            );
            assert_eq!(
                rows.schema
                    .iter()
                    .filter(|x| x.semantic_type == SemanticType::Tag as i32)
                    .count(),
                2
            );
        }
    }

    /// Checks `flatten_object` on an object within the depth limit and on one
    /// that exceeds it (expected result `None`, i.e. an error).
    #[test]
    fn test_flatten() {
        let test_cases = vec![
            // Nested maps are flattened; arrays are kept as values.
            (
                serde_json::json!(
                    {
                        "a": {
                            "b": {
                                "c": [1, 2, 3]
                            }
                        },
                        "d": [
                            "foo",
                            "bar"
                        ],
                        "e": {
                            "f": [7, 8, 9],
                            "g": {
                                "h": 123,
                                "i": "hello",
                                "j": {
                                    "k": true
                                }
                            }
                        }
                    }
                ),
                10,
                Some(serde_json::json!(
                    {
                        "a.b.c": [1,2,3],
                        "d": ["foo","bar"],
                        "e.f": [7,8,9],
                        "e.g.h": 123,
                        "e.g.i": "hello",
                        "e.g.j.k": true
                    }
                )),
            ),
            // Four levels of nesting with a limit of three must fail.
            (
                serde_json::json!(
                    {
                        "a": {
                            "b": {
                                "c": {
                                    "d": [1, 2, 3]
                                }
                            }
                        },
                        "e": [
                            "foo",
                            "bar"
                        ]
                    }
                ),
                3,
                None,
            ),
        ];

        for (input, max_depth, expected) in test_cases {
            let input = json_to_map(input).unwrap();
            let expected = expected.map(|e| json_to_map(e).unwrap());

            let flattened_object = flatten_object(input, max_depth).ok();
            assert_eq!(flattened_object, expected);
        }
    }

    /// The `flatten_json_object` flag is on only when explicitly set to `true`.
    #[test]
    fn test_greptime_pipeline_params() {
        let params = Some("flatten_json_object=true");
        let pipeline_params = GreptimePipelineParams::from_params(params);
        assert!(pipeline_params.flatten_json_object());

        let params = None;
        let pipeline_params = GreptimePipelineParams::from_params(params);
        assert!(!pipeline_params.flatten_json_object());
    }
}