1pub mod coerce;
16
17use std::collections::{BTreeMap, HashSet};
18use std::sync::Arc;
19
20use ahash::{HashMap, HashMapExt};
21use api::helper::proto_value_type;
22use api::v1::column_data_type_extension::TypeExt;
23use api::v1::value::ValueData;
24use api::v1::{ColumnDataType, ColumnDataTypeExtension, JsonTypeExtension, SemanticType};
25use coerce::{coerce_columns, coerce_value};
26use common_query::prelude::{GREPTIME_TIMESTAMP, GREPTIME_VALUE};
27use greptime_proto::v1::{ColumnSchema, Row, Rows, Value as GreptimeValue};
28use itertools::Itertools;
29use once_cell::sync::OnceCell;
30use serde_json::Number;
31use session::context::Channel;
32use snafu::OptionExt;
33
34use crate::error::{
35 IdentifyPipelineColumnTypeMismatchSnafu, ReachedMaxNestedLevelsSnafu, Result,
36 TimeIndexMustBeNonNullSnafu, TransformColumnNameMustBeUniqueSnafu,
37 TransformMultipleTimestampIndexSnafu, TransformTimestampIndexCountSnafu,
38 UnsupportedNumberTypeSnafu, ValueMustBeMapSnafu,
39};
40use crate::etl::ctx_req::ContextOpt;
41use crate::etl::field::{Field, Fields};
42use crate::etl::transform::index::Index;
43use crate::etl::transform::{Transform, Transforms};
44use crate::etl::value::{Timestamp, Value};
45use crate::etl::PipelineDocVersion;
46use crate::{unwrap_or_continue_if_err, Map, PipelineContext};
47
/// Name of the time-index column used when the pipeline does not define one itself.
const DEFAULT_GREPTIME_TIMESTAMP_COLUMN: &str = "greptime_timestamp";
/// Nesting limit handed to `flatten_object` when the identity pipeline flattens JSON objects.
const DEFAULT_MAX_NESTED_LEVELS_FOR_JSON_FLATTENING: usize = 10;
50
/// A validated list of transforms together with the column schema derived from them.
#[derive(Debug, Clone)]
pub struct GreptimeTransformer {
    /// Transform rules, visited in order by `transform_mut`.
    transforms: Transforms,
    /// Output column schemas produced from `transforms` by `coerce_columns`.
    schema: Vec<ColumnSchema>,
}
58
/// Returns `true` when `v`, compared case-insensitively, is one of
/// `true`, `1`, `yes`, `on` or `t`.
///
/// The input is not trimmed; surrounding whitespace makes the value falsy.
fn truthy<V: AsRef<str>>(v: V) -> bool {
    let lowered = v.as_ref().to_lowercase();
    matches!(lowered.as_str(), "true" | "1" | "yes" | "on" | "t")
}
63
/// Raw pipeline options plus lazily computed typed views of selected options.
#[derive(Debug, Default)]
pub struct GreptimePipelineParams {
    /// Raw key/value options as supplied by the caller.
    options: HashMap<String, String>,

    /// Cache for the `flatten_json_object` option; filled on first call to the accessor.
    pub flatten_json_object: OnceCell<bool>,
    /// Cache for the `skip_error` option; filled on first call to the accessor.
    pub skip_error: OnceCell<bool>,
}
76
77impl GreptimePipelineParams {
78 pub fn from_params(params: Option<&str>) -> Self {
82 let options = Self::parse_header_str_to_map(params);
83
84 Self {
85 options,
86 skip_error: OnceCell::new(),
87 flatten_json_object: OnceCell::new(),
88 }
89 }
90
91 pub fn from_map(options: HashMap<String, String>) -> Self {
92 Self {
93 options,
94 skip_error: OnceCell::new(),
95 flatten_json_object: OnceCell::new(),
96 }
97 }
98
99 pub fn parse_header_str_to_map(params: Option<&str>) -> HashMap<String, String> {
100 if let Some(params) = params {
101 if params.is_empty() {
102 HashMap::new()
103 } else {
104 params
105 .split('&')
106 .filter_map(|s| s.split_once('='))
107 .map(|(k, v)| (k.to_string(), v.to_string()))
108 .collect::<HashMap<String, String>>()
109 }
110 } else {
111 HashMap::new()
112 }
113 }
114
115 pub fn flatten_json_object(&self) -> bool {
117 *self.flatten_json_object.get_or_init(|| {
118 self.options
119 .get("flatten_json_object")
120 .map(|v| v == "true")
121 .unwrap_or(false)
122 })
123 }
124
125 pub fn skip_error(&self) -> bool {
127 *self
128 .skip_error
129 .get_or_init(|| self.options.get("skip_error").map(truthy).unwrap_or(false))
130 }
131}
132
133impl GreptimeTransformer {
134 fn add_greptime_timestamp_column(transforms: &mut Transforms) {
136 let type_ = Value::Timestamp(Timestamp::Nanosecond(0));
137 let default = None;
138
139 let transform = Transform {
140 fields: Fields::one(Field::new(
141 DEFAULT_GREPTIME_TIMESTAMP_COLUMN.to_string(),
142 None,
143 )),
144 type_,
145 default,
146 index: Some(Index::Time),
147 on_failure: Some(crate::etl::transform::OnFailure::Default),
148 tag: false,
149 };
150 transforms.push(transform);
151 }
152
153 fn init_schemas(transforms: &Transforms) -> Result<Vec<ColumnSchema>> {
155 let mut schema = vec![];
156 for transform in transforms.iter() {
157 schema.extend(coerce_columns(transform)?);
158 }
159 Ok(schema)
160 }
161}
162
163impl GreptimeTransformer {
164 pub fn new(mut transforms: Transforms, doc_version: &PipelineDocVersion) -> Result<Self> {
165 let mut column_names_set = HashSet::new();
167 let mut timestamp_columns = vec![];
168
169 for transform in transforms.iter() {
170 let target_fields_set = transform
171 .fields
172 .iter()
173 .map(|f| f.target_or_input_field())
174 .collect::<HashSet<_>>();
175
176 let intersections: Vec<_> = column_names_set.intersection(&target_fields_set).collect();
177 if !intersections.is_empty() {
178 let duplicates = intersections.iter().join(",");
179 return TransformColumnNameMustBeUniqueSnafu { duplicates }.fail();
180 }
181
182 column_names_set.extend(target_fields_set);
183
184 if let Some(idx) = transform.index {
185 if idx == Index::Time {
186 match transform.fields.len() {
187 1 => {
189 timestamp_columns.push(transform.fields.first().unwrap().input_field())
190 }
191 _ => {
192 return TransformMultipleTimestampIndexSnafu {
193 columns: transform
194 .fields
195 .iter()
196 .map(|x| x.input_field())
197 .join(", "),
198 }
199 .fail();
200 }
201 }
202 }
203 }
204 }
205
206 let schema = match timestamp_columns.len() {
207 0 if doc_version == &PipelineDocVersion::V1 => {
208 GreptimeTransformer::add_greptime_timestamp_column(&mut transforms);
210 GreptimeTransformer::init_schemas(&transforms)?
211 }
212 1 => GreptimeTransformer::init_schemas(&transforms)?,
213 count => {
214 let columns = timestamp_columns.iter().join(", ");
215 return TransformTimestampIndexCountSnafu { count, columns }.fail();
216 }
217 };
218 Ok(GreptimeTransformer { transforms, schema })
219 }
220
221 pub fn transform_mut(
222 &self,
223 pipeline_map: &mut Value,
224 is_v1: bool,
225 ) -> Result<Vec<GreptimeValue>> {
226 let mut values = vec![GreptimeValue { value_data: None }; self.schema.len()];
227 let mut output_index = 0;
228 for transform in self.transforms.iter() {
229 for field in transform.fields.iter() {
230 let column_name = field.input_field();
231
232 match pipeline_map.get(column_name) {
234 Some(v) => {
235 let value_data = coerce_value(v, transform)?;
236 values[output_index] = GreptimeValue { value_data };
238 }
239 None => {
240 let value_data = match transform.on_failure {
241 Some(crate::etl::transform::OnFailure::Default) => {
242 match transform.get_default() {
243 Some(default) => coerce_value(default, transform)?,
244 None => match transform.get_default_value_when_data_is_none() {
245 Some(default) => coerce_value(&default, transform)?,
246 None => None,
247 },
248 }
249 }
250 Some(crate::etl::transform::OnFailure::Ignore) => None,
251 None => None,
252 };
253 if transform.is_timeindex() && value_data.is_none() {
254 return TimeIndexMustBeNonNullSnafu.fail();
255 }
256 values[output_index] = GreptimeValue { value_data };
257 }
258 }
259 output_index += 1;
260 if !is_v1 {
261 pipeline_map.remove(column_name);
264 }
265 }
266 }
267 Ok(values)
268 }
269
270 pub fn transforms(&self) -> &Transforms {
271 &self.transforms
272 }
273
274 pub fn schemas(&self) -> &Vec<greptime_proto::v1::ColumnSchema> {
275 &self.schema
276 }
277
278 pub fn transforms_mut(&mut self) -> &mut Transforms {
279 &mut self.transforms
280 }
281}
282
/// Column schemas together with a name-to-position lookup for them.
#[derive(Debug, Default)]
pub struct SchemaInfo {
    /// Column schemas, in column order.
    pub schema: Vec<ColumnSchema>,
    /// Maps a column name to the position of its entry in `schema`.
    pub index: HashMap<String, usize>,
}
293
294impl SchemaInfo {
295 pub fn with_capacity(capacity: usize) -> Self {
296 Self {
297 schema: Vec::with_capacity(capacity),
298 index: HashMap::with_capacity(capacity),
299 }
300 }
301
302 pub fn from_schema_list(schema_list: Vec<ColumnSchema>) -> Self {
303 let mut index = HashMap::new();
304 for (i, schema) in schema_list.iter().enumerate() {
305 index.insert(schema.column_name.clone(), i);
306 }
307 Self {
308 schema: schema_list,
309 index,
310 }
311 }
312}
313
314fn resolve_schema(
315 index: Option<usize>,
316 value_data: ValueData,
317 column_schema: ColumnSchema,
318 row: &mut Vec<GreptimeValue>,
319 schema_info: &mut SchemaInfo,
320) -> Result<()> {
321 if let Some(index) = index {
322 let api_value = GreptimeValue {
323 value_data: Some(value_data),
324 };
325 let value_column_data_type = proto_value_type(&api_value).unwrap();
327 let schema_column_data_type = schema_info.schema.get(index).unwrap().datatype();
329 if value_column_data_type != schema_column_data_type {
330 IdentifyPipelineColumnTypeMismatchSnafu {
331 column: column_schema.column_name,
332 expected: schema_column_data_type.as_str_name(),
333 actual: value_column_data_type.as_str_name(),
334 }
335 .fail()
336 } else {
337 row[index] = api_value;
338 Ok(())
339 }
340 } else {
341 let key = column_schema.column_name.clone();
342 schema_info.schema.push(column_schema);
343 schema_info.index.insert(key, schema_info.schema.len() - 1);
344 let api_value = GreptimeValue {
345 value_data: Some(value_data),
346 };
347 row.push(api_value);
348 Ok(())
349 }
350}
351
352fn resolve_number_schema(
353 n: Number,
354 column_name: String,
355 index: Option<usize>,
356 row: &mut Vec<GreptimeValue>,
357 schema_info: &mut SchemaInfo,
358) -> Result<()> {
359 let (value, datatype, semantic_type) = if n.is_i64() {
360 (
361 ValueData::I64Value(n.as_i64().unwrap()),
362 ColumnDataType::Int64 as i32,
363 SemanticType::Field as i32,
364 )
365 } else if n.is_u64() {
366 (
367 ValueData::U64Value(n.as_u64().unwrap()),
368 ColumnDataType::Uint64 as i32,
369 SemanticType::Field as i32,
370 )
371 } else if n.is_f64() {
372 (
373 ValueData::F64Value(n.as_f64().unwrap()),
374 ColumnDataType::Float64 as i32,
375 SemanticType::Field as i32,
376 )
377 } else {
378 return UnsupportedNumberTypeSnafu { value: n }.fail();
379 };
380 resolve_schema(
381 index,
382 value,
383 ColumnSchema {
384 column_name,
385 datatype,
386 semantic_type,
387 datatype_extension: None,
388 options: None,
389 },
390 row,
391 schema_info,
392 )
393}
394
395fn calc_ts(p_ctx: &PipelineContext, values: &Value) -> Result<Option<ValueData>> {
396 match p_ctx.channel {
397 Channel::Prometheus => Ok(Some(ValueData::TimestampMillisecondValue(
398 values
399 .get(GREPTIME_TIMESTAMP)
400 .and_then(|v| v.as_i64())
401 .unwrap_or_default(),
402 ))),
403 _ => {
404 let custom_ts = p_ctx.pipeline_definition.get_custom_ts();
405 match custom_ts {
406 Some(ts) => {
407 let ts_field = values.get(ts.get_column_name());
408 Some(ts.get_timestamp(ts_field)).transpose()
409 }
410 None => Ok(Some(ValueData::TimestampNanosecondValue(
411 chrono::Utc::now().timestamp_nanos_opt().unwrap_or_default(),
412 ))),
413 }
414 }
415 }
416}
417
418pub(crate) fn values_to_row(
419 schema_info: &mut SchemaInfo,
420 values: Value,
421 pipeline_ctx: &PipelineContext<'_>,
422 row: Option<Vec<GreptimeValue>>,
423) -> Result<Row> {
424 let mut row: Vec<GreptimeValue> =
425 row.unwrap_or_else(|| Vec::with_capacity(schema_info.schema.len()));
426 let custom_ts = pipeline_ctx.pipeline_definition.get_custom_ts();
427
428 let ts = calc_ts(pipeline_ctx, &values)?;
430
431 row.push(GreptimeValue { value_data: ts });
432
433 row.resize(schema_info.schema.len(), GreptimeValue { value_data: None });
434
435 let ts_column_name = custom_ts
437 .as_ref()
438 .map_or(DEFAULT_GREPTIME_TIMESTAMP_COLUMN, |ts| ts.get_column_name());
439
440 let values = values.into_map().context(ValueMustBeMapSnafu)?;
441
442 for (column_name, value) in values {
443 if column_name == ts_column_name {
444 continue;
445 }
446
447 resolve_value(value, column_name, &mut row, schema_info, pipeline_ctx)?;
448 }
449 Ok(Row { values: row })
450}
451
452fn decide_semantic(p_ctx: &PipelineContext, column_name: &str) -> i32 {
453 if p_ctx.channel == Channel::Prometheus && column_name != GREPTIME_VALUE {
454 SemanticType::Tag as i32
455 } else {
456 SemanticType::Field as i32
457 }
458}
459
460fn resolve_value(
461 value: Value,
462 column_name: String,
463 row: &mut Vec<GreptimeValue>,
464 schema_info: &mut SchemaInfo,
465 p_ctx: &PipelineContext,
466) -> Result<()> {
467 let index = schema_info.index.get(&column_name).copied();
468 let mut resolve_simple_type =
469 |value_data: ValueData, column_name: String, data_type: ColumnDataType| {
470 let semantic_type = decide_semantic(p_ctx, &column_name);
471 resolve_schema(
472 index,
473 value_data,
474 ColumnSchema {
475 column_name,
476 datatype: data_type as i32,
477 semantic_type,
478 datatype_extension: None,
479 options: None,
480 },
481 row,
482 schema_info,
483 )
484 };
485
486 match value {
487 Value::Null => {}
488
489 Value::Int8(_) | Value::Int16(_) | Value::Int32(_) | Value::Int64(_) => {
490 let v = value.as_i64().unwrap();
492 resolve_simple_type(ValueData::I64Value(v), column_name, ColumnDataType::Int64)?;
493 }
494
495 Value::Uint8(_) | Value::Uint16(_) | Value::Uint32(_) | Value::Uint64(_) => {
496 let v = value.as_u64().unwrap();
498 resolve_simple_type(ValueData::U64Value(v), column_name, ColumnDataType::Uint64)?;
499 }
500
501 Value::Float32(_) | Value::Float64(_) => {
502 let v = value.as_f64().unwrap();
504 resolve_simple_type(ValueData::F64Value(v), column_name, ColumnDataType::Float64)?;
505 }
506
507 Value::Boolean(v) => {
508 resolve_simple_type(
509 ValueData::BoolValue(v),
510 column_name,
511 ColumnDataType::Boolean,
512 )?;
513 }
514
515 Value::String(v) => {
516 resolve_simple_type(
517 ValueData::StringValue(v),
518 column_name,
519 ColumnDataType::String,
520 )?;
521 }
522
523 Value::Timestamp(Timestamp::Nanosecond(ns)) => {
524 resolve_simple_type(
525 ValueData::TimestampNanosecondValue(ns),
526 column_name,
527 ColumnDataType::TimestampNanosecond,
528 )?;
529 }
530
531 Value::Timestamp(Timestamp::Microsecond(us)) => {
532 resolve_simple_type(
533 ValueData::TimestampMicrosecondValue(us),
534 column_name,
535 ColumnDataType::TimestampMicrosecond,
536 )?;
537 }
538
539 Value::Timestamp(Timestamp::Millisecond(ms)) => {
540 resolve_simple_type(
541 ValueData::TimestampMillisecondValue(ms),
542 column_name,
543 ColumnDataType::TimestampMillisecond,
544 )?;
545 }
546
547 Value::Timestamp(Timestamp::Second(s)) => {
548 resolve_simple_type(
549 ValueData::TimestampSecondValue(s),
550 column_name,
551 ColumnDataType::TimestampSecond,
552 )?;
553 }
554
555 Value::Array(_) | Value::Map(_) => {
556 let data: jsonb::Value = value.into();
557 resolve_schema(
558 index,
559 ValueData::BinaryValue(data.to_vec()),
560 ColumnSchema {
561 column_name,
562 datatype: ColumnDataType::Binary as i32,
563 semantic_type: SemanticType::Field as i32,
564 datatype_extension: Some(ColumnDataTypeExtension {
565 type_ext: Some(TypeExt::JsonType(JsonTypeExtension::JsonBinary.into())),
566 }),
567 options: None,
568 },
569 row,
570 schema_info,
571 )?;
572 }
573 }
574 Ok(())
575}
576
577fn identity_pipeline_inner(
578 pipeline_maps: Vec<Value>,
579 pipeline_ctx: &PipelineContext<'_>,
580) -> Result<(SchemaInfo, HashMap<ContextOpt, Vec<Row>>)> {
581 let skip_error = pipeline_ctx.pipeline_param.skip_error();
582 let mut schema_info = SchemaInfo::default();
583 let custom_ts = pipeline_ctx.pipeline_definition.get_custom_ts();
584
585 schema_info.schema.push(ColumnSchema {
587 column_name: custom_ts
588 .map(|ts| ts.get_column_name().clone())
589 .unwrap_or_else(|| DEFAULT_GREPTIME_TIMESTAMP_COLUMN.to_string()),
590 datatype: custom_ts.map(|c| c.get_datatype()).unwrap_or_else(|| {
591 if pipeline_ctx.channel == Channel::Prometheus {
592 ColumnDataType::TimestampMillisecond
593 } else {
594 ColumnDataType::TimestampNanosecond
595 }
596 }) as i32,
597 semantic_type: SemanticType::Timestamp as i32,
598 datatype_extension: None,
599 options: None,
600 });
601
602 let mut opt_map = HashMap::new();
603 let len = pipeline_maps.len();
604
605 for mut pipeline_map in pipeline_maps {
606 let opt = unwrap_or_continue_if_err!(
607 ContextOpt::from_pipeline_map_to_opt(&mut pipeline_map),
608 skip_error
609 );
610 let row = unwrap_or_continue_if_err!(
611 values_to_row(&mut schema_info, pipeline_map, pipeline_ctx, None),
612 skip_error
613 );
614
615 opt_map
616 .entry(opt)
617 .or_insert_with(|| Vec::with_capacity(len))
618 .push(row);
619 }
620
621 let column_count = schema_info.schema.len();
622 for (_, row) in opt_map.iter_mut() {
623 for row in row.iter_mut() {
624 let diff = column_count - row.values.len();
625 for _ in 0..diff {
626 row.values.push(GreptimeValue { value_data: None });
627 }
628 }
629 }
630
631 Ok((schema_info, opt_map))
632}
633
634pub fn identity_pipeline(
643 array: Vec<Value>,
644 table: Option<Arc<table::Table>>,
645 pipeline_ctx: &PipelineContext<'_>,
646) -> Result<HashMap<ContextOpt, Rows>> {
647 let skip_error = pipeline_ctx.pipeline_param.skip_error();
648 let input = if pipeline_ctx.pipeline_param.flatten_json_object() {
649 let mut results = Vec::with_capacity(array.len());
650 for item in array.into_iter() {
651 let result = unwrap_or_continue_if_err!(
652 flatten_object(item, DEFAULT_MAX_NESTED_LEVELS_FOR_JSON_FLATTENING),
653 skip_error
654 );
655 results.push(result);
656 }
657 results
658 } else {
659 array
660 };
661
662 identity_pipeline_inner(input, pipeline_ctx).map(|(mut schema, opt_map)| {
663 if let Some(table) = table {
664 let table_info = table.table_info();
665 for tag_name in table_info.meta.row_key_column_names() {
666 if let Some(index) = schema.index.get(tag_name) {
667 schema.schema[*index].semantic_type = SemanticType::Tag as i32;
668 }
669 }
670 }
671
672 opt_map
673 .into_iter()
674 .map(|(opt, rows)| {
675 (
676 opt,
677 Rows {
678 schema: schema.schema.clone(),
679 rows,
680 },
681 )
682 })
683 .collect::<HashMap<ContextOpt, Rows>>()
684 })
685}
686
687pub fn flatten_object(object: Value, max_nested_levels: usize) -> Result<Value> {
692 let mut flattened = BTreeMap::new();
693 let object = object.into_map().context(ValueMustBeMapSnafu)?;
694
695 if !object.is_empty() {
696 do_flatten_object(&mut flattened, None, object, 1, max_nested_levels)?;
698 }
699
700 Ok(Value::Map(Map { values: flattened }))
701}
702
703fn do_flatten_object(
704 dest: &mut BTreeMap<String, Value>,
705 base: Option<&str>,
706 object: BTreeMap<String, Value>,
707 current_level: usize,
708 max_nested_levels: usize,
709) -> Result<()> {
710 if current_level > max_nested_levels {
712 return ReachedMaxNestedLevelsSnafu { max_nested_levels }.fail();
713 }
714
715 for (key, value) in object {
716 let new_key = base.map_or_else(|| key.clone(), |base_key| format!("{base_key}.{key}"));
717
718 match value {
719 Value::Map(object) => {
720 do_flatten_object(
721 dest,
722 Some(&new_key),
723 object.values,
724 current_level + 1,
725 max_nested_levels,
726 )?;
727 }
728 _ => {
730 dest.insert(new_key, value);
731 }
732 }
733 }
734
735 Ok(())
736}
737
#[cfg(test)]
mod tests {
    use api::v1::SemanticType;

    use super::*;
    use crate::etl::{json_array_to_map, json_to_map};
    use crate::{identity_pipeline, PipelineDefinition};

    /// Two-record fixture shared by the identity pipeline cases.
    ///
    /// Alice's `score` is always the float `99.5`; only Bob's `score` varies,
    /// which is what drives the type-mismatch cases.
    fn sample_records(bob_score: serde_json::Value) -> Vec<serde_json::Value> {
        vec![
            serde_json::json!({
                "woshinull": null,
                "name": "Alice",
                "age": 20,
                "is_student": true,
                "score": 99.5,
                "hobbies": "reading",
                "address": "Beijing",
            }),
            serde_json::json!({
                "name": "Bob",
                "age": 21,
                "is_student": false,
                "score": bob_score,
                "hobbies": "swimming",
                "address": "Shanghai",
                "gaga": "gaga"
            }),
        ]
    }

    #[test]
    fn test_identify_pipeline() {
        let params = GreptimePipelineParams::default();
        let pipeline_ctx = PipelineContext::new(
            &PipelineDefinition::GreptimeIdentityPipeline(None),
            &params,
            Channel::Unknown,
        );

        // A string score conflicts with the float column set by the first record.
        {
            let array = json_array_to_map(sample_records(serde_json::json!("88.5"))).unwrap();
            let rows = identity_pipeline(array, None, &pipeline_ctx);
            assert!(rows.is_err());
            assert_eq!(
                rows.err().unwrap().to_string(),
                "Column datatype mismatch. For column: score, expected datatype: FLOAT64, actual datatype: STRING".to_string(),
            );
        }

        // An integer score conflicts with the float column as well.
        {
            let array = json_array_to_map(sample_records(serde_json::json!(88))).unwrap();
            let rows = identity_pipeline(array, None, &pipeline_ctx);
            assert!(rows.is_err());
            assert_eq!(
                rows.err().unwrap().to_string(),
                "Column datatype mismatch. For column: score, expected datatype: FLOAT64, actual datatype: INT64".to_string(),
            );
        }

        // Matching types: both rows are padded to the full schema width.
        {
            let array = json_array_to_map(sample_records(serde_json::json!(88.5))).unwrap();
            let rows = identity_pipeline(array, None, &pipeline_ctx);
            assert!(rows.is_ok());
            let mut rows = rows.unwrap();
            assert_eq!(rows.len(), 1);
            let rows = rows.remove(&ContextOpt::default()).unwrap();
            assert_eq!(rows.schema.len(), 8);
            assert_eq!(rows.rows.len(), 2);
            assert_eq!(8, rows.rows[0].values.len());
            assert_eq!(8, rows.rows[1].values.len());
        }

        // Columns can be promoted to tags after the schema has been inferred.
        {
            let array = json_array_to_map(sample_records(serde_json::json!(88.5))).unwrap();
            let tag_column_names = ["name".to_string(), "address".to_string()];

            let rows = identity_pipeline_inner(array, &pipeline_ctx).map(|(mut schema, mut rows)| {
                for name in tag_column_names {
                    if let Some(index) = schema.index.get(&name) {
                        schema.schema[*index].semantic_type = SemanticType::Tag as i32;
                    }
                }

                assert_eq!(rows.len(), 1);
                let rows = rows.remove(&ContextOpt::default()).unwrap();

                Rows {
                    schema: schema.schema,
                    rows,
                }
            });

            assert!(rows.is_ok());
            let rows = rows.unwrap();
            assert_eq!(rows.schema.len(), 8);
            assert_eq!(rows.rows.len(), 2);
            assert_eq!(8, rows.rows[0].values.len());
            assert_eq!(8, rows.rows[1].values.len());
            assert_eq!(
                rows.schema
                    .iter()
                    .find(|x| x.column_name == "name")
                    .unwrap()
                    .semantic_type,
                SemanticType::Tag as i32
            );
            assert_eq!(
                rows.schema
                    .iter()
                    .find(|x| x.column_name == "address")
                    .unwrap()
                    .semantic_type,
                SemanticType::Tag as i32
            );
            assert_eq!(
                rows.schema
                    .iter()
                    .filter(|x| x.semantic_type == SemanticType::Tag as i32)
                    .count(),
                2
            );
        }
    }

    #[test]
    fn test_flatten() {
        let test_cases = vec![
            // Nesting within the limit: maps are flattened, arrays are kept.
            (
                serde_json::json!(
                    {
                        "a": {
                            "b": {
                                "c": [1, 2, 3]
                            }
                        },
                        "d": [
                            "foo",
                            "bar"
                        ],
                        "e": {
                            "f": [7, 8, 9],
                            "g": {
                                "h": 123,
                                "i": "hello",
                                "j": {
                                    "k": true
                                }
                            }
                        }
                    }
                ),
                10,
                Some(serde_json::json!(
                    {
                        "a.b.c": [1,2,3],
                        "d": ["foo","bar"],
                        "e.f": [7,8,9],
                        "e.g.h": 123,
                        "e.g.i": "hello",
                        "e.g.j.k": true
                    }
                )),
            ),
            // Nesting deeper than the limit: flattening fails.
            (
                serde_json::json!(
                    {
                        "a": {
                            "b": {
                                "c": {
                                    "d": [1, 2, 3]
                                }
                            }
                        },
                        "e": [
                            "foo",
                            "bar"
                        ]
                    }
                ),
                3,
                None,
            ),
        ];

        for (input, max_depth, expected) in test_cases {
            let input = json_to_map(input).unwrap();
            let expected = expected.map(|e| json_to_map(e).unwrap());

            let flattened_object = flatten_object(input, max_depth).ok();
            assert_eq!(flattened_object, expected);
        }
    }

    #[test]
    fn test_greptime_pipeline_params() {
        let params = Some("flatten_json_object=true");
        let pipeline_params = GreptimePipelineParams::from_params(params);
        assert!(pipeline_params.flatten_json_object());

        let params = None;
        let pipeline_params = GreptimePipelineParams::from_params(params);
        assert!(!pipeline_params.flatten_json_object());
    }
}