pub mod coerce;

use std::borrow::Cow;
use std::collections::{BTreeMap, HashSet};
use std::sync::Arc;

use ahash::{HashMap, HashMapExt};
use api::helper::proto_value_type;
use api::v1::column_data_type_extension::TypeExt;
use api::v1::value::ValueData;
use api::v1::{ColumnDataType, ColumnDataTypeExtension, JsonTypeExtension, SemanticType};
use coerce::{coerce_columns, coerce_value};
use common_query::prelude::{GREPTIME_TIMESTAMP, GREPTIME_VALUE};
use common_telemetry::warn;
use greptime_proto::v1::{ColumnSchema, Row, Rows, Value as GreptimeValue};
use itertools::Itertools;
use jsonb::Number;
use once_cell::sync::OnceCell;
use session::context::Channel;
use snafu::OptionExt;
use vrl::prelude::VrlValueConvert;
use vrl::value::{KeyString, Value as VrlValue};

use crate::error::{
    IdentifyPipelineColumnTypeMismatchSnafu, InvalidTimestampSnafu, ReachedMaxNestedLevelsSnafu,
    Result, TimeIndexMustBeNonNullSnafu, TransformColumnNameMustBeUniqueSnafu,
    TransformMultipleTimestampIndexSnafu, TransformTimestampIndexCountSnafu, ValueMustBeMapSnafu,
};
use crate::etl::PipelineDocVersion;
use crate::etl::ctx_req::ContextOpt;
use crate::etl::field::{Field, Fields};
use crate::etl::transform::index::Index;
use crate::etl::transform::{Transform, Transforms};
use crate::{PipelineContext, truthy, unwrap_or_continue_if_err};

const DEFAULT_GREPTIME_TIMESTAMP_COLUMN: &str = "greptime_timestamp";
const DEFAULT_MAX_NESTED_LEVELS_FOR_JSON_FLATTENING: usize = 10;

/// Applies the pipeline's `transform` rules to incoming values and carries
/// the column schemas derived from those rules.
#[derive(Debug, Clone)]
pub struct GreptimeTransformer {
    transforms: Transforms,
    schema: Vec<ColumnSchema>,
}

/// Parameters that control pipeline execution, parsed from a `key=value`
/// option map.
#[derive(Debug, Default)]
pub struct GreptimePipelineParams {
    /// The raw options map.
    options: HashMap<String, String>,

    /// Whether to flatten JSON objects (lazily parsed from `options`).
    pub flatten_json_object: OnceCell<bool>,
    /// Whether to skip erroneous rows (lazily parsed from `options`).
    pub skip_error: OnceCell<bool>,
}

impl GreptimePipelineParams {
    pub fn from_params(params: Option<&str>) -> Self {
        let options = Self::parse_header_str_to_map(params);

        Self {
            options,
            skip_error: OnceCell::new(),
            flatten_json_object: OnceCell::new(),
        }
    }

    pub fn from_map(options: HashMap<String, String>) -> Self {
        Self {
            options,
            skip_error: OnceCell::new(),
            flatten_json_object: OnceCell::new(),
        }
    }

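    /// Parses an `&`-separated list of `key=value` pairs into an options map;
    /// segments without a `=` are silently dropped.
    ///
    /// A minimal usage sketch with illustrative input:
    ///
    /// ```ignore
    /// let map = GreptimePipelineParams::parse_header_str_to_map(Some("skip_error=true&flatten_json_object=true"));
    /// assert_eq!(map.get("skip_error").map(String::as_str), Some("true"));
    /// ```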
    pub fn parse_header_str_to_map(params: Option<&str>) -> HashMap<String, String> {
        if let Some(params) = params {
            if params.is_empty() {
                HashMap::new()
            } else {
                params
                    .split('&')
                    .filter_map(|s| s.split_once('='))
                    .map(|(k, v)| (k.to_string(), v.to_string()))
                    .collect::<HashMap<String, String>>()
            }
        } else {
            HashMap::new()
        }
    }

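    /// Whether to flatten JSON objects before ingestion. Lazily parsed from
    /// the `flatten_json_object` option (only the exact string `"true"`
    /// enables it) and cached.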
    pub fn flatten_json_object(&self) -> bool {
        *self.flatten_json_object.get_or_init(|| {
            self.options
                .get("flatten_json_object")
                .map(|v| v == "true")
                .unwrap_or(false)
        })
    }

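    /// Whether rows that fail to process should be skipped instead of failing
    /// the whole request. Lazily parsed from the `skip_error` option via
    /// `truthy` and cached.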
    pub fn skip_error(&self) -> bool {
        *self
            .skip_error
            .get_or_init(|| self.options.get("skip_error").map(truthy).unwrap_or(false))
    }
}

impl GreptimeTransformer {
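    /// Appends a default `greptime_timestamp` time-index column
    /// (`TimestampNanosecond` with `OnFailure::Default`) for pipelines that
    /// do not declare one.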
    fn add_greptime_timestamp_column(transforms: &mut Transforms) {
        let type_ = ColumnDataType::TimestampNanosecond;
        let default = None;

        let transform = Transform {
            fields: Fields::one(Field::new(
                DEFAULT_GREPTIME_TIMESTAMP_COLUMN.to_string(),
                None,
            )),
            type_,
            default,
            index: Some(Index::Time),
            on_failure: Some(crate::etl::transform::OnFailure::Default),
            tag: false,
        };
        transforms.push(transform);
    }

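    /// Builds the column schemas by coercing the columns of every transform.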
    fn init_schemas(transforms: &Transforms) -> Result<Vec<ColumnSchema>> {
        let mut schema = vec![];
        for transform in transforms.iter() {
            schema.extend(coerce_columns(transform)?);
        }
        Ok(schema)
    }
}

impl GreptimeTransformer {
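    /// Validates the transforms and builds the transformer: output column
    /// names must be unique and at most one time-index column may be
    /// declared. V1 documents without a timestamp column get the default
    /// `greptime_timestamp` column appended.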
    pub fn new(mut transforms: Transforms, doc_version: &PipelineDocVersion) -> Result<Self> {
        let mut column_names_set = HashSet::new();
        let mut timestamp_columns = vec![];

        for transform in transforms.iter() {
            let target_fields_set = transform
                .fields
                .iter()
                .map(|f| f.target_or_input_field())
                .collect::<HashSet<_>>();

            let intersections: Vec<_> =
                column_names_set.intersection(&target_fields_set).collect();
            if !intersections.is_empty() {
                let duplicates = intersections.iter().join(",");
                return TransformColumnNameMustBeUniqueSnafu { duplicates }.fail();
            }

            column_names_set.extend(target_fields_set);

            if let Some(idx) = transform.index
                && idx == Index::Time
            {
                match transform.fields.len() {
                    1 => timestamp_columns.push(transform.fields.first().unwrap().input_field()),
                    _ => {
                        return TransformMultipleTimestampIndexSnafu {
                            columns: transform.fields.iter().map(|x| x.input_field()).join(", "),
                        }
                        .fail();
                    }
                }
            }
        }

        let schema = match timestamp_columns.len() {
            0 if doc_version == &PipelineDocVersion::V1 => {
                GreptimeTransformer::add_greptime_timestamp_column(&mut transforms);
                GreptimeTransformer::init_schemas(&transforms)?
            }
            1 => GreptimeTransformer::init_schemas(&transforms)?,
            count => {
                let columns = timestamp_columns.iter().join(", ");
                return TransformTimestampIndexCountSnafu { count, columns }.fail();
            }
        };
        Ok(GreptimeTransformer { transforms, schema })
    }

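    /// Transforms the pipeline map into one row of values ordered to match
    /// `self.schema`. Missing fields are resolved via the transform's
    /// `on_failure` policy; a missing time-index value is an error. For
    /// non-v1 documents each consumed field is removed from the map.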
    pub fn transform_mut(
        &self,
        pipeline_map: &mut VrlValue,
        is_v1: bool,
    ) -> Result<Vec<GreptimeValue>> {
        let mut values = vec![GreptimeValue { value_data: None }; self.schema.len()];
        let mut output_index = 0;
        for transform in self.transforms.iter() {
            for field in transform.fields.iter() {
                let column_name = field.input_field();

                let pipeline_map = pipeline_map.as_object_mut().context(ValueMustBeMapSnafu)?;
                match pipeline_map.get(column_name) {
                    Some(v) => {
                        let value_data = coerce_value(v, transform)?;
                        values[output_index] = GreptimeValue { value_data };
                    }
                    None => {
                        let value_data = match transform.on_failure {
                            Some(crate::etl::transform::OnFailure::Default) => {
                                match transform.get_default() {
                                    Some(default) => Some(default.clone()),
                                    None => transform.get_default_value_when_data_is_none(),
                                }
                            }
                            Some(crate::etl::transform::OnFailure::Ignore) => None,
                            None => None,
                        };
                        if transform.is_timeindex() && value_data.is_none() {
                            return TimeIndexMustBeNonNullSnafu.fail();
                        }
                        values[output_index] = GreptimeValue { value_data };
                    }
                }
                output_index += 1;
                if !is_v1 {
                    pipeline_map.remove(column_name);
                }
            }
        }
        Ok(values)
    }

    pub fn transforms(&self) -> &Transforms {
        &self.transforms
    }

    pub fn schemas(&self) -> &Vec<greptime_proto::v1::ColumnSchema> {
        &self.schema
    }

    pub fn transforms_mut(&mut self) -> &mut Transforms {
        &mut self.transforms
    }
}

/// A schema list plus a name-to-index lookup, used to evolve the schema while
/// rows are being resolved.
#[derive(Debug, Default)]
pub struct SchemaInfo {
    /// The column schemas, in column order.
    pub schema: Vec<ColumnSchema>,
    /// Map from column name to its index in `schema`.
    pub index: HashMap<String, usize>,
}
282
283impl SchemaInfo {
284 pub fn with_capacity(capacity: usize) -> Self {
285 Self {
286 schema: Vec::with_capacity(capacity),
287 index: HashMap::with_capacity(capacity),
288 }
289 }
290
291 pub fn from_schema_list(schema_list: Vec<ColumnSchema>) -> Self {
292 let mut index = HashMap::new();
293 for (i, schema) in schema_list.iter().enumerate() {
294 index.insert(schema.column_name.clone(), i);
295 }
296 Self {
297 schema: schema_list,
298 index,
299 }
300 }
301}
302
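/// Writes `value_data` into `row`, evolving the schema as needed: when the
/// column is already known (`index` is `Some`), the value's type must match
/// the recorded column type; otherwise a new column is appended to
/// `schema_info` and the value is pushed onto the row.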
fn resolve_schema(
    index: Option<usize>,
    value_data: ValueData,
    column_schema: ColumnSchema,
    row: &mut Vec<GreptimeValue>,
    schema_info: &mut SchemaInfo,
) -> Result<()> {
    if let Some(index) = index {
        let api_value = GreptimeValue {
            value_data: Some(value_data),
        };
        // Safe to unwrap: `value_data` was just set to `Some`.
        let value_column_data_type = proto_value_type(&api_value).unwrap();
        // Safe to unwrap: `index` comes from `schema_info.index`.
        let schema_column_data_type = schema_info.schema.get(index).unwrap().datatype();
        if value_column_data_type != schema_column_data_type {
            IdentifyPipelineColumnTypeMismatchSnafu {
                column: column_schema.column_name,
                expected: schema_column_data_type.as_str_name(),
                actual: value_column_data_type.as_str_name(),
            }
            .fail()
        } else {
            row[index] = api_value;
            Ok(())
        }
    } else {
        let key = column_schema.column_name.clone();
        schema_info.schema.push(column_schema);
        schema_info.index.insert(key, schema_info.schema.len() - 1);
        let api_value = GreptimeValue {
            value_data: Some(value_data),
        };
        row.push(api_value);
        Ok(())
    }
}

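/// Resolves the row timestamp: the Prometheus channel reads a millisecond
/// `greptime_timestamp` field, a custom timestamp config reads its configured
/// column, and everything else falls back to the current time in nanoseconds.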
fn calc_ts(p_ctx: &PipelineContext, values: &VrlValue) -> Result<Option<ValueData>> {
    match p_ctx.channel {
        Channel::Prometheus => {
            let ts = values
                .as_object()
                .and_then(|m| m.get(GREPTIME_TIMESTAMP))
                .and_then(|ts| ts.try_into_i64().ok())
                .unwrap_or_default();
            Ok(Some(ValueData::TimestampMillisecondValue(ts)))
        }
        _ => {
            let custom_ts = p_ctx.pipeline_definition.get_custom_ts();
            match custom_ts {
                Some(ts) => {
                    let ts_field = values.as_object().and_then(|m| m.get(ts.get_column_name()));
                    Some(ts.get_timestamp_value(ts_field)).transpose()
                }
                None => Ok(Some(ValueData::TimestampNanosecondValue(
                    chrono::Utc::now().timestamp_nanos_opt().unwrap_or_default(),
                ))),
            }
        }
    }
}

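/// Converts one pipeline value (which must be a map) into a `Row` matching
/// `schema_info`. When `need_calc_ts` is set, the timestamp is computed first
/// and occupies the leading column; an optional `row` prefix can be supplied
/// to be extended in place.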
pub(crate) fn values_to_row(
    schema_info: &mut SchemaInfo,
    values: VrlValue,
    pipeline_ctx: &PipelineContext<'_>,
    row: Option<Vec<GreptimeValue>>,
    need_calc_ts: bool,
) -> Result<Row> {
    let mut row: Vec<GreptimeValue> =
        row.unwrap_or_else(|| Vec::with_capacity(schema_info.schema.len()));
    let custom_ts = pipeline_ctx.pipeline_definition.get_custom_ts();

    if need_calc_ts {
        let ts = calc_ts(pipeline_ctx, &values)?;
        row.push(GreptimeValue { value_data: ts });
    }

    row.resize(schema_info.schema.len(), GreptimeValue { value_data: None });

    let ts_column_name = custom_ts
        .as_ref()
        .map_or(DEFAULT_GREPTIME_TIMESTAMP_COLUMN, |ts| ts.get_column_name());

    let values = values.into_object().context(ValueMustBeMapSnafu)?;

    for (column_name, value) in values {
        if need_calc_ts && column_name.as_str() == ts_column_name {
            continue;
        }

        resolve_value(
            value,
            column_name.into(),
            &mut row,
            schema_info,
            pipeline_ctx,
        )?;
    }
    Ok(Row { values: row })
}

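/// On the Prometheus channel every column except `greptime_value` becomes a
/// tag; on all other channels identity-pipeline columns default to fields.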
fn decide_semantic(p_ctx: &PipelineContext, column_name: &str) -> i32 {
    if p_ctx.channel == Channel::Prometheus && column_name != GREPTIME_VALUE {
        SemanticType::Tag as i32
    } else {
        SemanticType::Field as i32
    }
}

fn resolve_value(
    value: VrlValue,
    column_name: String,
    row: &mut Vec<GreptimeValue>,
    schema_info: &mut SchemaInfo,
    p_ctx: &PipelineContext,
) -> Result<()> {
    let index = schema_info.index.get(&column_name).copied();
    let mut resolve_simple_type =
        |value_data: ValueData, column_name: String, data_type: ColumnDataType| {
            let semantic_type = decide_semantic(p_ctx, &column_name);
            resolve_schema(
                index,
                value_data,
                ColumnSchema {
                    column_name,
                    datatype: data_type as i32,
                    semantic_type,
                    datatype_extension: None,
                    options: None,
                },
                row,
                schema_info,
            )
        };

    match value {
        VrlValue::Null => {}

        VrlValue::Integer(v) => {
            resolve_simple_type(ValueData::I64Value(v), column_name, ColumnDataType::Int64)?;
        }

        VrlValue::Float(v) => {
            resolve_simple_type(
                ValueData::F64Value(v.into()),
                column_name,
                ColumnDataType::Float64,
            )?;
        }

        VrlValue::Boolean(v) => {
            resolve_simple_type(
                ValueData::BoolValue(v),
                column_name,
                ColumnDataType::Boolean,
            )?;
        }

        VrlValue::Bytes(v) => {
            resolve_simple_type(
                ValueData::StringValue(String::from_utf8_lossy_owned(v.to_vec())),
                column_name,
                ColumnDataType::String,
            )?;
        }

        VrlValue::Regex(v) => {
            warn!(
                "Persisting regex value in the table, this should not happen, column_name: {}",
                column_name
            );
            resolve_simple_type(
                ValueData::StringValue(v.to_string()),
                column_name,
                ColumnDataType::String,
            )?;
        }

        VrlValue::Timestamp(ts) => {
            let ns = ts.timestamp_nanos_opt().context(InvalidTimestampSnafu {
                input: ts.to_rfc3339(),
            })?;
            resolve_simple_type(
                ValueData::TimestampNanosecondValue(ns),
                column_name,
                ColumnDataType::TimestampNanosecond,
            )?;
        }

        VrlValue::Array(_) | VrlValue::Object(_) => {
            let data = vrl_value_to_jsonb_value(&value);
            resolve_schema(
                index,
                ValueData::BinaryValue(data.to_vec()),
                ColumnSchema {
                    column_name,
                    datatype: ColumnDataType::Binary as i32,
                    semantic_type: SemanticType::Field as i32,
                    datatype_extension: Some(ColumnDataTypeExtension {
                        type_ext: Some(TypeExt::JsonType(JsonTypeExtension::JsonBinary.into())),
                    }),
                    options: None,
                },
                row,
                schema_info,
            )?;
        }
    }
    Ok(())
}

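/// Recursively converts a VRL value into a JSONB value; regexes and
/// timestamps are stringified.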
fn vrl_value_to_jsonb_value<'a>(value: &'a VrlValue) -> jsonb::Value<'a> {
    match value {
        VrlValue::Bytes(bytes) => jsonb::Value::String(String::from_utf8_lossy(bytes)),
        VrlValue::Regex(value_regex) => jsonb::Value::String(Cow::Borrowed(value_regex.as_str())),
        VrlValue::Integer(i) => jsonb::Value::Number(Number::Int64(*i)),
        VrlValue::Float(not_nan) => jsonb::Value::Number(Number::Float64(not_nan.into_inner())),
        VrlValue::Boolean(b) => jsonb::Value::Bool(*b),
        VrlValue::Timestamp(date_time) => jsonb::Value::String(Cow::Owned(date_time.to_rfc3339())),
        VrlValue::Object(btree_map) => jsonb::Value::Object(
            btree_map
                .iter()
                .map(|(key, value)| (key.to_string(), vrl_value_to_jsonb_value(value)))
                .collect(),
        ),
        VrlValue::Array(values) => {
            jsonb::Value::Array(values.iter().map(vrl_value_to_jsonb_value).collect())
        }
        VrlValue::Null => jsonb::Value::Null,
    }
}

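/// Shared core of the identity pipeline: infers the schema from the input
/// maps, groups the resulting rows by `ContextOpt`, and finally pads shorter
/// rows with nulls so every row matches the full column count.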
fn identity_pipeline_inner(
    pipeline_maps: Vec<VrlValue>,
    pipeline_ctx: &PipelineContext<'_>,
) -> Result<(SchemaInfo, HashMap<ContextOpt, Vec<Row>>)> {
    let skip_error = pipeline_ctx.pipeline_param.skip_error();
    let mut schema_info = SchemaInfo::default();
    let custom_ts = pipeline_ctx.pipeline_definition.get_custom_ts();

    // The timestamp column is always the first column in the schema.
    schema_info.schema.push(ColumnSchema {
        column_name: custom_ts
            .map(|ts| ts.get_column_name().to_string())
            .unwrap_or_else(|| DEFAULT_GREPTIME_TIMESTAMP_COLUMN.to_string()),
        datatype: custom_ts.map(|c| c.get_datatype()).unwrap_or_else(|| {
            if pipeline_ctx.channel == Channel::Prometheus {
                ColumnDataType::TimestampMillisecond
            } else {
                ColumnDataType::TimestampNanosecond
            }
        }) as i32,
        semantic_type: SemanticType::Timestamp as i32,
        datatype_extension: None,
        options: None,
    });

    let mut opt_map = HashMap::new();
    let len = pipeline_maps.len();

    for mut pipeline_map in pipeline_maps {
        let opt = unwrap_or_continue_if_err!(
            ContextOpt::from_pipeline_map_to_opt(&mut pipeline_map),
            skip_error
        );
        let row = unwrap_or_continue_if_err!(
            values_to_row(&mut schema_info, pipeline_map, pipeline_ctx, None, true),
            skip_error
        );

        opt_map
            .entry(opt)
            .or_insert_with(|| Vec::with_capacity(len))
            .push(row);
    }

    // Pad every row to the final column count; columns discovered in later
    // rows are null in earlier ones.
    let column_count = schema_info.schema.len();
    for (_, rows) in opt_map.iter_mut() {
        for row in rows.iter_mut() {
            let diff = column_count - row.values.len();
            for _ in 0..diff {
                row.values.push(GreptimeValue { value_data: None });
            }
        }
    }

    Ok((schema_info, opt_map))
}

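/// Runs the identity pipeline, which ingests rows as-is and infers column
/// names and types from the input itself. Input objects are optionally
/// flattened first (`flatten_json_object`), and when the target `table` is
/// provided, its existing row-key columns stay tags.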
pub fn identity_pipeline(
    array: Vec<VrlValue>,
    table: Option<Arc<table::Table>>,
    pipeline_ctx: &PipelineContext<'_>,
) -> Result<HashMap<ContextOpt, Rows>> {
    let skip_error = pipeline_ctx.pipeline_param.skip_error();
    let input = if pipeline_ctx.pipeline_param.flatten_json_object() {
        let mut results = Vec::with_capacity(array.len());
        for item in array.into_iter() {
            let result = unwrap_or_continue_if_err!(
                flatten_object(item, DEFAULT_MAX_NESTED_LEVELS_FOR_JSON_FLATTENING),
                skip_error
            );
            results.push(result);
        }
        results
    } else {
        array
    };

    identity_pipeline_inner(input, pipeline_ctx).map(|(mut schema, opt_map)| {
        if let Some(table) = table {
            let table_info = table.table_info();
            for tag_name in table_info.meta.row_key_column_names() {
                if let Some(index) = schema.index.get(tag_name) {
                    schema.schema[*index].semantic_type = SemanticType::Tag as i32;
                }
            }
        }

        opt_map
            .into_iter()
            .map(|(opt, rows)| {
                (
                    opt,
                    Rows {
                        schema: schema.schema.clone(),
                        rows,
                    },
                )
            })
            .collect::<HashMap<ContextOpt, Rows>>()
    })
}

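/// Flattens a nested object into a single level, joining nested keys with `.`
/// and erroring once `max_nested_levels` is exceeded; arrays are kept as-is.
///
/// A minimal sketch of the effect (illustrative values):
///
/// ```ignore
/// // { "a": { "b": 1 }, "c": [2, 3] }  =>  { "a.b": 1, "c": [2, 3] }
/// let flat = flatten_object(nested, DEFAULT_MAX_NESTED_LEVELS_FOR_JSON_FLATTENING)?;
/// ```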
pub fn flatten_object(object: VrlValue, max_nested_levels: usize) -> Result<VrlValue> {
    let mut flattened = BTreeMap::new();
    let object = object.into_object().context(ValueMustBeMapSnafu)?;

    if !object.is_empty() {
        do_flatten_object(&mut flattened, None, object, 1, max_nested_levels)?;
    }

    Ok(VrlValue::Object(flattened))
}

fn do_flatten_object(
    dest: &mut BTreeMap<KeyString, VrlValue>,
    base: Option<&str>,
    object: BTreeMap<KeyString, VrlValue>,
    current_level: usize,
    max_nested_levels: usize,
) -> Result<()> {
    if current_level > max_nested_levels {
        return ReachedMaxNestedLevelsSnafu { max_nested_levels }.fail();
    }

    for (key, value) in object {
        let new_key = base.map_or_else(
            || key.clone(),
            |base_key| format!("{base_key}.{key}").into(),
        );

        match value {
            VrlValue::Object(object) => {
                do_flatten_object(
                    dest,
                    Some(&new_key),
                    object,
                    current_level + 1,
                    max_nested_levels,
                )?;
            }
            _ => {
                // Arrays and scalars are inserted as-is.
                dest.insert(new_key, value);
            }
        }
    }

    Ok(())
}

#[cfg(test)]
mod tests {
    use api::v1::SemanticType;

    use super::*;
    use crate::{PipelineDefinition, identity_pipeline};

    #[test]
    fn test_identify_pipeline() {
        let params = GreptimePipelineParams::default();
        let pipeline_ctx = PipelineContext::new(
            &PipelineDefinition::GreptimeIdentityPipeline(None),
            &params,
            Channel::Unknown,
        );
        {
            let array = [
                serde_json::json!({
                    "woshinull": null,
                    "name": "Alice",
                    "age": 20,
                    "is_student": true,
                    "score": 99.5,
                    "hobbies": "reading",
                    "address": "Beijing",
                }),
                serde_json::json!({
                    "name": "Bob",
                    "age": 21,
                    "is_student": false,
                    "score": "88.5",
                    "hobbies": "swimming",
                    "address": "Shanghai",
                    "gaga": "gaga"
                }),
            ];
            let array = array.iter().map(|v| v.into()).collect();
            let rows = identity_pipeline(array, None, &pipeline_ctx);
            assert!(rows.is_err());
            assert_eq!(
                rows.err().unwrap().to_string(),
                "Column datatype mismatch. For column: score, expected datatype: FLOAT64, actual datatype: STRING".to_string(),
            );
        }
        {
            let array = [
                serde_json::json!({
                    "woshinull": null,
                    "name": "Alice",
                    "age": 20,
                    "is_student": true,
                    "score": 99.5,
                    "hobbies": "reading",
                    "address": "Beijing",
                }),
                serde_json::json!({
                    "name": "Bob",
                    "age": 21,
                    "is_student": false,
                    "score": 88,
                    "hobbies": "swimming",
                    "address": "Shanghai",
                    "gaga": "gaga"
                }),
            ];
            let array = array.iter().map(|v| v.into()).collect();
            let rows = identity_pipeline(array, None, &pipeline_ctx);
            assert!(rows.is_err());
            assert_eq!(
                rows.err().unwrap().to_string(),
                "Column datatype mismatch. For column: score, expected datatype: FLOAT64, actual datatype: INT64".to_string(),
            );
        }
        {
            let array = [
                serde_json::json!({
                    "woshinull": null,
                    "name": "Alice",
                    "age": 20,
                    "is_student": true,
                    "score": 99.5,
                    "hobbies": "reading",
                    "address": "Beijing",
                }),
                serde_json::json!({
                    "name": "Bob",
                    "age": 21,
                    "is_student": false,
                    "score": 88.5,
                    "hobbies": "swimming",
                    "address": "Shanghai",
                    "gaga": "gaga"
                }),
            ];
            let array = array.iter().map(|v| v.into()).collect();
            let rows = identity_pipeline(array, None, &pipeline_ctx);
            assert!(rows.is_ok());
            let mut rows = rows.unwrap();
            assert!(rows.len() == 1);
            let rows = rows.remove(&ContextOpt::default()).unwrap();
            assert_eq!(rows.schema.len(), 8);
            assert_eq!(rows.rows.len(), 2);
            assert_eq!(8, rows.rows[0].values.len());
            assert_eq!(8, rows.rows[1].values.len());
        }
        {
            let array = [
                serde_json::json!({
                    "woshinull": null,
                    "name": "Alice",
                    "age": 20,
                    "is_student": true,
                    "score": 99.5,
                    "hobbies": "reading",
                    "address": "Beijing",
                }),
                serde_json::json!({
                    "name": "Bob",
                    "age": 21,
                    "is_student": false,
                    "score": 88.5,
                    "hobbies": "swimming",
                    "address": "Shanghai",
                    "gaga": "gaga"
                }),
            ];
            let tag_column_names = ["name".to_string(), "address".to_string()];

            let rows =
                identity_pipeline_inner(array.iter().map(|v| v.into()).collect(), &pipeline_ctx)
                    .map(|(mut schema, mut rows)| {
                        for name in tag_column_names {
                            if let Some(index) = schema.index.get(&name) {
                                schema.schema[*index].semantic_type = SemanticType::Tag as i32;
                            }
                        }

                        assert!(rows.len() == 1);
                        let rows = rows.remove(&ContextOpt::default()).unwrap();

                        Rows {
                            schema: schema.schema,
                            rows,
                        }
                    });

            assert!(rows.is_ok());
            let rows = rows.unwrap();
            assert_eq!(rows.schema.len(), 8);
            assert_eq!(rows.rows.len(), 2);
            assert_eq!(8, rows.rows[0].values.len());
            assert_eq!(8, rows.rows[1].values.len());
            assert_eq!(
                rows.schema
                    .iter()
                    .find(|x| x.column_name == "name")
                    .unwrap()
                    .semantic_type,
                SemanticType::Tag as i32
            );
            assert_eq!(
                rows.schema
                    .iter()
                    .find(|x| x.column_name == "address")
                    .unwrap()
                    .semantic_type,
                SemanticType::Tag as i32
            );
            assert_eq!(
                rows.schema
                    .iter()
                    .filter(|x| x.semantic_type == SemanticType::Tag as i32)
                    .count(),
                2
            );
        }
    }

    #[test]
    fn test_flatten() {
        let test_cases = vec![
            // Basic case: nested objects flatten to dotted keys, arrays stay as-is.
            (
                serde_json::json!(
                    {
                        "a": {
                            "b": {
                                "c": [1, 2, 3]
                            }
                        },
                        "d": [
                            "foo",
                            "bar"
                        ],
                        "e": {
                            "f": [7, 8, 9],
                            "g": {
                                "h": 123,
                                "i": "hello",
                                "j": {
                                    "k": true
                                }
                            }
                        }
                    }
                ),
                10,
                Some(serde_json::json!(
                    {
                        "a.b.c": [1,2,3],
                        "d": ["foo","bar"],
                        "e.f": [7,8,9],
                        "e.g.h": 123,
                        "e.g.i": "hello",
                        "e.g.j.k": true
                    }
                )),
            ),
            // Nesting deeper than `max_nested_levels` is an error.
            (
                serde_json::json!(
                    {
                        "a": {
                            "b": {
                                "c": {
                                    "d": [1, 2, 3]
                                }
                            }
                        },
                        "e": [
                            "foo",
                            "bar"
                        ]
                    }
                ),
                3,
                None,
            ),
        ];

        for (input, max_depth, expected) in test_cases {
            let input = input.into();
            let expected = expected.map(|e| e.into());

            let flattened_object = flatten_object(input, max_depth).ok();
            assert_eq!(flattened_object, expected);
        }
    }

    #[test]
    fn test_greptime_pipeline_params() {
        let params = Some("flatten_json_object=true");
        let pipeline_params = GreptimePipelineParams::from_params(params);
        assert!(pipeline_params.flatten_json_object());

        let params = None;
        let pipeline_params = GreptimePipelineParams::from_params(params);
        assert!(!pipeline_params.flatten_json_object());
    }
}