1pub mod coerce;
16
17use std::borrow::Cow;
18use std::collections::{BTreeMap, HashSet};
19use std::sync::Arc;
20
21use ahash::{HashMap, HashMapExt};
22use api::helper::proto_value_type;
23use api::v1::column_data_type_extension::TypeExt;
24use api::v1::value::ValueData;
25use api::v1::{ColumnDataType, ColumnDataTypeExtension, JsonTypeExtension, SemanticType};
26use coerce::{coerce_columns, coerce_value};
27use common_query::prelude::{GREPTIME_TIMESTAMP, GREPTIME_VALUE};
28use common_telemetry::warn;
29use greptime_proto::v1::{ColumnSchema, Row, Rows, Value as GreptimeValue};
30use itertools::Itertools;
31use jsonb::Number;
32use once_cell::sync::OnceCell;
33use session::context::Channel;
34use snafu::OptionExt;
35use vrl::prelude::VrlValueConvert;
36use vrl::value::{KeyString, Value as VrlValue};
37
38use crate::error::{
39 IdentifyPipelineColumnTypeMismatchSnafu, InvalidTimestampSnafu, ReachedMaxNestedLevelsSnafu,
40 Result, TimeIndexMustBeNonNullSnafu, TransformColumnNameMustBeUniqueSnafu,
41 TransformMultipleTimestampIndexSnafu, TransformTimestampIndexCountSnafu, ValueMustBeMapSnafu,
42};
43use crate::etl::ctx_req::ContextOpt;
44use crate::etl::field::{Field, Fields};
45use crate::etl::transform::index::Index;
46use crate::etl::transform::{Transform, Transforms};
47use crate::etl::PipelineDocVersion;
48use crate::{unwrap_or_continue_if_err, PipelineContext};
49
50const DEFAULT_GREPTIME_TIMESTAMP_COLUMN: &str = "greptime_timestamp";
51const DEFAULT_MAX_NESTED_LEVELS_FOR_JSON_FLATTENING: usize = 10;
52
/// Applies a validated list of transforms to pipeline values and maps the
/// results onto a fixed, precomputed GreptimeDB column schema.
#[derive(Debug, Clone)]
pub struct GreptimeTransformer {
    /// Validated transforms; field order determines output column order.
    transforms: Transforms,
    /// One column schema per transform output field, built by `init_schemas`.
    schema: Vec<ColumnSchema>,
}
60
/// Case-insensitive truthiness test for option strings.
/// Accepts exactly: "true", "1", "yes", "on", "t".
fn truthy<V: AsRef<str>>(v: V) -> bool {
    matches!(
        v.as_ref().to_lowercase().as_str(),
        "true" | "1" | "yes" | "on" | "t"
    )
}
65
/// Per-request pipeline options, typically parsed from the
/// `x-greptime-pipeline-params`-style header string (`k1=v1&k2=v2`).
#[derive(Debug, Default)]
pub struct GreptimePipelineParams {
    /// Raw key/value pairs as parsed from the header string.
    options: HashMap<String, String>,

    // Lazily-computed caches of the parsed boolean options below.
    pub flatten_json_object: OnceCell<bool>,
    pub skip_error: OnceCell<bool>,
}
78
79impl GreptimePipelineParams {
80 pub fn from_params(params: Option<&str>) -> Self {
84 let options = Self::parse_header_str_to_map(params);
85
86 Self {
87 options,
88 skip_error: OnceCell::new(),
89 flatten_json_object: OnceCell::new(),
90 }
91 }
92
93 pub fn from_map(options: HashMap<String, String>) -> Self {
94 Self {
95 options,
96 skip_error: OnceCell::new(),
97 flatten_json_object: OnceCell::new(),
98 }
99 }
100
101 pub fn parse_header_str_to_map(params: Option<&str>) -> HashMap<String, String> {
102 if let Some(params) = params {
103 if params.is_empty() {
104 HashMap::new()
105 } else {
106 params
107 .split('&')
108 .filter_map(|s| s.split_once('='))
109 .map(|(k, v)| (k.to_string(), v.to_string()))
110 .collect::<HashMap<String, String>>()
111 }
112 } else {
113 HashMap::new()
114 }
115 }
116
117 pub fn flatten_json_object(&self) -> bool {
119 *self.flatten_json_object.get_or_init(|| {
120 self.options
121 .get("flatten_json_object")
122 .map(|v| v == "true")
123 .unwrap_or(false)
124 })
125 }
126
127 pub fn skip_error(&self) -> bool {
129 *self
130 .skip_error
131 .get_or_init(|| self.options.get("skip_error").map(truthy).unwrap_or(false))
132 }
133}
134
135impl GreptimeTransformer {
136 fn add_greptime_timestamp_column(transforms: &mut Transforms) {
138 let type_ = ColumnDataType::TimestampNanosecond;
139 let default = None;
140
141 let transform = Transform {
142 fields: Fields::one(Field::new(
143 DEFAULT_GREPTIME_TIMESTAMP_COLUMN.to_string(),
144 None,
145 )),
146 type_,
147 default,
148 index: Some(Index::Time),
149 on_failure: Some(crate::etl::transform::OnFailure::Default),
150 tag: false,
151 };
152 transforms.push(transform);
153 }
154
155 fn init_schemas(transforms: &Transforms) -> Result<Vec<ColumnSchema>> {
157 let mut schema = vec![];
158 for transform in transforms.iter() {
159 schema.extend(coerce_columns(transform)?);
160 }
161 Ok(schema)
162 }
163}
164
impl GreptimeTransformer {
    /// Validate `transforms` and build the output schema.
    ///
    /// Enforces that output column names are unique across transforms and
    /// that at most one time-index column is declared. For V1 documents with
    /// no time index, an implicit `greptime_timestamp` column is appended;
    /// any other count (0 for non-V1, or >1) is an error.
    pub fn new(mut transforms: Transforms, doc_version: &PipelineDocVersion) -> Result<Self> {
        let mut column_names_set = HashSet::new();
        let mut timestamp_columns = vec![];

        for transform in transforms.iter() {
            let target_fields_set = transform
                .fields
                .iter()
                .map(|f| f.target_or_input_field())
                .collect::<HashSet<_>>();

            // Reject output column names already claimed by earlier transforms.
            let intersections: Vec<_> = column_names_set.intersection(&target_fields_set).collect();
            if !intersections.is_empty() {
                let duplicates = intersections.iter().join(",");
                return TransformColumnNameMustBeUniqueSnafu { duplicates }.fail();
            }

            column_names_set.extend(target_fields_set);

            if let Some(idx) = transform.index {
                if idx == Index::Time {
                    match transform.fields.len() {
                        // A time-index transform must map exactly one field.
                        1 => {
                            timestamp_columns.push(transform.fields.first().unwrap().input_field())
                        }
                        _ => {
                            return TransformMultipleTimestampIndexSnafu {
                                columns: transform
                                    .fields
                                    .iter()
                                    .map(|x| x.input_field())
                                    .join(", "),
                            }
                            .fail();
                        }
                    }
                }
            }
        }

        let schema = match timestamp_columns.len() {
            // V1 pipelines get the implicit timestamp column when none is declared.
            0 if doc_version == &PipelineDocVersion::V1 => {
                GreptimeTransformer::add_greptime_timestamp_column(&mut transforms);
                GreptimeTransformer::init_schemas(&transforms)?
            }
            1 => GreptimeTransformer::init_schemas(&transforms)?,
            // 0 (for non-V1 docs) or more than 1 time-index columns: invalid.
            count => {
                let columns = timestamp_columns.iter().join(", ");
                return TransformTimestampIndexCountSnafu { count, columns }.fail();
            }
        };
        Ok(GreptimeTransformer { transforms, schema })
    }

    /// Apply all transforms to `pipeline_map`, producing one value per schema
    /// column, in schema order.
    ///
    /// For non-V1 pipelines (`is_v1 == false`), consumed keys are removed
    /// from the map so leftover keys can be handled separately by the caller.
    pub fn transform_mut(
        &self,
        pipeline_map: &mut VrlValue,
        is_v1: bool,
    ) -> Result<Vec<GreptimeValue>> {
        let mut values = vec![GreptimeValue { value_data: None }; self.schema.len()];
        let mut output_index = 0;
        for transform in self.transforms.iter() {
            for field in transform.fields.iter() {
                let column_name = field.input_field();

                let pipeline_map = pipeline_map.as_object_mut().context(ValueMustBeMapSnafu)?;
                match pipeline_map.get(column_name) {
                    Some(v) => {
                        let value_data = coerce_value(v, transform)?;
                        values[output_index] = GreptimeValue { value_data };
                    }
                    None => {
                        // Input key missing: fall back per the on_failure policy.
                        let value_data = match transform.on_failure {
                            Some(crate::etl::transform::OnFailure::Default) => {
                                match transform.get_default() {
                                    Some(default) => Some(default.clone()),
                                    None => transform.get_default_value_when_data_is_none(),
                                }
                            }
                            Some(crate::etl::transform::OnFailure::Ignore) => None,
                            None => None,
                        };
                        // The time-index column can never end up null.
                        if transform.is_timeindex() && value_data.is_none() {
                            return TimeIndexMustBeNonNullSnafu.fail();
                        }
                        values[output_index] = GreptimeValue { value_data };
                    }
                }
                output_index += 1;
                if !is_v1 {
                    pipeline_map.remove(column_name);
                }
            }
        }
        Ok(values)
    }

    /// The validated transform list.
    pub fn transforms(&self) -> &Transforms {
        &self.transforms
    }

    /// Column schemas, in the same order `transform_mut` emits values.
    pub fn schemas(&self) -> &Vec<greptime_proto::v1::ColumnSchema> {
        &self.schema
    }

    /// Mutable access to the transform list.
    pub fn transforms_mut(&mut self) -> &mut Transforms {
        &mut self.transforms
    }
}
282
/// Dynamically-growing column schema with a name → position lookup table,
/// used by the identity pipeline to infer schema from incoming rows.
#[derive(Debug, Default)]
pub struct SchemaInfo {
    /// Column schemas in output order.
    pub schema: Vec<ColumnSchema>,
    /// Column name → index into `schema`.
    pub index: HashMap<String, usize>,
}
293
294impl SchemaInfo {
295 pub fn with_capacity(capacity: usize) -> Self {
296 Self {
297 schema: Vec::with_capacity(capacity),
298 index: HashMap::with_capacity(capacity),
299 }
300 }
301
302 pub fn from_schema_list(schema_list: Vec<ColumnSchema>) -> Self {
303 let mut index = HashMap::new();
304 for (i, schema) in schema_list.iter().enumerate() {
305 index.insert(schema.column_name.clone(), i);
306 }
307 Self {
308 schema: schema_list,
309 index,
310 }
311 }
312}
313
314fn resolve_schema(
315 index: Option<usize>,
316 value_data: ValueData,
317 column_schema: ColumnSchema,
318 row: &mut Vec<GreptimeValue>,
319 schema_info: &mut SchemaInfo,
320) -> Result<()> {
321 if let Some(index) = index {
322 let api_value = GreptimeValue {
323 value_data: Some(value_data),
324 };
325 let value_column_data_type = proto_value_type(&api_value).unwrap();
327 let schema_column_data_type = schema_info.schema.get(index).unwrap().datatype();
329 if value_column_data_type != schema_column_data_type {
330 IdentifyPipelineColumnTypeMismatchSnafu {
331 column: column_schema.column_name,
332 expected: schema_column_data_type.as_str_name(),
333 actual: value_column_data_type.as_str_name(),
334 }
335 .fail()
336 } else {
337 row[index] = api_value;
338 Ok(())
339 }
340 } else {
341 let key = column_schema.column_name.clone();
342 schema_info.schema.push(column_schema);
343 schema_info.index.insert(key, schema_info.schema.len() - 1);
344 let api_value = GreptimeValue {
345 value_data: Some(value_data),
346 };
347 row.push(api_value);
348 Ok(())
349 }
350}
351
352fn calc_ts(p_ctx: &PipelineContext, values: &VrlValue) -> Result<Option<ValueData>> {
353 match p_ctx.channel {
354 Channel::Prometheus => {
355 let ts = values
356 .as_object()
357 .and_then(|m| m.get(GREPTIME_TIMESTAMP))
358 .and_then(|ts| ts.try_into_i64().ok())
359 .unwrap_or_default();
360 Ok(Some(ValueData::TimestampMillisecondValue(ts)))
361 }
362 _ => {
363 let custom_ts = p_ctx.pipeline_definition.get_custom_ts();
364 match custom_ts {
365 Some(ts) => {
366 let ts_field = values.as_object().and_then(|m| m.get(ts.get_column_name()));
367 Some(ts.get_timestamp_value(ts_field)).transpose()
368 }
369 None => Ok(Some(ValueData::TimestampNanosecondValue(
370 chrono::Utc::now().timestamp_nanos_opt().unwrap_or_default(),
371 ))),
372 }
373 }
374 }
375}
376
/// Convert one pipeline value map into a [`Row`] aligned with `schema_info`.
///
/// When `need_calc_ts` is true the timestamp is computed first and pushed as
/// the leading value. The row is then padded with nulls to the current schema
/// width so columns can be assigned positionally, and each remaining key is
/// resolved into a (possibly newly created) column. The timestamp key itself
/// is skipped since it has already been emitted.
pub(crate) fn values_to_row(
    schema_info: &mut SchemaInfo,
    values: VrlValue,
    pipeline_ctx: &PipelineContext<'_>,
    row: Option<Vec<GreptimeValue>>,
    need_calc_ts: bool,
) -> Result<Row> {
    let mut row: Vec<GreptimeValue> =
        row.unwrap_or_else(|| Vec::with_capacity(schema_info.schema.len()));
    let custom_ts = pipeline_ctx.pipeline_definition.get_custom_ts();

    if need_calc_ts {
        let ts = calc_ts(pipeline_ctx, &values)?;
        row.push(GreptimeValue { value_data: ts });
    }

    // Pre-fill with nulls so positional assignment by column index works.
    row.resize(schema_info.schema.len(), GreptimeValue { value_data: None });

    let ts_column_name = custom_ts
        .as_ref()
        .map_or(DEFAULT_GREPTIME_TIMESTAMP_COLUMN, |ts| ts.get_column_name());

    let values = values.into_object().context(ValueMustBeMapSnafu)?;

    for (column_name, value) in values {
        // The timestamp column was already handled above.
        if column_name.as_str() == ts_column_name {
            continue;
        }

        resolve_value(
            value,
            column_name.into(),
            &mut row,
            schema_info,
            pipeline_ctx,
        )?;
    }
    Ok(Row { values: row })
}
418
419fn decide_semantic(p_ctx: &PipelineContext, column_name: &str) -> i32 {
420 if p_ctx.channel == Channel::Prometheus && column_name != GREPTIME_VALUE {
421 SemanticType::Tag as i32
422 } else {
423 SemanticType::Field as i32
424 }
425}
426
/// Resolve a single key/value pair into `row`, extending `schema_info` when
/// the column is seen for the first time.
///
/// Scalar VRL values map to native column types; arrays and objects are
/// stored as JSON binary columns. `Null` is skipped entirely: no column is
/// created and any existing position keeps its null.
fn resolve_value(
    value: VrlValue,
    column_name: String,
    row: &mut Vec<GreptimeValue>,
    schema_info: &mut SchemaInfo,
    p_ctx: &PipelineContext,
) -> Result<()> {
    let index = schema_info.index.get(&column_name).copied();
    // Shared path for scalar types: build the column schema and delegate.
    let mut resolve_simple_type =
        |value_data: ValueData, column_name: String, data_type: ColumnDataType| {
            let semantic_type = decide_semantic(p_ctx, &column_name);
            resolve_schema(
                index,
                value_data,
                ColumnSchema {
                    column_name,
                    datatype: data_type as i32,
                    semantic_type,
                    datatype_extension: None,
                    options: None,
                },
                row,
                schema_info,
            )
        };

    match value {
        VrlValue::Null => {}

        VrlValue::Integer(v) => {
            resolve_simple_type(ValueData::I64Value(v), column_name, ColumnDataType::Int64)?;
        }

        VrlValue::Float(v) => {
            resolve_simple_type(
                ValueData::F64Value(v.into()),
                column_name,
                ColumnDataType::Float64,
            )?;
        }

        VrlValue::Boolean(v) => {
            resolve_simple_type(
                ValueData::BoolValue(v),
                column_name,
                ColumnDataType::Boolean,
            )?;
        }

        // Bytes are treated as (lossily decoded) UTF-8 strings.
        VrlValue::Bytes(v) => {
            resolve_simple_type(
                ValueData::StringValue(String::from_utf8_lossy_owned(v.to_vec())),
                column_name,
                ColumnDataType::String,
            )?;
        }

        // Regexes should not normally reach storage; persist as string but warn.
        VrlValue::Regex(v) => {
            warn!(
                "Persisting regex value in the table, this should not happen, column_name: {}",
                column_name
            );
            resolve_simple_type(
                ValueData::StringValue(v.to_string()),
                column_name,
                ColumnDataType::String,
            )?;
        }

        VrlValue::Timestamp(ts) => {
            // Out-of-range timestamps (no nanosecond representation) are rejected.
            let ns = ts.timestamp_nanos_opt().context(InvalidTimestampSnafu {
                input: ts.to_rfc3339(),
            })?;
            resolve_simple_type(
                ValueData::TimestampNanosecondValue(ns),
                column_name,
                ColumnDataType::TimestampNanosecond,
            )?;
        }

        // Compound values become JSON binary columns (always Field semantic).
        VrlValue::Array(_) | VrlValue::Object(_) => {
            let data = vrl_value_to_jsonb_value(&value);
            resolve_schema(
                index,
                ValueData::BinaryValue(data.to_vec()),
                ColumnSchema {
                    column_name,
                    datatype: ColumnDataType::Binary as i32,
                    semantic_type: SemanticType::Field as i32,
                    datatype_extension: Some(ColumnDataTypeExtension {
                        type_ext: Some(TypeExt::JsonType(JsonTypeExtension::JsonBinary.into())),
                    }),
                    options: None,
                },
                row,
                schema_info,
            )?;
        }
    }
    Ok(())
}
530
531fn vrl_value_to_jsonb_value<'a>(value: &'a VrlValue) -> jsonb::Value<'a> {
532 match value {
533 VrlValue::Bytes(bytes) => jsonb::Value::String(String::from_utf8_lossy(bytes)),
534 VrlValue::Regex(value_regex) => jsonb::Value::String(Cow::Borrowed(value_regex.as_str())),
535 VrlValue::Integer(i) => jsonb::Value::Number(Number::Int64(*i)),
536 VrlValue::Float(not_nan) => jsonb::Value::Number(Number::Float64(not_nan.into_inner())),
537 VrlValue::Boolean(b) => jsonb::Value::Bool(*b),
538 VrlValue::Timestamp(date_time) => jsonb::Value::String(Cow::Owned(date_time.to_rfc3339())),
539 VrlValue::Object(btree_map) => jsonb::Value::Object(
540 btree_map
541 .iter()
542 .map(|(key, value)| (key.to_string(), vrl_value_to_jsonb_value(value)))
543 .collect(),
544 ),
545 VrlValue::Array(values) => jsonb::Value::Array(
546 values
547 .iter()
548 .map(|value| vrl_value_to_jsonb_value(value))
549 .collect(),
550 ),
551 VrlValue::Null => jsonb::Value::Null,
552 }
553}
554
555fn identity_pipeline_inner(
556 pipeline_maps: Vec<VrlValue>,
557 pipeline_ctx: &PipelineContext<'_>,
558) -> Result<(SchemaInfo, HashMap<ContextOpt, Vec<Row>>)> {
559 let skip_error = pipeline_ctx.pipeline_param.skip_error();
560 let mut schema_info = SchemaInfo::default();
561 let custom_ts = pipeline_ctx.pipeline_definition.get_custom_ts();
562
563 schema_info.schema.push(ColumnSchema {
565 column_name: custom_ts
566 .map(|ts| ts.get_column_name().to_string())
567 .unwrap_or_else(|| DEFAULT_GREPTIME_TIMESTAMP_COLUMN.to_string()),
568 datatype: custom_ts.map(|c| c.get_datatype()).unwrap_or_else(|| {
569 if pipeline_ctx.channel == Channel::Prometheus {
570 ColumnDataType::TimestampMillisecond
571 } else {
572 ColumnDataType::TimestampNanosecond
573 }
574 }) as i32,
575 semantic_type: SemanticType::Timestamp as i32,
576 datatype_extension: None,
577 options: None,
578 });
579
580 let mut opt_map = HashMap::new();
581 let len = pipeline_maps.len();
582
583 for mut pipeline_map in pipeline_maps {
584 let opt = unwrap_or_continue_if_err!(
585 ContextOpt::from_pipeline_map_to_opt(&mut pipeline_map),
586 skip_error
587 );
588 let row = unwrap_or_continue_if_err!(
589 values_to_row(&mut schema_info, pipeline_map, pipeline_ctx, None, true),
590 skip_error
591 );
592
593 opt_map
594 .entry(opt)
595 .or_insert_with(|| Vec::with_capacity(len))
596 .push(row);
597 }
598
599 let column_count = schema_info.schema.len();
600 for (_, row) in opt_map.iter_mut() {
601 for row in row.iter_mut() {
602 let diff = column_count - row.values.len();
603 for _ in 0..diff {
604 row.values.push(GreptimeValue { value_data: None });
605 }
606 }
607 }
608
609 Ok((schema_info, opt_map))
610}
611
/// Run the identity pipeline over a batch of values, producing `Rows`
/// grouped by per-request [`ContextOpt`].
///
/// When the `flatten_json_object` param is set, nested objects are flattened
/// into dotted keys first. If the destination `table` already exists, columns
/// matching its row-key (tag) columns keep the Tag semantic type.
pub fn identity_pipeline(
    array: Vec<VrlValue>,
    table: Option<Arc<table::Table>>,
    pipeline_ctx: &PipelineContext<'_>,
) -> Result<HashMap<ContextOpt, Rows>> {
    let skip_error = pipeline_ctx.pipeline_param.skip_error();
    // Optionally flatten nested JSON objects before schema inference.
    let input = if pipeline_ctx.pipeline_param.flatten_json_object() {
        let mut results = Vec::with_capacity(array.len());
        for item in array.into_iter() {
            let result = unwrap_or_continue_if_err!(
                flatten_object(item, DEFAULT_MAX_NESTED_LEVELS_FOR_JSON_FLATTENING),
                skip_error
            );
            results.push(result);
        }
        results
    } else {
        array
    };

    identity_pipeline_inner(input, pipeline_ctx).map(|(mut schema, opt_map)| {
        // Align inferred semantic types with the existing table's tag columns.
        if let Some(table) = table {
            let table_info = table.table_info();
            for tag_name in table_info.meta.row_key_column_names() {
                if let Some(index) = schema.index.get(tag_name) {
                    schema.schema[*index].semantic_type = SemanticType::Tag as i32;
                }
            }
        }

        opt_map
            .into_iter()
            .map(|(opt, rows)| {
                (
                    opt,
                    Rows {
                        schema: schema.schema.clone(),
                        rows,
                    },
                )
            })
            .collect::<HashMap<ContextOpt, Rows>>()
    })
}
664
665pub fn flatten_object(object: VrlValue, max_nested_levels: usize) -> Result<VrlValue> {
670 let mut flattened = BTreeMap::new();
671 let object = object.into_object().context(ValueMustBeMapSnafu)?;
672
673 if !object.is_empty() {
674 do_flatten_object(&mut flattened, None, object, 1, max_nested_levels)?;
676 }
677
678 Ok(VrlValue::Object(flattened))
679}
680
681fn do_flatten_object(
682 dest: &mut BTreeMap<KeyString, VrlValue>,
683 base: Option<&str>,
684 object: BTreeMap<KeyString, VrlValue>,
685 current_level: usize,
686 max_nested_levels: usize,
687) -> Result<()> {
688 if current_level > max_nested_levels {
690 return ReachedMaxNestedLevelsSnafu { max_nested_levels }.fail();
691 }
692
693 for (key, value) in object {
694 let new_key = base.map_or_else(
695 || key.clone(),
696 |base_key| format!("{base_key}.{key}").into(),
697 );
698
699 match value {
700 VrlValue::Object(object) => {
701 do_flatten_object(
702 dest,
703 Some(&new_key),
704 object,
705 current_level + 1,
706 max_nested_levels,
707 )?;
708 }
709 _ => {
711 dest.insert(new_key, value);
712 }
713 }
714 }
715
716 Ok(())
717}
718
719#[cfg(test)]
720mod tests {
721 use api::v1::SemanticType;
722
723 use super::*;
724 use crate::{identity_pipeline, PipelineDefinition};
725
    // Exercises schema inference of the identity pipeline: type conflicts
    // across rows must fail, compatible rows must merge into one schema, and
    // row-key columns can be promoted to tags afterwards.
    #[test]
    fn test_identify_pipeline() {
        let params = GreptimePipelineParams::default();
        let pipeline_ctx = PipelineContext::new(
            &PipelineDefinition::GreptimeIdentityPipeline(None),
            &params,
            Channel::Unknown,
        );
        // Conflict: "score" is FLOAT64 in the first row but STRING in the second.
        {
            let array = [
                serde_json::json!({
                    "woshinull": null,
                    "name": "Alice",
                    "age": 20,
                    "is_student": true,
                    "score": 99.5,
                    "hobbies": "reading",
                    "address": "Beijing",
                }),
                serde_json::json!({
                    "name": "Bob",
                    "age": 21,
                    "is_student": false,
                    "score": "88.5",
                    "hobbies": "swimming",
                    "address": "Shanghai",
                    "gaga": "gaga"
                }),
            ];
            let array = array.iter().map(|v| v.into()).collect();
            let rows = identity_pipeline(array, None, &pipeline_ctx);
            assert!(rows.is_err());
            assert_eq!(
                rows.err().unwrap().to_string(),
                "Column datatype mismatch. For column: score, expected datatype: FLOAT64, actual datatype: STRING".to_string(),
            );
        }
        // Conflict: "score" is FLOAT64 in the first row but INT64 in the second.
        {
            let array = [
                serde_json::json!({
                    "woshinull": null,
                    "name": "Alice",
                    "age": 20,
                    "is_student": true,
                    "score": 99.5,
                    "hobbies": "reading",
                    "address": "Beijing",
                }),
                serde_json::json!({
                    "name": "Bob",
                    "age": 21,
                    "is_student": false,
                    "score": 88,
                    "hobbies": "swimming",
                    "address": "Shanghai",
                    "gaga": "gaga"
                }),
            ];
            let array = array.iter().map(|v| v.into()).collect();
            let rows = identity_pipeline(array, None, &pipeline_ctx);
            assert!(rows.is_err());
            assert_eq!(
                rows.err().unwrap().to_string(),
                "Column datatype mismatch. For column: score, expected datatype: FLOAT64, actual datatype: INT64".to_string(),
            );
        }
        // Compatible rows: 7 value columns + timestamp = 8; nulls padded where
        // a row lacks a column seen elsewhere.
        {
            let array = [
                serde_json::json!({
                    "woshinull": null,
                    "name": "Alice",
                    "age": 20,
                    "is_student": true,
                    "score": 99.5,
                    "hobbies": "reading",
                    "address": "Beijing",
                }),
                serde_json::json!({
                    "name": "Bob",
                    "age": 21,
                    "is_student": false,
                    "score": 88.5,
                    "hobbies": "swimming",
                    "address": "Shanghai",
                    "gaga": "gaga"
                }),
            ];
            let array = array.iter().map(|v| v.into()).collect();
            let rows = identity_pipeline(array, None, &pipeline_ctx);
            assert!(rows.is_ok());
            let mut rows = rows.unwrap();
            assert!(rows.len() == 1);
            let rows = rows.remove(&ContextOpt::default()).unwrap();
            assert_eq!(rows.schema.len(), 8);
            assert_eq!(rows.rows.len(), 2);
            assert_eq!(8, rows.rows[0].values.len());
            assert_eq!(8, rows.rows[1].values.len());
        }
        // Same data via identity_pipeline_inner, then manually promote
        // "name" and "address" to Tag semantic type.
        {
            let array = [
                serde_json::json!({
                    "woshinull": null,
                    "name": "Alice",
                    "age": 20,
                    "is_student": true,
                    "score": 99.5,
                    "hobbies": "reading",
                    "address": "Beijing",
                }),
                serde_json::json!({
                    "name": "Bob",
                    "age": 21,
                    "is_student": false,
                    "score": 88.5,
                    "hobbies": "swimming",
                    "address": "Shanghai",
                    "gaga": "gaga"
                }),
            ];
            let tag_column_names = ["name".to_string(), "address".to_string()];

            let rows =
                identity_pipeline_inner(array.iter().map(|v| v.into()).collect(), &pipeline_ctx)
                    .map(|(mut schema, mut rows)| {
                        for name in tag_column_names {
                            if let Some(index) = schema.index.get(&name) {
                                schema.schema[*index].semantic_type = SemanticType::Tag as i32;
                            }
                        }

                        assert!(rows.len() == 1);
                        let rows = rows.remove(&ContextOpt::default()).unwrap();

                        Rows {
                            schema: schema.schema,
                            rows,
                        }
                    });

            assert!(rows.is_ok());
            let rows = rows.unwrap();
            assert_eq!(rows.schema.len(), 8);
            assert_eq!(rows.rows.len(), 2);
            assert_eq!(8, rows.rows[0].values.len());
            assert_eq!(8, rows.rows[1].values.len());
            assert_eq!(
                rows.schema
                    .iter()
                    .find(|x| x.column_name == "name")
                    .unwrap()
                    .semantic_type,
                SemanticType::Tag as i32
            );
            assert_eq!(
                rows.schema
                    .iter()
                    .find(|x| x.column_name == "address")
                    .unwrap()
                    .semantic_type,
                SemanticType::Tag as i32
            );
            assert_eq!(
                rows.schema
                    .iter()
                    .filter(|x| x.semantic_type == SemanticType::Tag as i32)
                    .count(),
                2
            );
        }
    }
896
    // Verifies flatten_object: nested maps collapse into dotted keys (arrays
    // are kept intact), and exceeding max_nested_levels yields an error.
    #[test]
    fn test_flatten() {
        let test_cases = vec![
            // Depth limit 10: flattens fully.
            (
                serde_json::json!(
                    {
                        "a": {
                            "b": {
                                "c": [1, 2, 3]
                            }
                        },
                        "d": [
                            "foo",
                            "bar"
                        ],
                        "e": {
                            "f": [7, 8, 9],
                            "g": {
                                "h": 123,
                                "i": "hello",
                                "j": {
                                    "k": true
                                }
                            }
                        }
                    }
                ),
                10,
                Some(serde_json::json!(
                    {
                        "a.b.c": [1,2,3],
                        "d": ["foo","bar"],
                        "e.f": [7,8,9],
                        "e.g.h": 123,
                        "e.g.i": "hello",
                        "e.g.j.k": true
                    }
                )),
            ),
            // Depth limit 3 but nesting goes 4 deep: expected failure (None).
            (
                serde_json::json!(
                    {
                        "a": {
                            "b": {
                                "c": {
                                    "d": [1, 2, 3]
                                }
                            }
                        },
                        "e": [
                            "foo",
                            "bar"
                        ]
                    }
                ),
                3,
                None,
            ),
        ];

        for (input, max_depth, expected) in test_cases {
            let input = input.into();
            let expected = expected.map(|e| e.into());

            // `.ok()` collapses errors to None so both cases compare uniformly.
            let flattened_object = flatten_object(input, max_depth).ok();
            assert_eq!(flattened_object, expected);
        }
    }
967
968 #[test]
969 fn test_greptime_pipeline_params() {
970 let params = Some("flatten_json_object=true");
971 let pipeline_params = GreptimePipelineParams::from_params(params);
972 assert!(pipeline_params.flatten_json_object());
973
974 let params = None;
975 let pipeline_params = GreptimePipelineParams::from_params(params);
976 assert!(!pipeline_params.flatten_json_object());
977 }
978}