1pub mod coerce;
16
17use std::borrow::Cow;
18use std::collections::{BTreeMap, HashSet};
19use std::sync::Arc;
20
21use ahash::{HashMap, HashMapExt};
22use api::helper::proto_value_type;
23use api::v1::column_data_type_extension::TypeExt;
24use api::v1::value::ValueData;
25use api::v1::{ColumnDataType, ColumnDataTypeExtension, JsonTypeExtension, SemanticType};
26use coerce::{coerce_columns, coerce_value};
27use common_query::prelude::{greptime_timestamp, greptime_value};
28use common_telemetry::warn;
29use greptime_proto::v1::{ColumnSchema, Row, Rows, Value as GreptimeValue};
30use itertools::Itertools;
31use jsonb::Number;
32use once_cell::sync::OnceCell;
33use serde_json as serde_json_crate;
34use session::context::Channel;
35use snafu::OptionExt;
36use vrl::prelude::{Bytes, VrlValueConvert};
37use vrl::value::{KeyString, Value as VrlValue};
38
39use crate::error::{
40 ArrayElementMustBeObjectSnafu, IdentifyPipelineColumnTypeMismatchSnafu, InvalidTimestampSnafu,
41 Result, TimeIndexMustBeNonNullSnafu, TransformColumnNameMustBeUniqueSnafu,
42 TransformMultipleTimestampIndexSnafu, TransformTimestampIndexCountSnafu, ValueMustBeMapSnafu,
43};
44use crate::etl::PipelineDocVersion;
45use crate::etl::ctx_req::ContextOpt;
46use crate::etl::field::{Field, Fields};
47use crate::etl::transform::index::Index;
48use crate::etl::transform::{Transform, Transforms};
49use crate::{PipelineContext, truthy, unwrap_or_continue_if_err};
50
/// Default maximum nesting depth applied when flattening JSON objects
/// for the identity pipeline (see `flatten_object`).
const DEFAULT_MAX_NESTED_LEVELS_FOR_JSON_FLATTENING: usize = 10;

/// A transformed row paired with the optional table-name suffix resolved
/// from the table-suffix template for that row.
pub type RowWithTableSuffix = (Row, Option<String>);
55
/// Applies the configured transforms to pipeline values and produces
/// GreptimeDB rows matching a pre-computed column schema.
#[derive(Debug, Clone)]
pub struct GreptimeTransformer {
    /// Ordered list of transforms to apply.
    transforms: Transforms,
    /// Column schemas derived from `transforms`, in output order.
    schema: Vec<ColumnSchema>,
}
63
/// Parameters controlling pipeline execution, parsed from a request
/// header string (`k1=v1&k2=v2`) or supplied as a ready-made map.
#[derive(Debug, Default)]
pub struct GreptimePipelineParams {
    // Raw key/value options as parsed from the header string.
    options: HashMap<String, String>,

    /// Lazily computed: skip (instead of fail on) per-element errors.
    pub skip_error: OnceCell<bool>,
    /// Lazily computed: maximum nesting depth for JSON flattening.
    pub max_nested_levels: OnceCell<usize>,
}
77
78impl GreptimePipelineParams {
79 pub fn from_params(params: Option<&str>) -> Self {
83 let options = Self::parse_header_str_to_map(params);
84
85 Self {
86 options,
87 skip_error: OnceCell::new(),
88 max_nested_levels: OnceCell::new(),
89 }
90 }
91
92 pub fn from_map(options: HashMap<String, String>) -> Self {
93 Self {
94 options,
95 skip_error: OnceCell::new(),
96 max_nested_levels: OnceCell::new(),
97 }
98 }
99
100 pub fn parse_header_str_to_map(params: Option<&str>) -> HashMap<String, String> {
101 if let Some(params) = params {
102 if params.is_empty() {
103 HashMap::new()
104 } else {
105 params
106 .split('&')
107 .filter_map(|s| s.split_once('='))
108 .map(|(k, v)| (k.to_string(), v.to_string()))
109 .collect::<HashMap<String, String>>()
110 }
111 } else {
112 HashMap::new()
113 }
114 }
115
116 pub fn skip_error(&self) -> bool {
118 *self
119 .skip_error
120 .get_or_init(|| self.options.get("skip_error").map(truthy).unwrap_or(false))
121 }
122
123 pub fn max_nested_levels(&self) -> usize {
126 *self.max_nested_levels.get_or_init(|| {
127 self.options
128 .get("max_nested_levels")
129 .and_then(|s| s.parse::<usize>().ok())
130 .filter(|v| *v > 0)
131 .unwrap_or(DEFAULT_MAX_NESTED_LEVELS_FOR_JSON_FLATTENING)
132 })
133 }
134}
135
136impl GreptimeTransformer {
137 fn add_greptime_timestamp_column(transforms: &mut Transforms) {
139 let type_ = ColumnDataType::TimestampNanosecond;
140 let default = None;
141
142 let transform = Transform {
143 fields: Fields::one(Field::new(greptime_timestamp().to_string(), None)),
144 type_,
145 default,
146 index: Some(Index::Time),
147 on_failure: Some(crate::etl::transform::OnFailure::Default),
148 tag: false,
149 };
150 transforms.push(transform);
151 }
152
153 fn init_schemas(transforms: &Transforms) -> Result<Vec<ColumnSchema>> {
155 let mut schema = vec![];
156 for transform in transforms.iter() {
157 schema.extend(coerce_columns(transform)?);
158 }
159 Ok(schema)
160 }
161}
162
impl GreptimeTransformer {
    /// Build a transformer from the parsed transforms.
    ///
    /// Validates that every output column name is unique and that at
    /// most one transform is marked as the time index. For v1 pipeline
    /// docs with no explicit time index, a default greptime timestamp
    /// column is appended automatically.
    pub fn new(mut transforms: Transforms, doc_version: &PipelineDocVersion) -> Result<Self> {
        // All output column names seen so far, for duplicate detection.
        let mut column_names_set = HashSet::new();
        let mut timestamp_columns = vec![];

        for transform in transforms.iter() {
            let target_fields_set = transform
                .fields
                .iter()
                .map(|f| f.target_or_input_field())
                .collect::<HashSet<_>>();

            // Reject transforms that write to a column another transform
            // already produces.
            let intersections: Vec<_> = column_names_set.intersection(&target_fields_set).collect();
            if !intersections.is_empty() {
                let duplicates = intersections.iter().join(",");
                return TransformColumnNameMustBeUniqueSnafu { duplicates }.fail();
            }

            column_names_set.extend(target_fields_set);

            // A time-index transform must map exactly one field.
            if let Some(idx) = transform.index
                && idx == Index::Time
            {
                match transform.fields.len() {
                    1 => timestamp_columns.push(transform.fields.first().unwrap().input_field()),
                    _ => {
                        return TransformMultipleTimestampIndexSnafu {
                            columns: transform.fields.iter().map(|x| x.input_field()).join(", "),
                        }
                        .fail();
                    }
                }
            }
        }

        let schema = match timestamp_columns.len() {
            // v1 docs tolerate a missing time index by injecting the
            // default greptime timestamp column.
            0 if doc_version == &PipelineDocVersion::V1 => {
                GreptimeTransformer::add_greptime_timestamp_column(&mut transforms);
                GreptimeTransformer::init_schemas(&transforms)?
            }
            1 => GreptimeTransformer::init_schemas(&transforms)?,
            count => {
                let columns = timestamp_columns.iter().join(", ");
                return TransformTimestampIndexCountSnafu { count, columns }.fail();
            }
        };
        Ok(GreptimeTransformer { transforms, schema })
    }

    /// Apply all transforms to `pipeline_map`, producing one value per
    /// schema column, in schema order.
    ///
    /// For non-v1 pipelines (`is_v1 == false`), consumed keys are removed
    /// from the input map. Errors if the input is not a map, a value
    /// fails coercion, or the time-index column resolves to null.
    pub fn transform_mut(
        &self,
        pipeline_map: &mut VrlValue,
        is_v1: bool,
    ) -> Result<Vec<GreptimeValue>> {
        let mut values = vec![GreptimeValue { value_data: None }; self.schema.len()];
        let mut output_index = 0;
        for transform in self.transforms.iter() {
            for field in transform.fields.iter() {
                let column_name = field.input_field();

                let pipeline_map = pipeline_map.as_object_mut().context(ValueMustBeMapSnafu)?;
                match pipeline_map.get(column_name) {
                    Some(v) => {
                        let value_data = coerce_value(v, transform)?;
                        values[output_index] = GreptimeValue { value_data };
                    }
                    None => {
                        // Missing input: fall back according to the
                        // transform's on_failure policy.
                        let value_data = match transform.on_failure {
                            Some(crate::etl::transform::OnFailure::Default) => {
                                match transform.get_default() {
                                    Some(default) => Some(default.clone()),
                                    None => transform.get_default_value_when_data_is_none(),
                                }
                            }
                            Some(crate::etl::transform::OnFailure::Ignore) => None,
                            None => None,
                        };
                        // The time index may never end up null.
                        if transform.is_timeindex() && value_data.is_none() {
                            return TimeIndexMustBeNonNullSnafu.fail();
                        }
                        values[output_index] = GreptimeValue { value_data };
                    }
                }
                output_index += 1;
                if !is_v1 {
                    pipeline_map.remove(column_name);
                }
            }
        }
        Ok(values)
    }

    /// The transforms backing this transformer.
    pub fn transforms(&self) -> &Transforms {
        &self.transforms
    }

    /// Column schemas in output order.
    pub fn schemas(&self) -> &Vec<greptime_proto::v1::ColumnSchema> {
        &self.schema
    }

    /// Mutable access to the transforms.
    pub fn transforms_mut(&mut self) -> &mut Transforms {
        &mut self.transforms
    }
}
274
/// Schema accumulated while transforming rows, with a name-to-position
/// index for O(1) column lookups.
#[derive(Debug, Default)]
pub struct SchemaInfo {
    /// Column schemas in output order.
    pub schema: Vec<ColumnSchema>,
    /// Maps a column name to its position in `schema`.
    pub index: HashMap<String, usize>,
}
285
286impl SchemaInfo {
287 pub fn with_capacity(capacity: usize) -> Self {
288 Self {
289 schema: Vec::with_capacity(capacity),
290 index: HashMap::with_capacity(capacity),
291 }
292 }
293
294 pub fn from_schema_list(schema_list: Vec<ColumnSchema>) -> Self {
295 let mut index = HashMap::new();
296 for (i, schema) in schema_list.iter().enumerate() {
297 index.insert(schema.column_name.clone(), i);
298 }
299 Self {
300 schema: schema_list,
301 index,
302 }
303 }
304}
305
306fn resolve_schema(
307 index: Option<usize>,
308 value_data: ValueData,
309 column_schema: ColumnSchema,
310 row: &mut Vec<GreptimeValue>,
311 schema_info: &mut SchemaInfo,
312) -> Result<()> {
313 if let Some(index) = index {
314 let api_value = GreptimeValue {
315 value_data: Some(value_data),
316 };
317 let value_column_data_type = proto_value_type(&api_value).unwrap();
319 let schema_column_data_type = schema_info.schema.get(index).unwrap().datatype();
321 if value_column_data_type != schema_column_data_type {
322 IdentifyPipelineColumnTypeMismatchSnafu {
323 column: column_schema.column_name,
324 expected: schema_column_data_type.as_str_name(),
325 actual: value_column_data_type.as_str_name(),
326 }
327 .fail()
328 } else {
329 row[index] = api_value;
330 Ok(())
331 }
332 } else {
333 let key = column_schema.column_name.clone();
334 schema_info.schema.push(column_schema);
335 schema_info.index.insert(key, schema_info.schema.len() - 1);
336 let api_value = GreptimeValue {
337 value_data: Some(value_data),
338 };
339 row.push(api_value);
340 Ok(())
341 }
342}
343
344fn calc_ts(p_ctx: &PipelineContext, values: &VrlValue) -> Result<Option<ValueData>> {
345 match p_ctx.channel {
346 Channel::Prometheus => {
347 let ts = values
348 .as_object()
349 .and_then(|m| m.get(greptime_timestamp()))
350 .and_then(|ts| ts.try_into_i64().ok())
351 .unwrap_or_default();
352 Ok(Some(ValueData::TimestampMillisecondValue(ts)))
353 }
354 _ => {
355 let custom_ts = p_ctx.pipeline_definition.get_custom_ts();
356 match custom_ts {
357 Some(ts) => {
358 let ts_field = values.as_object().and_then(|m| m.get(ts.get_column_name()));
359 Some(ts.get_timestamp_value(ts_field)).transpose()
360 }
361 None => Ok(Some(ValueData::TimestampNanosecondValue(
362 chrono::Utc::now().timestamp_nanos_opt().unwrap_or_default(),
363 ))),
364 }
365 }
366 }
367}
368
/// Transform `values` into rows grouped by per-element [`ContextOpt`].
///
/// Accepts either a single map (producing one row) or an array of maps
/// (one-to-many). Each element resolves its own context options and
/// optional table suffix before being transformed via `values_to_row`.
/// With `skip_error` enabled, failing elements are dropped instead of
/// aborting the whole batch.
pub(crate) fn values_to_rows(
    schema_info: &mut SchemaInfo,
    mut values: VrlValue,
    pipeline_ctx: &PipelineContext<'_>,
    row: Option<Vec<GreptimeValue>>,
    need_calc_ts: bool,
    tablesuffix_template: Option<&crate::tablesuffix::TableSuffixTemplate>,
) -> Result<std::collections::HashMap<ContextOpt, Vec<RowWithTableSuffix>>> {
    let skip_error = pipeline_ctx.pipeline_param.skip_error();
    // Non-array input: treat the value as a single row.
    let VrlValue::Array(arr) = values else {
        let mut result = std::collections::HashMap::new();

        let mut opt = match ContextOpt::from_pipeline_map_to_opt(&mut values) {
            Ok(r) => r,
            // With skip_error, a failing single row yields an empty result.
            Err(e) => return if skip_error { Ok(result) } else { Err(e) },
        };

        let table_suffix = opt.resolve_table_suffix(tablesuffix_template, &values);
        let row = match values_to_row(schema_info, values, pipeline_ctx, row, need_calc_ts) {
            Ok(r) => r,
            Err(e) => return if skip_error { Ok(result) } else { Err(e) },
        };
        result.insert(opt, vec![(row, table_suffix)]);
        return Ok(result);
    };

    let mut rows_by_context: std::collections::HashMap<ContextOpt, Vec<RowWithTableSuffix>> =
        std::collections::HashMap::new();
    for (index, mut value) in arr.into_iter().enumerate() {
        // Every array element must itself be a map.
        if !value.is_object() {
            unwrap_or_continue_if_err!(
                ArrayElementMustBeObjectSnafu {
                    index,
                    actual_type: value.kind_str().to_string(),
                }
                .fail(),
                skip_error
            );
        }

        // Context options are extracted (and removed) from the element
        // before transformation.
        let mut opt = unwrap_or_continue_if_err!(
            ContextOpt::from_pipeline_map_to_opt(&mut value),
            skip_error
        );
        let table_suffix = opt.resolve_table_suffix(tablesuffix_template, &value);
        let transformed_row = unwrap_or_continue_if_err!(
            values_to_row(schema_info, value, pipeline_ctx, row.clone(), need_calc_ts),
            skip_error
        );
        rows_by_context
            .entry(opt)
            .or_default()
            .push((transformed_row, table_suffix));
    }
    Ok(rows_by_context)
}
435
/// Transform a single map of values into one [`Row`] matching
/// `schema_info`, extending the schema with any new columns encountered.
///
/// When `need_calc_ts` is true, the timestamp is computed first (so it
/// occupies the first column) and the corresponding timestamp key in
/// `values` is skipped during column resolution.
pub(crate) fn values_to_row(
    schema_info: &mut SchemaInfo,
    values: VrlValue,
    pipeline_ctx: &PipelineContext<'_>,
    row: Option<Vec<GreptimeValue>>,
    need_calc_ts: bool,
) -> Result<Row> {
    let mut row: Vec<GreptimeValue> =
        row.unwrap_or_else(|| Vec::with_capacity(schema_info.schema.len()));
    let custom_ts = pipeline_ctx.pipeline_definition.get_custom_ts();

    if need_calc_ts {
        let ts = calc_ts(pipeline_ctx, &values)?;
        row.push(GreptimeValue { value_data: ts });
    }

    // Null-fill up to the current schema width; resolve_value fills known
    // columns by index and appends brand-new ones.
    row.resize(schema_info.schema.len(), GreptimeValue { value_data: None });

    let ts_column_name = custom_ts
        .as_ref()
        .map_or(greptime_timestamp(), |ts| ts.get_column_name());

    let values = values.into_object().context(ValueMustBeMapSnafu)?;

    for (column_name, value) in values {
        // The timestamp column was already handled above.
        if need_calc_ts && column_name.as_str() == ts_column_name {
            continue;
        }

        resolve_value(
            value,
            column_name.into(),
            &mut row,
            schema_info,
            pipeline_ctx,
        )?;
    }
    Ok(Row { values: row })
}
483
484fn decide_semantic(p_ctx: &PipelineContext, column_name: &str) -> i32 {
485 if p_ctx.channel == Channel::Prometheus && column_name != greptime_value() {
486 SemanticType::Tag as i32
487 } else {
488 SemanticType::Field as i32
489 }
490}
491
/// Resolve one key/value pair from the input map into the row, mapping
/// the VRL value to the corresponding Greptime column type.
///
/// Scalars map to their natural column types; nulls are skipped
/// entirely; arrays and objects are stored as binary JSON (JSONB).
fn resolve_value(
    value: VrlValue,
    column_name: String,
    row: &mut Vec<GreptimeValue>,
    schema_info: &mut SchemaInfo,
    p_ctx: &PipelineContext,
) -> Result<()> {
    // Existing column position, if this name was seen before.
    let index = schema_info.index.get(&column_name).copied();
    // Shared helper for all scalar types; captures row/schema_info
    // mutably, so it must stay a closure.
    let mut resolve_simple_type =
        |value_data: ValueData, column_name: String, data_type: ColumnDataType| {
            let semantic_type = decide_semantic(p_ctx, &column_name);
            resolve_schema(
                index,
                value_data,
                ColumnSchema {
                    column_name,
                    datatype: data_type as i32,
                    semantic_type,
                    datatype_extension: None,
                    options: None,
                },
                row,
                schema_info,
            )
        };

    match value {
        // Nulls contribute no column at all.
        VrlValue::Null => {}

        VrlValue::Integer(v) => {
            resolve_simple_type(ValueData::I64Value(v), column_name, ColumnDataType::Int64)?;
        }

        VrlValue::Float(v) => {
            resolve_simple_type(
                ValueData::F64Value(v.into()),
                column_name,
                ColumnDataType::Float64,
            )?;
        }

        VrlValue::Boolean(v) => {
            resolve_simple_type(
                ValueData::BoolValue(v),
                column_name,
                ColumnDataType::Boolean,
            )?;
        }

        VrlValue::Bytes(v) => {
            resolve_simple_type(
                ValueData::StringValue(String::from_utf8_lossy_owned(v.to_vec())),
                column_name,
                ColumnDataType::String,
            )?;
        }

        // Regexes shouldn't normally reach storage; persist as string
        // but log it.
        VrlValue::Regex(v) => {
            warn!(
                "Persisting regex value in the table, this should not happen, column_name: {}",
                column_name
            );
            resolve_simple_type(
                ValueData::StringValue(v.to_string()),
                column_name,
                ColumnDataType::String,
            )?;
        }

        VrlValue::Timestamp(ts) => {
            let ns = ts.timestamp_nanos_opt().context(InvalidTimestampSnafu {
                input: ts.to_rfc3339(),
            })?;
            resolve_simple_type(
                ValueData::TimestampNanosecondValue(ns),
                column_name,
                ColumnDataType::TimestampNanosecond,
            )?;
        }

        // Compound values become JSONB-encoded binary columns.
        VrlValue::Array(_) | VrlValue::Object(_) => {
            let data = vrl_value_to_jsonb_value(&value);
            resolve_schema(
                index,
                ValueData::BinaryValue(data.to_vec()),
                ColumnSchema {
                    column_name,
                    datatype: ColumnDataType::Binary as i32,
                    semantic_type: SemanticType::Field as i32,
                    datatype_extension: Some(ColumnDataTypeExtension {
                        type_ext: Some(TypeExt::JsonType(JsonTypeExtension::JsonBinary.into())),
                    }),
                    options: None,
                },
                row,
                schema_info,
            )?;
        }
    }
    Ok(())
}
595
596fn vrl_value_to_jsonb_value<'a>(value: &'a VrlValue) -> jsonb::Value<'a> {
597 match value {
598 VrlValue::Bytes(bytes) => jsonb::Value::String(String::from_utf8_lossy(bytes)),
599 VrlValue::Regex(value_regex) => jsonb::Value::String(Cow::Borrowed(value_regex.as_str())),
600 VrlValue::Integer(i) => jsonb::Value::Number(Number::Int64(*i)),
601 VrlValue::Float(not_nan) => jsonb::Value::Number(Number::Float64(not_nan.into_inner())),
602 VrlValue::Boolean(b) => jsonb::Value::Bool(*b),
603 VrlValue::Timestamp(date_time) => jsonb::Value::String(Cow::Owned(date_time.to_rfc3339())),
604 VrlValue::Object(btree_map) => jsonb::Value::Object(
605 btree_map
606 .iter()
607 .map(|(key, value)| (key.to_string(), vrl_value_to_jsonb_value(value)))
608 .collect(),
609 ),
610 VrlValue::Array(values) => jsonb::Value::Array(
611 values
612 .iter()
613 .map(|value| vrl_value_to_jsonb_value(value))
614 .collect(),
615 ),
616 VrlValue::Null => jsonb::Value::Null,
617 }
618}
619
620fn identity_pipeline_inner(
621 pipeline_maps: Vec<VrlValue>,
622 pipeline_ctx: &PipelineContext<'_>,
623) -> Result<(SchemaInfo, HashMap<ContextOpt, Vec<Row>>)> {
624 let skip_error = pipeline_ctx.pipeline_param.skip_error();
625 let mut schema_info = SchemaInfo::default();
626 let custom_ts = pipeline_ctx.pipeline_definition.get_custom_ts();
627
628 schema_info.schema.push(ColumnSchema {
630 column_name: custom_ts
631 .map(|ts| ts.get_column_name().to_string())
632 .unwrap_or_else(|| greptime_timestamp().to_string()),
633 datatype: custom_ts.map(|c| c.get_datatype()).unwrap_or_else(|| {
634 if pipeline_ctx.channel == Channel::Prometheus {
635 ColumnDataType::TimestampMillisecond
636 } else {
637 ColumnDataType::TimestampNanosecond
638 }
639 }) as i32,
640 semantic_type: SemanticType::Timestamp as i32,
641 datatype_extension: None,
642 options: None,
643 });
644
645 let mut opt_map = HashMap::new();
646 let len = pipeline_maps.len();
647
648 for mut pipeline_map in pipeline_maps {
649 let opt = unwrap_or_continue_if_err!(
650 ContextOpt::from_pipeline_map_to_opt(&mut pipeline_map),
651 skip_error
652 );
653 let row = unwrap_or_continue_if_err!(
654 values_to_row(&mut schema_info, pipeline_map, pipeline_ctx, None, true),
655 skip_error
656 );
657
658 opt_map
659 .entry(opt)
660 .or_insert_with(|| Vec::with_capacity(len))
661 .push(row);
662 }
663
664 let column_count = schema_info.schema.len();
665 for (_, row) in opt_map.iter_mut() {
666 for row in row.iter_mut() {
667 let diff = column_count - row.values.len();
668 for _ in 0..diff {
669 row.values.push(GreptimeValue { value_data: None });
670 }
671 }
672 }
673
674 Ok((schema_info, opt_map))
675}
676
/// The identity pipeline: ingest JSON-like values mostly as-is.
///
/// Flattens nested objects (up to the configured depth), infers a schema
/// from the values, and groups the resulting rows by their per-element
/// context options. If `table` is given, its existing row-key columns
/// are marked as tags in the inferred schema.
pub fn identity_pipeline(
    array: Vec<VrlValue>,
    table: Option<Arc<table::Table>>,
    pipeline_ctx: &PipelineContext<'_>,
) -> Result<HashMap<ContextOpt, Rows>> {
    let skip_error = pipeline_ctx.pipeline_param.skip_error();
    let max_nested_levels = pipeline_ctx.pipeline_param.max_nested_levels();
    // Flatten each input first; with skip_error, bad elements are dropped.
    let mut input = Vec::with_capacity(array.len());
    for item in array.into_iter() {
        let result =
            unwrap_or_continue_if_err!(flatten_object(item, max_nested_levels), skip_error);
        input.push(result);
    }

    identity_pipeline_inner(input, pipeline_ctx).map(|(mut schema, opt_map)| {
        if let Some(table) = table {
            let table_info = table.table_info();
            // Keep the table's existing tag (row-key) columns as tags.
            for tag_name in table_info.meta.row_key_column_names() {
                if let Some(index) = schema.index.get(tag_name) {
                    schema.schema[*index].semantic_type = SemanticType::Tag as i32;
                }
            }
        }

        // Every context shares the same (final) schema.
        opt_map
            .into_iter()
            .map(|(opt, rows)| {
                (
                    opt,
                    Rows {
                        schema: schema.schema.clone(),
                        rows,
                    },
                )
            })
            .collect::<HashMap<ContextOpt, Rows>>()
    })
}
724
725pub fn flatten_object(object: VrlValue, max_nested_levels: usize) -> Result<VrlValue> {
731 let mut flattened = BTreeMap::new();
732 let object = object.into_object().context(ValueMustBeMapSnafu)?;
733
734 if !object.is_empty() {
735 do_flatten_object(&mut flattened, None, object, 1, max_nested_levels);
737 }
738
739 Ok(VrlValue::Object(flattened))
740}
741
742fn vrl_value_to_serde_json(value: &VrlValue) -> serde_json_crate::Value {
743 match value {
744 VrlValue::Null => serde_json_crate::Value::Null,
745 VrlValue::Boolean(b) => serde_json_crate::Value::Bool(*b),
746 VrlValue::Integer(i) => serde_json_crate::Value::Number((*i).into()),
747 VrlValue::Float(not_nan) => serde_json_crate::Number::from_f64(not_nan.into_inner())
748 .map(serde_json_crate::Value::Number)
749 .unwrap_or(serde_json_crate::Value::Null),
750 VrlValue::Bytes(bytes) => {
751 serde_json_crate::Value::String(String::from_utf8_lossy(bytes).into_owned())
752 }
753 VrlValue::Regex(re) => serde_json_crate::Value::String(re.as_str().to_string()),
754 VrlValue::Timestamp(ts) => serde_json_crate::Value::String(ts.to_rfc3339()),
755 VrlValue::Array(arr) => {
756 serde_json_crate::Value::Array(arr.iter().map(vrl_value_to_serde_json).collect())
757 }
758 VrlValue::Object(map) => serde_json_crate::Value::Object(
759 map.iter()
760 .map(|(k, v)| (k.to_string(), vrl_value_to_serde_json(v)))
761 .collect(),
762 ),
763 }
764}
765
766fn do_flatten_object(
767 dest: &mut BTreeMap<KeyString, VrlValue>,
768 base: Option<&str>,
769 object: BTreeMap<KeyString, VrlValue>,
770 current_level: usize,
771 max_nested_levels: usize,
772) {
773 for (key, value) in object {
774 let new_key = base.map_or_else(
775 || key.clone(),
776 |base_key| format!("{base_key}.{key}").into(),
777 );
778
779 match value {
780 VrlValue::Object(object) => {
781 if current_level >= max_nested_levels {
782 let json_string = serde_json_crate::to_string(&vrl_value_to_serde_json(
784 &VrlValue::Object(object),
785 ))
786 .unwrap_or_else(|_| String::from("{}"));
787 dest.insert(new_key, VrlValue::Bytes(Bytes::from(json_string)));
788 } else {
789 do_flatten_object(
790 dest,
791 Some(&new_key),
792 object,
793 current_level + 1,
794 max_nested_levels,
795 );
796 }
797 }
798 VrlValue::Array(_) => {
800 let json_string = serde_json_crate::to_string(&vrl_value_to_serde_json(&value))
801 .unwrap_or_else(|_| String::from("[]"));
802 dest.insert(new_key, VrlValue::Bytes(Bytes::from(json_string)));
803 }
804 _ => {
806 dest.insert(new_key, value);
807 }
808 }
809 }
810}
811
812#[cfg(test)]
813mod tests {
814 use api::v1::SemanticType;
815
816 use super::*;
817 use crate::{PipelineDefinition, identity_pipeline};
818
    /// Identity pipeline: schema inference, type-conflict detection, and
    /// tag assignment.
    #[test]
    fn test_identify_pipeline() {
        let params = GreptimePipelineParams::default();
        let pipeline_ctx = PipelineContext::new(
            &PipelineDefinition::GreptimeIdentityPipeline(None),
            &params,
            Channel::Unknown,
        );
        // Case 1: `score` is f64 in one row and string in another — must
        // fail with a datatype-mismatch error.
        {
            let array = [
                serde_json::json!({
                    "woshinull": null,
                    "name": "Alice",
                    "age": 20,
                    "is_student": true,
                    "score": 99.5,
                    "hobbies": "reading",
                    "address": "Beijing",
                }),
                serde_json::json!({
                    "name": "Bob",
                    "age": 21,
                    "is_student": false,
                    "score": "88.5",
                    "hobbies": "swimming",
                    "address": "Shanghai",
                    "gaga": "gaga"
                }),
            ];
            let array = array.iter().map(|v| v.into()).collect();
            let rows = identity_pipeline(array, None, &pipeline_ctx);
            assert!(rows.is_err());
            assert_eq!(
                rows.err().unwrap().to_string(),
                "Column datatype mismatch. For column: score, expected datatype: FLOAT64, actual datatype: STRING".to_string(),
            );
        }
        // Case 2: f64 vs i64 for `score` is also a mismatch (no implicit
        // numeric widening).
        {
            let array = [
                serde_json::json!({
                    "woshinull": null,
                    "name": "Alice",
                    "age": 20,
                    "is_student": true,
                    "score": 99.5,
                    "hobbies": "reading",
                    "address": "Beijing",
                }),
                serde_json::json!({
                    "name": "Bob",
                    "age": 21,
                    "is_student": false,
                    "score": 88,
                    "hobbies": "swimming",
                    "address": "Shanghai",
                    "gaga": "gaga"
                }),
            ];
            let array = array.iter().map(|v| v.into()).collect();
            let rows = identity_pipeline(array, None, &pipeline_ctx);
            assert!(rows.is_err());
            assert_eq!(
                rows.err().unwrap().to_string(),
                "Column datatype mismatch. For column: score, expected datatype: FLOAT64, actual datatype: INT64".to_string(),
            );
        }
        // Case 3: consistent types succeed; 8 columns = 7 distinct value
        // keys (nulls dropped, union of both rows) + timestamp.
        {
            let array = [
                serde_json::json!({
                    "woshinull": null,
                    "name": "Alice",
                    "age": 20,
                    "is_student": true,
                    "score": 99.5,
                    "hobbies": "reading",
                    "address": "Beijing",
                }),
                serde_json::json!({
                    "name": "Bob",
                    "age": 21,
                    "is_student": false,
                    "score": 88.5,
                    "hobbies": "swimming",
                    "address": "Shanghai",
                    "gaga": "gaga"
                }),
            ];
            let array = array.iter().map(|v| v.into()).collect();
            let rows = identity_pipeline(array, None, &pipeline_ctx);
            assert!(rows.is_ok());
            let mut rows = rows.unwrap();
            assert!(rows.len() == 1);
            let rows = rows.remove(&ContextOpt::default()).unwrap();
            assert_eq!(rows.schema.len(), 8);
            assert_eq!(rows.rows.len(), 2);
            assert_eq!(8, rows.rows[0].values.len());
            assert_eq!(8, rows.rows[1].values.len());
        }
        // Case 4: manually promote selected columns to tags on the
        // inferred schema (mirrors what `identity_pipeline` does with a
        // table's row-key columns).
        {
            let array = [
                serde_json::json!({
                    "woshinull": null,
                    "name": "Alice",
                    "age": 20,
                    "is_student": true,
                    "score": 99.5,
                    "hobbies": "reading",
                    "address": "Beijing",
                }),
                serde_json::json!({
                    "name": "Bob",
                    "age": 21,
                    "is_student": false,
                    "score": 88.5,
                    "hobbies": "swimming",
                    "address": "Shanghai",
                    "gaga": "gaga"
                }),
            ];
            let tag_column_names = ["name".to_string(), "address".to_string()];

            let rows =
                identity_pipeline_inner(array.iter().map(|v| v.into()).collect(), &pipeline_ctx)
                    .map(|(mut schema, mut rows)| {
                        for name in tag_column_names {
                            if let Some(index) = schema.index.get(&name) {
                                schema.schema[*index].semantic_type = SemanticType::Tag as i32;
                            }
                        }

                        assert!(rows.len() == 1);
                        let rows = rows.remove(&ContextOpt::default()).unwrap();

                        Rows {
                            schema: schema.schema,
                            rows,
                        }
                    });

            assert!(rows.is_ok());
            let rows = rows.unwrap();
            assert_eq!(rows.schema.len(), 8);
            assert_eq!(rows.rows.len(), 2);
            assert_eq!(8, rows.rows[0].values.len());
            assert_eq!(8, rows.rows[1].values.len());
            assert_eq!(
                rows.schema
                    .iter()
                    .find(|x| x.column_name == "name")
                    .unwrap()
                    .semantic_type,
                SemanticType::Tag as i32
            );
            assert_eq!(
                rows.schema
                    .iter()
                    .find(|x| x.column_name == "address")
                    .unwrap()
                    .semantic_type,
                SemanticType::Tag as i32
            );
            assert_eq!(
                rows.schema
                    .iter()
                    .filter(|x| x.semantic_type == SemanticType::Tag as i32)
                    .count(),
                2
            );
        }
    }
989
    /// JSON flattening: dot-joined keys, arrays stringified, and the
    /// depth cap turning deep subtrees into JSON strings.
    #[test]
    fn test_flatten() {
        let test_cases = vec![
            // Within the depth limit: objects flatten fully; arrays are
            // always serialized to JSON strings.
            (
                serde_json::json!(
                    {
                        "a": {
                            "b": {
                                "c": [1, 2, 3]
                            }
                        },
                        "d": [
                            "foo",
                            "bar"
                        ],
                        "e": {
                            "f": [7, 8, 9],
                            "g": {
                                "h": 123,
                                "i": "hello",
                                "j": {
                                    "k": true
                                }
                            }
                        }
                    }
                ),
                10,
                Some(serde_json::json!(
                    {
                        "a.b.c": "[1,2,3]",
                        "d": "[\"foo\",\"bar\"]",
                        "e.f": "[7,8,9]",
                        "e.g.h": 123,
                        "e.g.i": "hello",
                        "e.g.j.k": true
                    }
                )),
            ),
            // Depth cap of 3: the subtree at level 3 is serialized as a
            // JSON string instead of being flattened further.
            (
                serde_json::json!(
                    {
                        "a": {
                            "b": {
                                "c": {
                                    "d": [1, 2, 3]
                                }
                            }
                        },
                        "e": [
                            "foo",
                            "bar"
                        ]
                    }
                ),
                3,
                Some(serde_json::json!(
                    {
                        "a.b.c": "{\"d\":[1,2,3]}",
                        "e": "[\"foo\",\"bar\"]"
                    }
                )),
            ),
        ];

        for (input, max_depth, expected) in test_cases {
            let input = input.into();
            let expected = expected.map(|e| e.into());

            let flattened_object = flatten_object(input, max_depth).ok();
            assert_eq!(flattened_object, expected);
        }
    }
1065
1066 use ahash::HashMap as AHashMap;
    /// `values_to_rows` error handling: skip_error drops bad elements;
    /// without it the first bad element aborts the batch.
    #[test]
    fn test_values_to_rows_skip_error_handling() {
        let table_suffix_template: Option<crate::tablesuffix::TableSuffixTemplate> = None;

        // With skip_error=true, non-object array elements are dropped
        // and the remaining objects still produce rows.
        {
            let schema_info = &mut SchemaInfo::default();
            let input_array = vec![
                serde_json::json!({"name": "Alice", "age": 25}).into(),
                VrlValue::Bytes("invalid_string".into()),
                serde_json::json!({"name": "Bob", "age": 30}).into(),
                VrlValue::Integer(42),
                serde_json::json!({"name": "Charlie", "age": 35}).into(),
            ];

            let params = GreptimePipelineParams::from_map(AHashMap::from_iter([(
                "skip_error".to_string(),
                "true".to_string(),
            )]));

            let pipeline_ctx = PipelineContext::new(
                &PipelineDefinition::GreptimeIdentityPipeline(None),
                &params,
                Channel::Unknown,
            );

            let result = values_to_rows(
                schema_info,
                VrlValue::Array(input_array),
                &pipeline_ctx,
                None,
                true,
                table_suffix_template.as_ref(),
            );

            assert!(result.is_ok());
            let rows_by_context = result.unwrap();
            // Only the 3 object elements survive.
            let total_rows: usize = rows_by_context.values().map(|v| v.len()).sum();
            assert_eq!(total_rows, 3);
        }

        // Without skip_error (default), the first invalid element fails
        // the whole call.
        {
            let schema_info = &mut SchemaInfo::default();
            let input_array = vec![
                serde_json::json!({"name": "Alice", "age": 25}).into(),
                VrlValue::Bytes("invalid_string".into()),
            ];

            let params = GreptimePipelineParams::default();
            let pipeline_ctx = PipelineContext::new(
                &PipelineDefinition::GreptimeIdentityPipeline(None),
                &params,
                Channel::Unknown,
            );

            let result = values_to_rows(
                schema_info,
                VrlValue::Array(input_array),
                &pipeline_ctx,
                None,
                true,
                table_suffix_template.as_ref(),
            );

            assert!(result.is_err());
            let error_msg = result.unwrap_err().to_string();
            assert!(error_msg.contains("Array element at index 1 must be an object for one-to-many transformation, got string"));
        }
    }
1146
    /// `values_to_rows` groups rows by per-element context options:
    /// elements with different `greptime_ttl` land in different groups.
    #[test]
    fn test_values_to_rows_per_element_context_opt() {
        let table_suffix_template: Option<crate::tablesuffix::TableSuffixTemplate> = None;
        let schema_info = &mut SchemaInfo::default();

        // Two elements share a 1h TTL; one uses 24h.
        let input_array = vec![
            serde_json::json!({"name": "Alice", "greptime_ttl": "1h"}).into(),
            serde_json::json!({"name": "Bob", "greptime_ttl": "1h"}).into(),
            serde_json::json!({"name": "Charlie", "greptime_ttl": "24h"}).into(),
        ];

        let params = GreptimePipelineParams::default();
        let pipeline_ctx = PipelineContext::new(
            &PipelineDefinition::GreptimeIdentityPipeline(None),
            &params,
            Channel::Unknown,
        );

        let result = values_to_rows(
            schema_info,
            VrlValue::Array(input_array),
            &pipeline_ctx,
            None,
            true,
            table_suffix_template.as_ref(),
        );

        assert!(result.is_ok());
        let rows_by_context = result.unwrap();

        // Two distinct contexts (1h vs 24h TTL).
        assert_eq!(rows_by_context.len(), 2);

        // All three input rows are preserved across the groups.
        let total_rows: usize = rows_by_context.values().map(|v| v.len()).sum();
        assert_eq!(total_rows, 3);

        let mut ttl_1h_count = 0;
        let mut ttl_24h_count = 0;
        for rows in rows_by_context.values() {
            if rows.len() == 2 {
                ttl_1h_count = rows.len();
            } else if rows.len() == 1 {
                ttl_24h_count = rows.len();
            }
        }
        assert_eq!(ttl_1h_count, 2);
        assert_eq!(ttl_24h_count, 1);
    }
1200}