pub mod coerce;

use std::borrow::Cow;
use std::collections::{BTreeMap, HashSet};
use std::sync::Arc;

use ahash::{HashMap, HashMapExt};
use api::helper::proto_value_type;
use api::v1::column_data_type_extension::TypeExt;
use api::v1::value::ValueData;
use api::v1::{ColumnDataType, ColumnDataTypeExtension, JsonTypeExtension, SemanticType};
use coerce::{coerce_columns, coerce_value};
use common_query::prelude::{GREPTIME_TIMESTAMP, GREPTIME_VALUE};
use common_telemetry::warn;
use greptime_proto::v1::{ColumnSchema, Row, Rows, Value as GreptimeValue};
use itertools::Itertools;
use jsonb::Number;
use once_cell::sync::OnceCell;
use session::context::Channel;
use snafu::OptionExt;
use vrl::prelude::VrlValueConvert;
use vrl::value::{KeyString, Value as VrlValue};

use crate::error::{
    IdentifyPipelineColumnTypeMismatchSnafu, InvalidTimestampSnafu, ReachedMaxNestedLevelsSnafu,
    Result, TimeIndexMustBeNonNullSnafu, TransformColumnNameMustBeUniqueSnafu,
    TransformMultipleTimestampIndexSnafu, TransformTimestampIndexCountSnafu, ValueMustBeMapSnafu,
};
use crate::etl::ctx_req::ContextOpt;
use crate::etl::field::{Field, Fields};
use crate::etl::transform::index::Index;
use crate::etl::transform::{Transform, Transforms};
use crate::etl::PipelineDocVersion;
use crate::{truthy, unwrap_or_continue_if_err, PipelineContext};

const DEFAULT_GREPTIME_TIMESTAMP_COLUMN: &str = "greptime_timestamp";
const DEFAULT_MAX_NESTED_LEVELS_FOR_JSON_FLATTENING: usize = 10;

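/// Transforms pipeline values into GreptimeDB rows according to the
/// pipeline's `transform` rules, with a pre-computed column schema.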
#[derive(Debug, Clone)]
pub struct GreptimeTransformer {
    transforms: Transforms,
    schema: Vec<ColumnSchema>,
}

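/// Parameters that tweak a pipeline run, parsed from an `&`-separated
/// `key=value` list (for example `flatten_json_object=true&skip_error=true`).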
#[derive(Debug, Default)]
pub struct GreptimePipelineParams {
    /// The raw options map parsed from the input string.
    options: HashMap<String, String>,

    /// Whether to flatten JSON objects, computed lazily from `options`.
    pub flatten_json_object: OnceCell<bool>,
    /// Whether to skip rows that fail to process, computed lazily from `options`.
    pub skip_error: OnceCell<bool>,
}

impl GreptimePipelineParams {
    /// Create pipeline params from an `&`-separated `key=value` string,
    /// e.g. `flatten_json_object=true&skip_error=true`.
    pub fn from_params(params: Option<&str>) -> Self {
        let options = Self::parse_header_str_to_map(params);

        Self {
            options,
            skip_error: OnceCell::new(),
            flatten_json_object: OnceCell::new(),
        }
    }

    /// Create pipeline params from an already-parsed options map.
    pub fn from_map(options: HashMap<String, String>) -> Self {
        Self {
            options,
            skip_error: OnceCell::new(),
            flatten_json_object: OnceCell::new(),
        }
    }

    /// Parse a `k1=v1&k2=v2` string into a map. `None` or an empty string
    /// yields an empty map; entries without `=` are dropped.
    pub fn parse_header_str_to_map(params: Option<&str>) -> HashMap<String, String> {
        if let Some(params) = params {
            if params.is_empty() {
                HashMap::new()
            } else {
                params
                    .split('&')
                    .filter_map(|s| s.split_once('='))
                    .map(|(k, v)| (k.to_string(), v.to_string()))
                    .collect::<HashMap<String, String>>()
            }
        } else {
            HashMap::new()
        }
    }

    /// Whether to flatten JSON objects; cached after the first call.
    pub fn flatten_json_object(&self) -> bool {
        *self.flatten_json_object.get_or_init(|| {
            self.options
                .get("flatten_json_object")
                .map(|v| v == "true")
                .unwrap_or(false)
        })
    }

    /// Whether to skip erroneous rows; cached after the first call.
    pub fn skip_error(&self) -> bool {
        *self
            .skip_error
            .get_or_init(|| self.options.get("skip_error").map(truthy).unwrap_or(false))
    }
}

impl GreptimeTransformer {
    /// Add a default timestamp column (`greptime_timestamp`, nanosecond
    /// precision) used as the time index when none is declared.
    fn add_greptime_timestamp_column(transforms: &mut Transforms) {
        let type_ = ColumnDataType::TimestampNanosecond;
        let default = None;

        let transform = Transform {
            fields: Fields::one(Field::new(
                DEFAULT_GREPTIME_TIMESTAMP_COLUMN.to_string(),
                None,
            )),
            type_,
            default,
            index: Some(Index::Time),
            on_failure: Some(crate::etl::transform::OnFailure::Default),
            tag: false,
        };
        transforms.push(transform);
    }

    /// Generate the column schemas for all transforms.
    fn init_schemas(transforms: &Transforms) -> Result<Vec<ColumnSchema>> {
        let mut schema = vec![];
        for transform in transforms.iter() {
            schema.extend(coerce_columns(transform)?);
        }
        Ok(schema)
    }
}

impl GreptimeTransformer {
    /// Validate the transforms (unique column names, exactly one time index)
    /// and build the transformer with its column schema.
    pub fn new(mut transforms: Transforms, doc_version: &PipelineDocVersion) -> Result<Self> {
        let mut column_names_set = HashSet::new();
        let mut timestamp_columns = vec![];

        for transform in transforms.iter() {
            let target_fields_set = transform
                .fields
                .iter()
                .map(|f| f.target_or_input_field())
                .collect::<HashSet<_>>();

            let intersections: Vec<_> = column_names_set.intersection(&target_fields_set).collect();
            if !intersections.is_empty() {
                let duplicates = intersections.iter().join(",");
                return TransformColumnNameMustBeUniqueSnafu { duplicates }.fail();
            }

            column_names_set.extend(target_fields_set);

            if let Some(idx) = transform.index {
                if idx == Index::Time {
                    match transform.fields.len() {
                        1 => {
                            timestamp_columns.push(transform.fields.first().unwrap().input_field())
                        }
                        _ => {
                            return TransformMultipleTimestampIndexSnafu {
                                columns: transform
                                    .fields
                                    .iter()
                                    .map(|x| x.input_field())
                                    .join(", "),
                            }
                            .fail();
                        }
                    }
                }
            }
        }

        let schema = match timestamp_columns.len() {
            0 if doc_version == &PipelineDocVersion::V1 => {
                // V1 documents get an auto-generated time index for backward
                // compatibility.
                GreptimeTransformer::add_greptime_timestamp_column(&mut transforms);
                GreptimeTransformer::init_schemas(&transforms)?
            }
            1 => GreptimeTransformer::init_schemas(&transforms)?,
            count => {
                let columns = timestamp_columns.iter().join(", ");
                return TransformTimestampIndexCountSnafu { count, columns }.fail();
            }
        };
        Ok(GreptimeTransformer { transforms, schema })
    }

    /// Apply the transforms to the VRL map and produce row values in schema
    /// order; for v2 documents, consumed keys are removed from the map.
    pub fn transform_mut(
        &self,
        pipeline_map: &mut VrlValue,
        is_v1: bool,
    ) -> Result<Vec<GreptimeValue>> {
        let mut values = vec![GreptimeValue { value_data: None }; self.schema.len()];
        let mut output_index = 0;
        for transform in self.transforms.iter() {
            for field in transform.fields.iter() {
                let column_name = field.input_field();

                let pipeline_map = pipeline_map.as_object_mut().context(ValueMustBeMapSnafu)?;
                match pipeline_map.get(column_name) {
                    Some(v) => {
                        let value_data = coerce_value(v, transform)?;
                        values[output_index] = GreptimeValue { value_data };
                    }
                    None => {
                        let value_data = match transform.on_failure {
                            Some(crate::etl::transform::OnFailure::Default) => {
                                match transform.get_default() {
                                    Some(default) => Some(default.clone()),
                                    None => transform.get_default_value_when_data_is_none(),
                                }
                            }
                            Some(crate::etl::transform::OnFailure::Ignore) => None,
                            None => None,
                        };
                        if transform.is_timeindex() && value_data.is_none() {
                            return TimeIndexMustBeNonNullSnafu.fail();
                        }
                        values[output_index] = GreptimeValue { value_data };
                    }
                }
                output_index += 1;
                if !is_v1 {
                    // V2 consumes the field so that leftover keys can be
                    // handled separately by the caller.
                    pipeline_map.remove(column_name);
                }
            }
        }
        Ok(values)
    }

    pub fn transforms(&self) -> &Transforms {
        &self.transforms
    }

    pub fn schemas(&self) -> &Vec<greptime_proto::v1::ColumnSchema> {
        &self.schema
    }

    pub fn transforms_mut(&mut self) -> &mut Transforms {
        &mut self.transforms
    }
}

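/// Column schemas plus a name-to-index lookup, kept in sync as new columns
/// are discovered while rows stream through the identity pipeline.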
#[derive(Debug, Default)]
pub struct SchemaInfo {
    /// The current column schemas, in row order.
    pub schema: Vec<ColumnSchema>,
    /// Map from column name to its index in `schema`.
    pub index: HashMap<String, usize>,
}

impl SchemaInfo {
    pub fn with_capacity(capacity: usize) -> Self {
        Self {
            schema: Vec::with_capacity(capacity),
            index: HashMap::with_capacity(capacity),
        }
    }

    /// Build a [`SchemaInfo`] from an existing schema list, indexing columns
    /// by name.
    pub fn from_schema_list(schema_list: Vec<ColumnSchema>) -> Self {
        let mut index = HashMap::new();
        for (i, schema) in schema_list.iter().enumerate() {
            index.insert(schema.column_name.clone(), i);
        }
        Self {
            schema: schema_list,
            index,
        }
    }
}

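/// Write `value_data` into `row`. When `index` is known, the value is type
/// checked against the recorded column schema; otherwise a new column is
/// appended to both the schema and the row.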
fn resolve_schema(
    index: Option<usize>,
    value_data: ValueData,
    column_schema: ColumnSchema,
    row: &mut Vec<GreptimeValue>,
    schema_info: &mut SchemaInfo,
) -> Result<()> {
    if let Some(index) = index {
        let api_value = GreptimeValue {
            value_data: Some(value_data),
        };
        // Safe to unwrap: `value_data` is always `Some` here.
        let value_column_data_type = proto_value_type(&api_value).unwrap();
        // Safe to unwrap: the index comes from `schema_info.index`, so it is
        // in bounds.
        let schema_column_data_type = schema_info.schema.get(index).unwrap().datatype();
        if value_column_data_type != schema_column_data_type {
            IdentifyPipelineColumnTypeMismatchSnafu {
                column: column_schema.column_name,
                expected: schema_column_data_type.as_str_name(),
                actual: value_column_data_type.as_str_name(),
            }
            .fail()
        } else {
            row[index] = api_value;
            Ok(())
        }
    } else {
        let key = column_schema.column_name.clone();
        schema_info.schema.push(column_schema);
        schema_info.index.insert(key, schema_info.schema.len() - 1);
        let api_value = GreptimeValue {
            value_data: Some(value_data),
        };
        row.push(api_value);
        Ok(())
    }
}

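/// Compute the timestamp cell for one row. Prometheus rows read a
/// millisecond timestamp from the `greptime_timestamp` field; a custom time
/// index reads its configured column; otherwise the current time in
/// nanoseconds is used.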
fn calc_ts(p_ctx: &PipelineContext, values: &VrlValue) -> Result<Option<ValueData>> {
    match p_ctx.channel {
        Channel::Prometheus => {
            let ts = values
                .as_object()
                .and_then(|m| m.get(GREPTIME_TIMESTAMP))
                .and_then(|ts| ts.try_into_i64().ok())
                .unwrap_or_default();
            Ok(Some(ValueData::TimestampMillisecondValue(ts)))
        }
        _ => {
            let custom_ts = p_ctx.pipeline_definition.get_custom_ts();
            match custom_ts {
                Some(ts) => {
                    let ts_field = values.as_object().and_then(|m| m.get(ts.get_column_name()));
                    Some(ts.get_timestamp_value(ts_field)).transpose()
                }
                None => Ok(Some(ValueData::TimestampNanosecondValue(
                    chrono::Utc::now().timestamp_nanos_opt().unwrap_or_default(),
                ))),
            }
        }
    }
}

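/// Convert one VRL map into a [`Row`] under the evolving `schema_info`.
/// When `need_calc_ts` is set, the timestamp is computed and placed first;
/// every other key is resolved into a (possibly new) column.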
pub(crate) fn values_to_row(
    schema_info: &mut SchemaInfo,
    values: VrlValue,
    pipeline_ctx: &PipelineContext<'_>,
    row: Option<Vec<GreptimeValue>>,
    need_calc_ts: bool,
) -> Result<Row> {
    let mut row: Vec<GreptimeValue> =
        row.unwrap_or_else(|| Vec::with_capacity(schema_info.schema.len()));
    let custom_ts = pipeline_ctx.pipeline_definition.get_custom_ts();

    if need_calc_ts {
        // The timestamp cell always occupies the first position in the row.
        let ts = calc_ts(pipeline_ctx, &values)?;
        row.push(GreptimeValue { value_data: ts });
    }

    row.resize(schema_info.schema.len(), GreptimeValue { value_data: None });

    // The timestamp column is skipped below since it was resolved above.
    let ts_column_name = custom_ts
        .as_ref()
        .map_or(DEFAULT_GREPTIME_TIMESTAMP_COLUMN, |ts| ts.get_column_name());

    let values = values.into_object().context(ValueMustBeMapSnafu)?;

    for (column_name, value) in values {
        if need_calc_ts && column_name.as_str() == ts_column_name {
            continue;
        }

        resolve_value(
            value,
            column_name.into(),
            &mut row,
            schema_info,
            pipeline_ctx,
        )?;
    }
    Ok(Row { values: row })
}

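/// On the Prometheus channel every column except `greptime_value` is a tag;
/// all other channels produce fields.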
fn decide_semantic(p_ctx: &PipelineContext, column_name: &str) -> i32 {
    if p_ctx.channel == Channel::Prometheus && column_name != GREPTIME_VALUE {
        SemanticType::Tag as i32
    } else {
        SemanticType::Field as i32
    }
}

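/// Resolve a single VRL value into a column cell, inferring the column data
/// type from the value; arrays and objects are encoded as JSONB binary.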
fn resolve_value(
    value: VrlValue,
    column_name: String,
    row: &mut Vec<GreptimeValue>,
    schema_info: &mut SchemaInfo,
    p_ctx: &PipelineContext,
) -> Result<()> {
    let index = schema_info.index.get(&column_name).copied();
    let mut resolve_simple_type =
        |value_data: ValueData, column_name: String, data_type: ColumnDataType| {
            let semantic_type = decide_semantic(p_ctx, &column_name);
            resolve_schema(
                index,
                value_data,
                ColumnSchema {
                    column_name,
                    datatype: data_type as i32,
                    semantic_type,
                    datatype_extension: None,
                    options: None,
                },
                row,
                schema_info,
            )
        };

    match value {
        VrlValue::Null => {}

        VrlValue::Integer(v) => {
            resolve_simple_type(ValueData::I64Value(v), column_name, ColumnDataType::Int64)?;
        }

        VrlValue::Float(v) => {
            resolve_simple_type(
                ValueData::F64Value(v.into()),
                column_name,
                ColumnDataType::Float64,
            )?;
        }

        VrlValue::Boolean(v) => {
            resolve_simple_type(
                ValueData::BoolValue(v),
                column_name,
                ColumnDataType::Boolean,
            )?;
        }

        VrlValue::Bytes(v) => {
            resolve_simple_type(
                ValueData::StringValue(String::from_utf8_lossy_owned(v.to_vec())),
                column_name,
                ColumnDataType::String,
            )?;
        }

        VrlValue::Regex(v) => {
            warn!(
                "Persisting regex value in the table, this should not happen, column_name: {}",
                column_name
            );
            resolve_simple_type(
                ValueData::StringValue(v.to_string()),
                column_name,
                ColumnDataType::String,
            )?;
        }

        VrlValue::Timestamp(ts) => {
            let ns = ts.timestamp_nanos_opt().context(InvalidTimestampSnafu {
                input: ts.to_rfc3339(),
            })?;
            resolve_simple_type(
                ValueData::TimestampNanosecondValue(ns),
                column_name,
                ColumnDataType::TimestampNanosecond,
            )?;
        }

        VrlValue::Array(_) | VrlValue::Object(_) => {
            let data = vrl_value_to_jsonb_value(&value);
            resolve_schema(
                index,
                ValueData::BinaryValue(data.to_vec()),
                ColumnSchema {
                    column_name,
                    datatype: ColumnDataType::Binary as i32,
                    semantic_type: SemanticType::Field as i32,
                    datatype_extension: Some(ColumnDataTypeExtension {
                        type_ext: Some(TypeExt::JsonType(JsonTypeExtension::JsonBinary.into())),
                    }),
                    options: None,
                },
                row,
                schema_info,
            )?;
        }
    }
    Ok(())
}

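/// Recursively convert a VRL value into a `jsonb` value, borrowing string
/// data where possible.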
fn vrl_value_to_jsonb_value<'a>(value: &'a VrlValue) -> jsonb::Value<'a> {
    match value {
        VrlValue::Bytes(bytes) => jsonb::Value::String(String::from_utf8_lossy(bytes)),
        VrlValue::Regex(value_regex) => jsonb::Value::String(Cow::Borrowed(value_regex.as_str())),
        VrlValue::Integer(i) => jsonb::Value::Number(Number::Int64(*i)),
        VrlValue::Float(not_nan) => jsonb::Value::Number(Number::Float64(not_nan.into_inner())),
        VrlValue::Boolean(b) => jsonb::Value::Bool(*b),
        VrlValue::Timestamp(date_time) => jsonb::Value::String(Cow::Owned(date_time.to_rfc3339())),
        VrlValue::Object(btree_map) => jsonb::Value::Object(
            btree_map
                .iter()
                .map(|(key, value)| (key.to_string(), vrl_value_to_jsonb_value(value)))
                .collect(),
        ),
        VrlValue::Array(values) => {
            jsonb::Value::Array(values.iter().map(vrl_value_to_jsonb_value).collect())
        }
        VrlValue::Null => jsonb::Value::Null,
    }
}

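/// Core of the identity pipeline: seed the schema with the time index
/// column, then fold every input map into rows grouped by [`ContextOpt`],
/// padding earlier rows when later inputs introduce new columns.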
fn identity_pipeline_inner(
    pipeline_maps: Vec<VrlValue>,
    pipeline_ctx: &PipelineContext<'_>,
) -> Result<(SchemaInfo, HashMap<ContextOpt, Vec<Row>>)> {
    let skip_error = pipeline_ctx.pipeline_param.skip_error();
    let mut schema_info = SchemaInfo::default();
    let custom_ts = pipeline_ctx.pipeline_definition.get_custom_ts();

    // Set the time index column at the first position in the schema.
    schema_info.schema.push(ColumnSchema {
        column_name: custom_ts
            .map(|ts| ts.get_column_name().to_string())
            .unwrap_or_else(|| DEFAULT_GREPTIME_TIMESTAMP_COLUMN.to_string()),
        datatype: custom_ts.map(|c| c.get_datatype()).unwrap_or_else(|| {
            if pipeline_ctx.channel == Channel::Prometheus {
                ColumnDataType::TimestampMillisecond
            } else {
                ColumnDataType::TimestampNanosecond
            }
        }) as i32,
        semantic_type: SemanticType::Timestamp as i32,
        datatype_extension: None,
        options: None,
    });

    let mut opt_map = HashMap::new();
    let len = pipeline_maps.len();

    for mut pipeline_map in pipeline_maps {
        let opt = unwrap_or_continue_if_err!(
            ContextOpt::from_pipeline_map_to_opt(&mut pipeline_map),
            skip_error
        );
        let row = unwrap_or_continue_if_err!(
            values_to_row(&mut schema_info, pipeline_map, pipeline_ctx, None, true),
            skip_error
        );

        opt_map
            .entry(opt)
            .or_insert_with(|| Vec::with_capacity(len))
            .push(row);
    }

    // Pad earlier rows with nulls for columns discovered later in the batch.
    let column_count = schema_info.schema.len();
    for rows in opt_map.values_mut() {
        for row in rows.iter_mut() {
            let diff = column_count - row.values.len();
            for _ in 0..diff {
                row.values.push(GreptimeValue { value_data: None });
            }
        }
    }

    Ok((schema_info, opt_map))
}

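/// Identity pipeline for GreptimeDB: ingest a batch of JSON-like values
/// without a user-defined pipeline.
/// 1. With `flatten_json_object=true`, nested objects are flattened (up to
///    `DEFAULT_MAX_NESTED_LEVELS_FOR_JSON_FLATTENING` levels) before ingestion.
/// 2. A time index column is placed first in the schema.
/// 3. If the target `table` already exists, its row-key columns keep the tag
///    semantic type.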
pub fn identity_pipeline(
    array: Vec<VrlValue>,
    table: Option<Arc<table::Table>>,
    pipeline_ctx: &PipelineContext<'_>,
) -> Result<HashMap<ContextOpt, Rows>> {
    let skip_error = pipeline_ctx.pipeline_param.skip_error();
    let input = if pipeline_ctx.pipeline_param.flatten_json_object() {
        let mut results = Vec::with_capacity(array.len());
        for item in array.into_iter() {
            let result = unwrap_or_continue_if_err!(
                flatten_object(item, DEFAULT_MAX_NESTED_LEVELS_FOR_JSON_FLATTENING),
                skip_error
            );
            results.push(result);
        }
        results
    } else {
        array
    };

    identity_pipeline_inner(input, pipeline_ctx).map(|(mut schema, opt_map)| {
        if let Some(table) = table {
            let table_info = table.table_info();
            for tag_name in table_info.meta.row_key_column_names() {
                if let Some(index) = schema.index.get(tag_name) {
                    schema.schema[*index].semantic_type = SemanticType::Tag as i32;
                }
            }
        }

        opt_map
            .into_iter()
            .map(|(opt, rows)| {
                (
                    opt,
                    Rows {
                        schema: schema.schema.clone(),
                        rows,
                    },
                )
            })
            .collect::<HashMap<ContextOpt, Rows>>()
    })
}

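/// Flatten a nested object into a single level, joining nested keys with `.`
/// (for example `{"a": {"b": 1}}` becomes `{"a.b": 1}`); fails if the input
/// is not a map or nests deeper than `max_nested_levels`.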
pub fn flatten_object(object: VrlValue, max_nested_levels: usize) -> Result<VrlValue> {
    let mut flattened = BTreeMap::new();
    let object = object.into_object().context(ValueMustBeMapSnafu)?;

    if !object.is_empty() {
        do_flatten_object(&mut flattened, None, object, 1, max_nested_levels)?;
    }

    Ok(VrlValue::Object(flattened))
}

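/// Depth-first worker for [`flatten_object`]; `base` carries the dotted key
/// prefix accumulated so far.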
fn do_flatten_object(
    dest: &mut BTreeMap<KeyString, VrlValue>,
    base: Option<&str>,
    object: BTreeMap<KeyString, VrlValue>,
    current_level: usize,
    max_nested_levels: usize,
) -> Result<()> {
    // For safety, bail out when the object is nested too deeply.
    if current_level > max_nested_levels {
        return ReachedMaxNestedLevelsSnafu { max_nested_levels }.fail();
    }

    for (key, value) in object {
        let new_key = base.map_or_else(
            || key.clone(),
            |base_key| format!("{base_key}.{key}").into(),
        );

        match value {
            VrlValue::Object(object) => {
                do_flatten_object(
                    dest,
                    Some(&new_key),
                    object,
                    current_level + 1,
                    max_nested_levels,
                )?;
            }
            _ => {
                // Non-object values (including arrays) are inserted as-is.
                dest.insert(new_key, value);
            }
        }
    }

    Ok(())
}

#[cfg(test)]
mod tests {
    use api::v1::SemanticType;

    use super::*;
    use crate::{identity_pipeline, PipelineDefinition};

    #[test]
    fn test_identify_pipeline() {
        let params = GreptimePipelineParams::default();
        let pipeline_ctx = PipelineContext::new(
            &PipelineDefinition::GreptimeIdentityPipeline(None),
            &params,
            Channel::Unknown,
        );
        {
            let array = [
                serde_json::json!({
                    "woshinull": null,
                    "name": "Alice",
                    "age": 20,
                    "is_student": true,
                    "score": 99.5,
                    "hobbies": "reading",
                    "address": "Beijing",
                }),
                serde_json::json!({
                    "name": "Bob",
                    "age": 21,
                    "is_student": false,
                    "score": "88.5",
                    "hobbies": "swimming",
                    "address": "Shanghai",
                    "gaga": "gaga"
                }),
            ];
            let array = array.iter().map(|v| v.into()).collect();
            let rows = identity_pipeline(array, None, &pipeline_ctx);
            assert!(rows.is_err());
            assert_eq!(
                rows.err().unwrap().to_string(),
                "Column datatype mismatch. For column: score, expected datatype: FLOAT64, actual datatype: STRING".to_string(),
            );
        }
        {
            let array = [
                serde_json::json!({
                    "woshinull": null,
                    "name": "Alice",
                    "age": 20,
                    "is_student": true,
                    "score": 99.5,
                    "hobbies": "reading",
                    "address": "Beijing",
                }),
                serde_json::json!({
                    "name": "Bob",
                    "age": 21,
                    "is_student": false,
                    "score": 88,
                    "hobbies": "swimming",
                    "address": "Shanghai",
                    "gaga": "gaga"
                }),
            ];
            let array = array.iter().map(|v| v.into()).collect();
            let rows = identity_pipeline(array, None, &pipeline_ctx);
            assert!(rows.is_err());
            assert_eq!(
                rows.err().unwrap().to_string(),
                "Column datatype mismatch. For column: score, expected datatype: FLOAT64, actual datatype: INT64".to_string(),
            );
        }
        {
            let array = [
                serde_json::json!({
                    "woshinull": null,
                    "name": "Alice",
                    "age": 20,
                    "is_student": true,
                    "score": 99.5,
                    "hobbies": "reading",
                    "address": "Beijing",
                }),
                serde_json::json!({
                    "name": "Bob",
                    "age": 21,
                    "is_student": false,
                    "score": 88.5,
                    "hobbies": "swimming",
                    "address": "Shanghai",
                    "gaga": "gaga"
                }),
            ];
            let array = array.iter().map(|v| v.into()).collect();
            let rows = identity_pipeline(array, None, &pipeline_ctx);
            assert!(rows.is_ok());
            let mut rows = rows.unwrap();
            assert!(rows.len() == 1);
            let rows = rows.remove(&ContextOpt::default()).unwrap();
            assert_eq!(rows.schema.len(), 8);
            assert_eq!(rows.rows.len(), 2);
            assert_eq!(8, rows.rows[0].values.len());
            assert_eq!(8, rows.rows[1].values.len());
        }
        {
            let array = [
                serde_json::json!({
                    "woshinull": null,
                    "name": "Alice",
                    "age": 20,
                    "is_student": true,
                    "score": 99.5,
                    "hobbies": "reading",
                    "address": "Beijing",
                }),
                serde_json::json!({
                    "name": "Bob",
                    "age": 21,
                    "is_student": false,
                    "score": 88.5,
                    "hobbies": "swimming",
                    "address": "Shanghai",
                    "gaga": "gaga"
                }),
            ];
            let tag_column_names = ["name".to_string(), "address".to_string()];

            let rows =
                identity_pipeline_inner(array.iter().map(|v| v.into()).collect(), &pipeline_ctx)
                    .map(|(mut schema, mut rows)| {
                        for name in tag_column_names {
                            if let Some(index) = schema.index.get(&name) {
                                schema.schema[*index].semantic_type = SemanticType::Tag as i32;
                            }
                        }

                        assert!(rows.len() == 1);
                        let rows = rows.remove(&ContextOpt::default()).unwrap();

                        Rows {
                            schema: schema.schema,
                            rows,
                        }
                    });

            assert!(rows.is_ok());
            let rows = rows.unwrap();
            assert_eq!(rows.schema.len(), 8);
            assert_eq!(rows.rows.len(), 2);
            assert_eq!(8, rows.rows[0].values.len());
            assert_eq!(8, rows.rows[1].values.len());
            assert_eq!(
                rows.schema
                    .iter()
                    .find(|x| x.column_name == "name")
                    .unwrap()
                    .semantic_type,
                SemanticType::Tag as i32
            );
            assert_eq!(
                rows.schema
                    .iter()
                    .find(|x| x.column_name == "address")
                    .unwrap()
                    .semantic_type,
                SemanticType::Tag as i32
            );
            assert_eq!(
                rows.schema
                    .iter()
                    .filter(|x| x.semantic_type == SemanticType::Tag as i32)
                    .count(),
                2
            );
        }
    }

    #[test]
    fn test_flatten() {
        let test_cases = vec![
            // Basic case: nested objects are joined with `.`.
            (
                serde_json::json!(
                    {
                        "a": {
                            "b": {
                                "c": [1, 2, 3]
                            }
                        },
                        "d": [
                            "foo",
                            "bar"
                        ],
                        "e": {
                            "f": [7, 8, 9],
                            "g": {
                                "h": 123,
                                "i": "hello",
                                "j": {
                                    "k": true
                                }
                            }
                        }
                    }
                ),
                10,
                Some(serde_json::json!(
                    {
                        "a.b.c": [1, 2, 3],
                        "d": ["foo", "bar"],
                        "e.f": [7, 8, 9],
                        "e.g.h": 123,
                        "e.g.i": "hello",
                        "e.g.j.k": true
                    }
                )),
            ),
            // Exceeding the maximum nested levels is an error.
            (
                serde_json::json!(
                    {
                        "a": {
                            "b": {
                                "c": {
                                    "d": [1, 2, 3]
                                }
                            }
                        },
                        "e": [
                            "foo",
                            "bar"
                        ]
                    }
                ),
                3,
                None,
            ),
        ];

        for (input, max_depth, expected) in test_cases {
            let input = input.into();
            let expected = expected.map(|e| e.into());

            let flattened_object = flatten_object(input, max_depth).ok();
            assert_eq!(flattened_object, expected);
        }
    }

    #[test]
    fn test_greptime_pipeline_params() {
        let params = Some("flatten_json_object=true");
        let pipeline_params = GreptimePipelineParams::from_params(params);
        assert!(pipeline_params.flatten_json_object());

        let params = None;
        let pipeline_params = GreptimePipelineParams::from_params(params);
        assert!(!pipeline_params.flatten_json_object());
    }
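
    // The following two tests are illustrative additions: they pin down the
    // observable behavior of `parse_header_str_to_map` and
    // `vrl_value_to_jsonb_value` using arbitrary sample inputs.
    #[test]
    fn test_parse_header_str_to_map() {
        let map = GreptimePipelineParams::parse_header_str_to_map(Some("a=1&b=2"));
        assert_eq!(map.get("a").map(String::as_str), Some("1"));
        assert_eq!(map.get("b").map(String::as_str), Some("2"));

        // Entries without `=` are dropped; `None` and empty input both yield
        // an empty map.
        assert!(GreptimePipelineParams::parse_header_str_to_map(Some("broken")).is_empty());
        assert!(GreptimePipelineParams::parse_header_str_to_map(Some("")).is_empty());
        assert!(GreptimePipelineParams::parse_header_str_to_map(None).is_empty());
    }

    #[test]
    fn test_vrl_value_to_jsonb_value() {
        assert!(matches!(
            vrl_value_to_jsonb_value(&VrlValue::Null),
            jsonb::Value::Null
        ));
        assert!(matches!(
            vrl_value_to_jsonb_value(&VrlValue::Boolean(true)),
            jsonb::Value::Bool(true)
        ));
        assert!(matches!(
            vrl_value_to_jsonb_value(&VrlValue::Integer(42)),
            jsonb::Value::Number(Number::Int64(42))
        ));
    }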
}