1pub mod coerce;
16
17use std::borrow::Cow;
18use std::collections::{BTreeMap, HashSet};
19use std::sync::Arc;
20
21use ahash::{HashMap, HashMapExt};
22use api::helper::{ColumnDataTypeWrapper, encode_json_value};
23use api::v1::column_def::{collect_column_options, options_from_column_schema};
24use api::v1::value::ValueData;
25use api::v1::{ColumnDataType, SemanticType};
26use arrow_schema::extension::ExtensionType;
27use coerce::{coerce_columns, coerce_value};
28use common_query::prelude::{greptime_timestamp, greptime_value};
29use common_telemetry::warn;
30use datatypes::data_type::ConcreteDataType;
31use datatypes::extension::json::JsonExtensionType;
32use datatypes::value::Value;
33use greptime_proto::v1::{ColumnSchema, Row, Rows, Value as GreptimeValue};
34use itertools::Itertools;
35use jsonb::Number;
36use once_cell::sync::OnceCell;
37use serde_json as serde_json_crate;
38use session::context::Channel;
39use snafu::OptionExt;
40use table::Table;
41use vrl::prelude::{Bytes, VrlValueConvert};
42use vrl::value::value::StdError;
43use vrl::value::{KeyString, Value as VrlValue};
44
45use crate::error::{
46 ArrayElementMustBeObjectSnafu, CoerceIncompatibleTypesSnafu,
47 IdentifyPipelineColumnTypeMismatchSnafu, InvalidTimestampSnafu, Result,
48 TimeIndexMustBeNonNullSnafu, TransformColumnNameMustBeUniqueSnafu,
49 TransformMultipleTimestampIndexSnafu, TransformTimestampIndexCountSnafu, ValueMustBeMapSnafu,
50};
51use crate::etl::PipelineDocVersion;
52use crate::etl::ctx_req::ContextOpt;
53use crate::etl::field::{Field, Fields};
54use crate::etl::transform::index::Index;
55use crate::etl::transform::{Transform, Transforms};
56use crate::{PipelineContext, truthy, unwrap_or_continue_if_err};
57
/// Fallback recursion depth for JSON-object flattening, used when the
/// `max_nested_levels` pipeline parameter is absent or invalid.
const DEFAULT_MAX_NESTED_LEVELS_FOR_JSON_FLATTENING: usize = 10;

/// A transformed row paired with the table-name suffix resolved for it (if any).
pub type RowWithTableSuffix = (Row, Option<String>);
62
/// Applies the configured transforms to incoming pipeline values and carries
/// the GreptimeDB column schemas derived from those transforms.
#[derive(Debug, Clone)]
pub struct GreptimeTransformer {
    // Ordered transform definitions applied to each incoming value.
    transforms: Transforms,
    // Column schemas produced from the transforms, aligned with output order.
    schema: Vec<ColumnSchema>,
}
70
/// Options controlling pipeline execution, parsed from a
/// `k1=v1&k2=v2` header string or supplied directly as a map.
#[derive(Debug, Default)]
pub struct GreptimePipelineParams {
    // Raw key/value options as received from the caller.
    options: HashMap<String, String>,

    // Lazily-parsed cache of the `skip_error` option.
    pub skip_error: OnceCell<bool>,
    // Lazily-parsed cache of the `max_nested_levels` option.
    pub max_nested_levels: OnceCell<usize>,
}
84
85impl GreptimePipelineParams {
86 pub fn from_params(params: Option<&str>) -> Self {
90 let options = Self::parse_header_str_to_map(params);
91
92 Self {
93 options,
94 skip_error: OnceCell::new(),
95 max_nested_levels: OnceCell::new(),
96 }
97 }
98
99 pub fn from_map(options: HashMap<String, String>) -> Self {
100 Self {
101 options,
102 skip_error: OnceCell::new(),
103 max_nested_levels: OnceCell::new(),
104 }
105 }
106
107 pub fn parse_header_str_to_map(params: Option<&str>) -> HashMap<String, String> {
108 if let Some(params) = params {
109 if params.is_empty() {
110 HashMap::new()
111 } else {
112 params
113 .split('&')
114 .filter_map(|s| s.split_once('='))
115 .map(|(k, v)| (k.to_string(), v.to_string()))
116 .collect::<HashMap<String, String>>()
117 }
118 } else {
119 HashMap::new()
120 }
121 }
122
123 pub fn skip_error(&self) -> bool {
125 *self
126 .skip_error
127 .get_or_init(|| self.options.get("skip_error").map(truthy).unwrap_or(false))
128 }
129
130 pub fn max_nested_levels(&self) -> usize {
133 *self.max_nested_levels.get_or_init(|| {
134 self.options
135 .get("max_nested_levels")
136 .and_then(|s| s.parse::<usize>().ok())
137 .filter(|v| *v > 0)
138 .unwrap_or(DEFAULT_MAX_NESTED_LEVELS_FOR_JSON_FLATTENING)
139 })
140 }
141}
142
143impl GreptimeTransformer {
144 fn add_greptime_timestamp_column(transforms: &mut Transforms) {
146 let type_ = ColumnDataType::TimestampNanosecond;
147 let default = None;
148
149 let transform = Transform {
150 fields: Fields::one(Field::new(greptime_timestamp().to_string(), None)),
151 type_,
152 default,
153 index: Some(Index::Time),
154 on_failure: Some(crate::etl::transform::OnFailure::Default),
155 tag: false,
156 };
157 transforms.push(transform);
158 }
159
160 fn init_schemas(transforms: &Transforms) -> Result<Vec<ColumnSchema>> {
162 let mut schema = vec![];
163 for transform in transforms.iter() {
164 schema.extend(coerce_columns(transform)?);
165 }
166 Ok(schema)
167 }
168}
169
impl GreptimeTransformer {
    /// Build a transformer from parsed transform definitions.
    ///
    /// Validates that output column names are unique across transforms and
    /// that at most one column is marked as the time index. For v1 documents
    /// with no time index, a default greptime timestamp column is appended.
    pub fn new(mut transforms: Transforms, doc_version: &PipelineDocVersion) -> Result<Self> {
        // All output column names seen so far, for duplicate detection.
        let mut column_names_set = HashSet::new();
        // Names of columns flagged with `index: time`.
        let mut timestamp_columns = vec![];

        for transform in transforms.iter() {
            let target_fields_set = transform
                .fields
                .iter()
                .map(|f| f.target_or_input_field())
                .collect::<HashSet<_>>();

            // Reject transforms writing to a column another transform already owns.
            let intersections: Vec<_> = column_names_set.intersection(&target_fields_set).collect();
            if !intersections.is_empty() {
                let duplicates = intersections.iter().join(",");
                return TransformColumnNameMustBeUniqueSnafu { duplicates }.fail();
            }

            column_names_set.extend(target_fields_set);

            if let Some(idx) = transform.index
                && idx == Index::Time
            {
                // A time-index transform must map exactly one field.
                match transform.fields.len() {
                    1 => timestamp_columns.push(transform.fields.first().unwrap().input_field()),
                    _ => {
                        return TransformMultipleTimestampIndexSnafu {
                            columns: transform.fields.iter().map(|x| x.input_field()).join(", "),
                        }
                        .fail();
                    }
                }
            }
        }

        let schema = match timestamp_columns.len() {
            // v1 documents get an implicit timestamp column when none is declared.
            0 if doc_version == &PipelineDocVersion::V1 => {
                GreptimeTransformer::add_greptime_timestamp_column(&mut transforms);
                GreptimeTransformer::init_schemas(&transforms)?
            }
            1 => GreptimeTransformer::init_schemas(&transforms)?,
            // Zero time indexes (in v2) or multiple are both rejected.
            count => {
                let columns = timestamp_columns.iter().join(", ");
                return TransformTimestampIndexCountSnafu { count, columns }.fail();
            }
        };
        Ok(GreptimeTransformer { transforms, schema })
    }

    /// Apply all transforms to `pipeline_map`, producing one value per schema
    /// column; missing inputs fall back to the transform's on-failure policy.
    ///
    /// For non-v1 documents, consumed keys are removed from the map so later
    /// stages only see untransformed leftovers.
    pub fn transform_mut(
        &self,
        pipeline_map: &mut VrlValue,
        is_v1: bool,
    ) -> Result<Vec<GreptimeValue>> {
        let mut values = vec![GreptimeValue { value_data: None }; self.schema.len()];
        // Position in the flat output row; advances once per field.
        let mut output_index = 0;
        for transform in self.transforms.iter() {
            for field in transform.fields.iter() {
                let column_name = field.input_field();

                let pipeline_map = pipeline_map.as_object_mut().context(ValueMustBeMapSnafu)?;
                match pipeline_map.get(column_name) {
                    Some(v) => {
                        let value_data = coerce_value(v, transform)?;
                        values[output_index] = GreptimeValue { value_data };
                    }
                    None => {
                        // Input key absent: consult the on-failure policy.
                        let value_data = match transform.on_failure {
                            Some(crate::etl::transform::OnFailure::Default) => {
                                match transform.get_default() {
                                    Some(default) => Some(default.clone()),
                                    None => transform.get_default_value_when_data_is_none(),
                                }
                            }
                            Some(crate::etl::transform::OnFailure::Ignore) => None,
                            None => None,
                        };
                        // The time index can never be null, whatever the policy says.
                        if transform.is_timeindex() && value_data.is_none() {
                            return TimeIndexMustBeNonNullSnafu.fail();
                        }
                        values[output_index] = GreptimeValue { value_data };
                    }
                }
                output_index += 1;
                if !is_v1 {
                    pipeline_map.remove(column_name);
                }
            }
        }
        Ok(values)
    }

    /// The transform definitions backing this transformer.
    pub fn transforms(&self) -> &Transforms {
        &self.transforms
    }

    /// Column schemas aligned with the output of [`Self::transform_mut`].
    pub fn schemas(&self) -> &Vec<greptime_proto::v1::ColumnSchema> {
        &self.schema
    }

    /// Mutable access to the transform definitions.
    pub fn transforms_mut(&mut self) -> &mut Transforms {
        &mut self.transforms
    }
}
281
/// A column's schema together with its GreptimeDB semantic type
/// (timestamp / tag / field).
#[derive(Clone)]
pub struct ColumnMetadata {
    // Name, data type, nullability and options of the column.
    column_schema: datatypes::schema::ColumnSchema,
    // Role of the column in the table layout.
    semantic_type: SemanticType,
}
287
288impl From<ColumnSchema> for ColumnMetadata {
289 fn from(value: ColumnSchema) -> Self {
290 let datatype = value.datatype();
291 let semantic_type = value.semantic_type();
292 let ColumnSchema {
293 column_name,
294 datatype: _,
295 semantic_type: _,
296 datatype_extension,
297 options,
298 } = value;
299
300 let column_schema = datatypes::schema::ColumnSchema::new(
301 column_name,
302 ColumnDataTypeWrapper::new(datatype, datatype_extension).into(),
303 semantic_type != SemanticType::Timestamp,
304 );
305
306 let metadata = collect_column_options(options.as_ref());
307 let column_schema = column_schema.with_metadata(metadata);
308
309 Self {
310 column_schema,
311 semantic_type,
312 }
313 }
314}
315
316impl TryFrom<ColumnMetadata> for ColumnSchema {
317 type Error = api::error::Error;
318
319 fn try_from(value: ColumnMetadata) -> std::result::Result<Self, Self::Error> {
320 let ColumnMetadata {
321 column_schema,
322 semantic_type,
323 } = value;
324
325 let options = options_from_column_schema(&column_schema);
326
327 let (datatype, datatype_extension) =
328 ColumnDataTypeWrapper::try_from(column_schema.data_type).map(|x| x.into_parts())?;
329
330 Ok(ColumnSchema {
331 column_name: column_schema.name,
332 datatype: datatype as _,
333 semantic_type: semantic_type as _,
334 datatype_extension,
335 options,
336 })
337 }
338}
339
/// Running schema state built up while converting pipeline values to rows.
#[derive(Default)]
pub struct SchemaInfo {
    // Column metadata in output order.
    pub schema: Vec<ColumnMetadata>,
    // Column name -> position in `schema`.
    pub index: HashMap<String, usize>,
    // Optional destination table used to prefer existing column definitions.
    table: Option<Arc<Table>>,
}
352
353impl SchemaInfo {
354 pub fn with_capacity(capacity: usize) -> Self {
355 Self {
356 schema: Vec::with_capacity(capacity),
357 index: HashMap::with_capacity(capacity),
358 table: None,
359 }
360 }
361
362 pub fn from_schema_list(schema_list: Vec<ColumnSchema>) -> Self {
363 let mut index = HashMap::new();
364 for (i, schema) in schema_list.iter().enumerate() {
365 index.insert(schema.column_name.clone(), i);
366 }
367 Self {
368 schema: schema_list.into_iter().map(Into::into).collect(),
369 index,
370 table: None,
371 }
372 }
373
374 pub fn set_table(&mut self, table: Option<Arc<Table>>) {
375 self.table = table;
376 }
377
378 fn find_column_schema_in_table(&self, column_name: &str) -> Option<ColumnMetadata> {
379 if let Some(table) = &self.table
380 && let Some(i) = table.schema_ref().column_index_by_name(column_name)
381 {
382 let column_schema = table.schema_ref().column_schemas()[i].clone();
383
384 let semantic_type = if column_schema.is_time_index() {
385 SemanticType::Timestamp
386 } else if table.table_info().meta.primary_key_indices.contains(&i) {
387 SemanticType::Tag
388 } else {
389 SemanticType::Field
390 };
391
392 Some(ColumnMetadata {
393 column_schema,
394 semantic_type,
395 })
396 } else {
397 None
398 }
399 }
400
401 pub fn column_schemas(&self) -> api::error::Result<Vec<ColumnSchema>> {
402 self.schema
403 .iter()
404 .map(|x| x.clone().try_into())
405 .collect::<api::error::Result<Vec<_>>>()
406 }
407}
408
409fn resolve_schema(
410 index: Option<usize>,
411 pipeline_context: &PipelineContext,
412 column: &str,
413 value_type: &ConcreteDataType,
414 schema_info: &mut SchemaInfo,
415) -> Result<()> {
416 if let Some(index) = index {
417 let column_type = &mut schema_info.schema[index].column_schema.data_type;
418 match (column_type, value_type) {
419 (column_type, value_type) if column_type == value_type => Ok(()),
420 (ConcreteDataType::Json(column_type), ConcreteDataType::Json(value_type))
421 if column_type.is_include(value_type) =>
422 {
423 Ok(())
424 }
425 (column_type, value_type) => IdentifyPipelineColumnTypeMismatchSnafu {
426 column,
427 expected: column_type.to_string(),
428 actual: value_type.to_string(),
429 }
430 .fail(),
431 }
432 } else {
433 let column_schema = schema_info
434 .find_column_schema_in_table(column)
435 .unwrap_or_else(|| {
436 let semantic_type = decide_semantic(pipeline_context, column);
437 let column_schema = datatypes::schema::ColumnSchema::new(
438 column,
439 value_type.clone(),
440 semantic_type != SemanticType::Timestamp,
441 );
442 ColumnMetadata {
443 column_schema,
444 semantic_type,
445 }
446 });
447 let key = column.to_string();
448 schema_info.schema.push(column_schema);
449 schema_info.index.insert(key, schema_info.schema.len() - 1);
450 Ok(())
451 }
452}
453
454fn calc_ts(p_ctx: &PipelineContext, values: &VrlValue) -> Result<Option<ValueData>> {
455 match p_ctx.channel {
456 Channel::Prometheus => {
457 let ts = values
458 .as_object()
459 .and_then(|m| m.get(greptime_timestamp()))
460 .and_then(|ts| ts.try_into_i64().ok())
461 .unwrap_or_default();
462 Ok(Some(ValueData::TimestampMillisecondValue(ts)))
463 }
464 _ => {
465 let custom_ts = p_ctx.pipeline_definition.get_custom_ts();
466 match custom_ts {
467 Some(ts) => {
468 let ts_field = values.as_object().and_then(|m| m.get(ts.get_column_name()));
469 Some(ts.get_timestamp_value(ts_field)).transpose()
470 }
471 None => Ok(Some(ValueData::TimestampNanosecondValue(
472 chrono::Utc::now().timestamp_nanos_opt().unwrap_or_default(),
473 ))),
474 }
475 }
476 }
477}
478
/// Transform a pipeline value into rows grouped by their per-element
/// [`ContextOpt`], pairing each row with its resolved table suffix.
///
/// A non-array input produces a single row; an array is processed element by
/// element (one-to-many). With `skip_error` enabled, failing elements are
/// dropped instead of aborting the whole batch.
pub(crate) fn values_to_rows(
    schema_info: &mut SchemaInfo,
    mut values: VrlValue,
    pipeline_ctx: &PipelineContext<'_>,
    row: Option<Vec<GreptimeValue>>,
    need_calc_ts: bool,
    tablesuffix_template: Option<&crate::tablesuffix::TableSuffixTemplate>,
) -> Result<std::collections::HashMap<ContextOpt, Vec<RowWithTableSuffix>>> {
    let skip_error = pipeline_ctx.pipeline_param.skip_error();
    // Single-object fast path: no per-element grouping needed.
    let VrlValue::Array(arr) = values else {
        let mut result = std::collections::HashMap::new();

        // Extract per-request options (ttl etc.) out of the value itself.
        let mut opt = match ContextOpt::from_pipeline_map_to_opt(&mut values) {
            Ok(r) => r,
            Err(e) => return if skip_error { Ok(result) } else { Err(e) },
        };

        let table_suffix = opt.resolve_table_suffix(tablesuffix_template, &values);
        let row = match values_to_row(schema_info, values, pipeline_ctx, row, need_calc_ts) {
            Ok(r) => r,
            Err(e) => return if skip_error { Ok(result) } else { Err(e) },
        };
        result.insert(opt, vec![(row, table_suffix)]);
        return Ok(result);
    };

    let mut rows_by_context: std::collections::HashMap<ContextOpt, Vec<RowWithTableSuffix>> =
        std::collections::HashMap::new();
    for (index, mut value) in arr.into_iter().enumerate() {
        // Each array element must itself be a map to become a row.
        if !value.is_object() {
            unwrap_or_continue_if_err!(
                ArrayElementMustBeObjectSnafu {
                    index,
                    actual_type: value.kind_str().to_string(),
                }
                .fail(),
                skip_error
            );
        }

        // Options are extracted per element, so elements with different
        // options (e.g. ttl) end up in different groups.
        let mut opt = unwrap_or_continue_if_err!(
            ContextOpt::from_pipeline_map_to_opt(&mut value),
            skip_error
        );
        let table_suffix = opt.resolve_table_suffix(tablesuffix_template, &value);
        let transformed_row = unwrap_or_continue_if_err!(
            values_to_row(schema_info, value, pipeline_ctx, row.clone(), need_calc_ts),
            skip_error
        );
        rows_by_context
            .entry(opt)
            .or_default()
            .push((transformed_row, table_suffix));
    }
    Ok(rows_by_context)
}
545
/// Convert a single map value into a [`Row`] following `schema_info`'s column
/// order, optionally prepending a computed timestamp, and appending any new
/// columns discovered along the way.
pub(crate) fn values_to_row(
    schema_info: &mut SchemaInfo,
    values: VrlValue,
    pipeline_ctx: &PipelineContext<'_>,
    row: Option<Vec<GreptimeValue>>,
    need_calc_ts: bool,
) -> Result<Row> {
    // Start from the caller-provided prefix row, if any.
    let mut row: Vec<GreptimeValue> =
        row.unwrap_or_else(|| Vec::with_capacity(schema_info.schema.len()));
    let custom_ts = pipeline_ctx.pipeline_definition.get_custom_ts();

    if need_calc_ts {
        let ts = calc_ts(pipeline_ctx, &values)?;
        row.push(GreptimeValue { value_data: ts });
    }

    // Pad with nulls so positional writes in resolve_value stay in bounds.
    row.resize(schema_info.schema.len(), GreptimeValue { value_data: None });

    let ts_column_name = custom_ts
        .as_ref()
        .map_or(greptime_timestamp(), |ts| ts.get_column_name());

    let values = values.into_object().context(ValueMustBeMapSnafu)?;

    for (column_name, value) in values {
        // The timestamp column was already consumed by calc_ts above.
        if need_calc_ts && column_name.as_str() == ts_column_name {
            continue;
        }

        resolve_value(
            value,
            column_name.into(),
            &mut row,
            schema_info,
            pipeline_ctx,
        )?;
    }
    Ok(Row { values: row })
}
593
594fn decide_semantic(p_ctx: &PipelineContext, column_name: &str) -> SemanticType {
595 if p_ctx.channel == Channel::Prometheus && column_name != greptime_value() {
596 SemanticType::Tag
597 } else {
598 SemanticType::Field
599 }
600}
601
/// Convert one VRL value into a Greptime cell, reconciling (or extending) the
/// running schema, and write it into `row` — positionally for known columns,
/// appended for new ones.
fn resolve_value(
    value: VrlValue,
    column_name: String,
    row: &mut Vec<GreptimeValue>,
    schema_info: &mut SchemaInfo,
    p_ctx: &PipelineContext,
) -> Result<()> {
    let index = schema_info.index.get(&column_name).copied();

    let value_data = match value {
        // Nulls never create columns and never overwrite existing cells.
        VrlValue::Null => return Ok(()),

        VrlValue::Integer(v) => {
            resolve_schema(
                index,
                p_ctx,
                &column_name,
                &ConcreteDataType::int64_datatype(),
                schema_info,
            )?;
            Some(ValueData::I64Value(v))
        }

        VrlValue::Float(v) => {
            resolve_schema(
                index,
                p_ctx,
                &column_name,
                &ConcreteDataType::float64_datatype(),
                schema_info,
            )?;
            Some(ValueData::F64Value(v.into()))
        }

        VrlValue::Boolean(v) => {
            resolve_schema(
                index,
                p_ctx,
                &column_name,
                &ConcreteDataType::boolean_datatype(),
                schema_info,
            )?;
            Some(ValueData::BoolValue(v))
        }

        // Byte strings are stored as (lossily decoded) UTF-8 strings.
        VrlValue::Bytes(v) => {
            resolve_schema(
                index,
                p_ctx,
                &column_name,
                &ConcreteDataType::string_datatype(),
                schema_info,
            )?;
            Some(ValueData::StringValue(String::from_utf8_lossy_owned(
                v.to_vec(),
            )))
        }

        // Regexes should not normally reach storage; persist as string but warn.
        VrlValue::Regex(v) => {
            warn!(
                "Persisting regex value in the table, this should not happen, column_name: {}",
                column_name
            );
            resolve_schema(
                index,
                p_ctx,
                &column_name,
                &ConcreteDataType::string_datatype(),
                schema_info,
            )?;
            Some(ValueData::StringValue(v.to_string()))
        }

        VrlValue::Timestamp(ts) => {
            // Fails for dates outside the representable nanosecond range.
            let ns = ts.timestamp_nanos_opt().context(InvalidTimestampSnafu {
                input: ts.to_rfc3339(),
            })?;
            resolve_schema(
                index,
                p_ctx,
                &column_name,
                &ConcreteDataType::timestamp_nanosecond_datatype(),
                schema_info,
            )?;
            Some(ValueData::TimestampNanosecondValue(ns))
        }

        VrlValue::Array(_) | VrlValue::Object(_) => {
            // Is the destination column declared as a native JSON column?
            let is_json_native_type = schema_info
                .find_column_schema_in_table(&column_name)
                .is_some_and(|x| {
                    if let ConcreteDataType::Json(column_type) = &x.column_schema.data_type {
                        column_type.is_native_type()
                    } else {
                        false
                    }
                });

            let value = if is_json_native_type {
                // Encode via the column's JSON structure settings (if any).
                let json_extension_type: Option<JsonExtensionType> =
                    if let Some(x) = schema_info.find_column_schema_in_table(&column_name) {
                        x.column_schema.extension_type()?
                    } else {
                        None
                    };
                let settings = json_extension_type
                    .and_then(|x| x.metadata().json_structure_settings.clone())
                    .unwrap_or_default();
                let value: serde_json::Value = value.try_into().map_err(|e: StdError| {
                    CoerceIncompatibleTypesSnafu { msg: e.to_string() }.build()
                })?;
                let value = settings.encode(value)?;

                resolve_schema(index, p_ctx, &column_name, &value.data_type(), schema_info)?;

                // `settings.encode` always yields a Json value here.
                let Value::Json(value) = value else {
                    unreachable!()
                };
                ValueData::JsonValue(encode_json_value(*value))
            } else {
                // Otherwise persist as binary jsonb.
                resolve_schema(
                    index,
                    p_ctx,
                    &column_name,
                    &ConcreteDataType::binary_datatype(),
                    schema_info,
                )?;

                let value = vrl_value_to_jsonb_value(&value);
                ValueData::BinaryValue(value.to_vec())
            };
            Some(value)
        }
    };

    let value = GreptimeValue { value_data };
    if let Some(index) = index {
        row[index] = value;
    } else {
        row.push(value);
    }
    Ok(())
}
747
748fn vrl_value_to_jsonb_value<'a>(value: &'a VrlValue) -> jsonb::Value<'a> {
749 match value {
750 VrlValue::Bytes(bytes) => jsonb::Value::String(String::from_utf8_lossy(bytes)),
751 VrlValue::Regex(value_regex) => jsonb::Value::String(Cow::Borrowed(value_regex.as_str())),
752 VrlValue::Integer(i) => jsonb::Value::Number(Number::Int64(*i)),
753 VrlValue::Float(not_nan) => jsonb::Value::Number(Number::Float64(not_nan.into_inner())),
754 VrlValue::Boolean(b) => jsonb::Value::Bool(*b),
755 VrlValue::Timestamp(date_time) => jsonb::Value::String(Cow::Owned(date_time.to_rfc3339())),
756 VrlValue::Object(btree_map) => jsonb::Value::Object(
757 btree_map
758 .iter()
759 .map(|(key, value)| (key.to_string(), vrl_value_to_jsonb_value(value)))
760 .collect(),
761 ),
762 VrlValue::Array(values) => jsonb::Value::Array(
763 values
764 .iter()
765 .map(|value| vrl_value_to_jsonb_value(value))
766 .collect(),
767 ),
768 VrlValue::Null => jsonb::Value::Null,
769 }
770}
771
/// Core of the identity pipeline: map each flattened object straight to a row,
/// inferring the schema on the fly and grouping rows by their [`ContextOpt`].
///
/// Returns the accumulated schema plus the grouped rows, each padded to the
/// final column count.
fn identity_pipeline_inner(
    pipeline_maps: Vec<VrlValue>,
    pipeline_ctx: &PipelineContext<'_>,
) -> Result<(SchemaInfo, HashMap<ContextOpt, Vec<Row>>)> {
    let skip_error = pipeline_ctx.pipeline_param.skip_error();
    let mut schema_info = SchemaInfo::default();
    let custom_ts = pipeline_ctx.pipeline_definition.get_custom_ts();

    // Column 0 is always the timestamp: the custom column if configured,
    // otherwise the default greptime timestamp (ms for Prometheus, ns else).
    let column_schema = datatypes::schema::ColumnSchema::new(
        custom_ts
            .map(|ts| ts.get_column_name().to_string())
            .unwrap_or_else(|| greptime_timestamp().to_string()),
        custom_ts
            .map(|c| ConcreteDataType::from(ColumnDataTypeWrapper::new(c.get_datatype(), None)))
            .unwrap_or_else(|| {
                if pipeline_ctx.channel == Channel::Prometheus {
                    ConcreteDataType::timestamp_millisecond_datatype()
                } else {
                    ConcreteDataType::timestamp_nanosecond_datatype()
                }
            }),
        false,
    );
    schema_info.schema.push(ColumnMetadata {
        column_schema,
        semantic_type: SemanticType::Timestamp,
    });

    let mut opt_map = HashMap::new();
    let len = pipeline_maps.len();

    for mut pipeline_map in pipeline_maps {
        let opt = unwrap_or_continue_if_err!(
            ContextOpt::from_pipeline_map_to_opt(&mut pipeline_map),
            skip_error
        );
        let row = unwrap_or_continue_if_err!(
            values_to_row(&mut schema_info, pipeline_map, pipeline_ctx, None, true),
            skip_error
        );

        opt_map
            .entry(opt)
            .or_insert_with(|| Vec::with_capacity(len))
            .push(row);
    }

    // Later rows may have introduced new columns; pad earlier rows with nulls
    // so every row matches the final schema width.
    let column_count = schema_info.schema.len();
    for (_, row) in opt_map.iter_mut() {
        for row in row.iter_mut() {
            assert!(
                column_count >= row.values.len(),
                "column_count: {}, row.values.len(): {}",
                column_count,
                row.values.len()
            );
            let diff = column_count - row.values.len();
            for _ in 0..diff {
                row.values.push(GreptimeValue { value_data: None });
            }
        }
    }

    Ok((schema_info, opt_map))
}
838
839pub fn identity_pipeline(
848 array: Vec<VrlValue>,
849 table: Option<Arc<table::Table>>,
850 pipeline_ctx: &PipelineContext<'_>,
851) -> Result<HashMap<ContextOpt, Rows>> {
852 let skip_error = pipeline_ctx.pipeline_param.skip_error();
853 let max_nested_levels = pipeline_ctx.pipeline_param.max_nested_levels();
854 let mut input = Vec::with_capacity(array.len());
856 for item in array.into_iter() {
857 let result =
858 unwrap_or_continue_if_err!(flatten_object(item, max_nested_levels), skip_error);
859 input.push(result);
860 }
861
862 identity_pipeline_inner(input, pipeline_ctx).and_then(|(mut schema, opt_map)| {
863 if let Some(table) = table {
864 let table_info = table.table_info();
865 for tag_name in table_info.meta.row_key_column_names() {
866 if let Some(index) = schema.index.get(tag_name) {
867 schema.schema[*index].semantic_type = SemanticType::Tag;
868 }
869 }
870 }
871
872 let column_schemas = schema.column_schemas()?;
873 Ok(opt_map
874 .into_iter()
875 .map(|(opt, rows)| {
876 (
877 opt,
878 Rows {
879 schema: column_schemas.clone(),
880 rows,
881 },
882 )
883 })
884 .collect::<HashMap<ContextOpt, Rows>>())
885 })
886}
887
888pub fn flatten_object(object: VrlValue, max_nested_levels: usize) -> Result<VrlValue> {
894 let mut flattened = BTreeMap::new();
895 let object = object.into_object().context(ValueMustBeMapSnafu)?;
896
897 if !object.is_empty() {
898 do_flatten_object(&mut flattened, None, object, 1, max_nested_levels);
900 }
901
902 Ok(VrlValue::Object(flattened))
903}
904
905fn vrl_value_to_serde_json(value: &VrlValue) -> serde_json_crate::Value {
906 match value {
907 VrlValue::Null => serde_json_crate::Value::Null,
908 VrlValue::Boolean(b) => serde_json_crate::Value::Bool(*b),
909 VrlValue::Integer(i) => serde_json_crate::Value::Number((*i).into()),
910 VrlValue::Float(not_nan) => serde_json_crate::Number::from_f64(not_nan.into_inner())
911 .map(serde_json_crate::Value::Number)
912 .unwrap_or(serde_json_crate::Value::Null),
913 VrlValue::Bytes(bytes) => {
914 serde_json_crate::Value::String(String::from_utf8_lossy(bytes).into_owned())
915 }
916 VrlValue::Regex(re) => serde_json_crate::Value::String(re.as_str().to_string()),
917 VrlValue::Timestamp(ts) => serde_json_crate::Value::String(ts.to_rfc3339()),
918 VrlValue::Array(arr) => {
919 serde_json_crate::Value::Array(arr.iter().map(vrl_value_to_serde_json).collect())
920 }
921 VrlValue::Object(map) => serde_json_crate::Value::Object(
922 map.iter()
923 .map(|(k, v)| (k.to_string(), vrl_value_to_serde_json(v)))
924 .collect(),
925 ),
926 }
927}
928
929fn do_flatten_object(
930 dest: &mut BTreeMap<KeyString, VrlValue>,
931 base: Option<&str>,
932 object: BTreeMap<KeyString, VrlValue>,
933 current_level: usize,
934 max_nested_levels: usize,
935) {
936 for (key, value) in object {
937 let new_key = base.map_or_else(
938 || key.clone(),
939 |base_key| format!("{base_key}.{key}").into(),
940 );
941
942 match value {
943 VrlValue::Object(object) => {
944 if current_level >= max_nested_levels {
945 let json_string = serde_json_crate::to_string(&vrl_value_to_serde_json(
947 &VrlValue::Object(object),
948 ))
949 .unwrap_or_else(|_| String::from("{}"));
950 dest.insert(new_key, VrlValue::Bytes(Bytes::from(json_string)));
951 } else {
952 do_flatten_object(
953 dest,
954 Some(&new_key),
955 object,
956 current_level + 1,
957 max_nested_levels,
958 );
959 }
960 }
961 VrlValue::Array(_) => {
963 let json_string = serde_json_crate::to_string(&vrl_value_to_serde_json(&value))
964 .unwrap_or_else(|_| String::from("[]"));
965 dest.insert(new_key, VrlValue::Bytes(Bytes::from(json_string)));
966 }
967 _ => {
969 dest.insert(new_key, value);
970 }
971 }
972 }
973}
974
975#[cfg(test)]
976mod tests {
977 use api::v1::SemanticType;
978
979 use super::*;
980 use crate::{PipelineDefinition, identity_pipeline};
981
    /// Identity pipeline: the schema is inferred from the first occurrence of
    /// a column; later rows with a conflicting type fail, and compatible
    /// batches share one schema with per-row null padding.
    #[test]
    fn test_identify_pipeline() {
        let params = GreptimePipelineParams::default();
        let pipeline_ctx = PipelineContext::new(
            &PipelineDefinition::GreptimeIdentityPipeline(None),
            &params,
            Channel::Unknown,
        );
        // Float column followed by a string value: type mismatch error.
        {
            let array = [
                serde_json::json!({
                    "woshinull": null,
                    "name": "Alice",
                    "age": 20,
                    "is_student": true,
                    "score": 99.5,
                    "hobbies": "reading",
                    "address": "Beijing",
                }),
                serde_json::json!({
                    "name": "Bob",
                    "age": 21,
                    "is_student": false,
                    "score": "88.5",
                    "hobbies": "swimming",
                    "address": "Shanghai",
                    "gaga": "gaga"
                }),
            ];
            let array = array.iter().map(|v| v.into()).collect();
            let rows = identity_pipeline(array, None, &pipeline_ctx);
            assert!(rows.is_err());
            assert_eq!(
                rows.err().unwrap().to_string(),
                "Column datatype mismatch. For column: score, expected datatype: Float64, actual datatype: String".to_string(),
            );
        }
        // Float column followed by an integer value: also a mismatch.
        {
            let array = [
                serde_json::json!({
                    "woshinull": null,
                    "name": "Alice",
                    "age": 20,
                    "is_student": true,
                    "score": 99.5,
                    "hobbies": "reading",
                    "address": "Beijing",
                }),
                serde_json::json!({
                    "name": "Bob",
                    "age": 21,
                    "is_student": false,
                    "score": 88,
                    "hobbies": "swimming",
                    "address": "Shanghai",
                    "gaga": "gaga"
                }),
            ];
            let array = array.iter().map(|v| v.into()).collect();
            let rows = identity_pipeline(array, None, &pipeline_ctx);
            assert!(rows.is_err());
            assert_eq!(
                rows.err().unwrap().to_string(),
                "Column datatype mismatch. For column: score, expected datatype: Float64, actual datatype: Int64".to_string(),
            );
        }
        // Homogeneous batch: 7 value columns + timestamp = 8 columns,
        // every row padded to 8 cells.
        {
            let array = [
                serde_json::json!({
                    "woshinull": null,
                    "name": "Alice",
                    "age": 20,
                    "is_student": true,
                    "score": 99.5,
                    "hobbies": "reading",
                    "address": "Beijing",
                }),
                serde_json::json!({
                    "name": "Bob",
                    "age": 21,
                    "is_student": false,
                    "score": 88.5,
                    "hobbies": "swimming",
                    "address": "Shanghai",
                    "gaga": "gaga"
                }),
            ];
            let array = array.iter().map(|v| v.into()).collect();
            let rows = identity_pipeline(array, None, &pipeline_ctx);
            assert!(rows.is_ok());
            let mut rows = rows.unwrap();
            assert!(rows.len() == 1);
            let rows = rows.remove(&ContextOpt::default()).unwrap();
            assert_eq!(rows.schema.len(), 8);
            assert_eq!(rows.rows.len(), 2);
            assert_eq!(8, rows.rows[0].values.len());
            assert_eq!(8, rows.rows[1].values.len());
        }
        // Same data via identity_pipeline_inner, manually tagging
        // name/address and checking the resulting semantic types.
        {
            let array = [
                serde_json::json!({
                    "woshinull": null,
                    "name": "Alice",
                    "age": 20,
                    "is_student": true,
                    "score": 99.5,
                    "hobbies": "reading",
                    "address": "Beijing",
                }),
                serde_json::json!({
                    "name": "Bob",
                    "age": 21,
                    "is_student": false,
                    "score": 88.5,
                    "hobbies": "swimming",
                    "address": "Shanghai",
                    "gaga": "gaga"
                }),
            ];
            let tag_column_names = ["name".to_string(), "address".to_string()];

            let rows =
                identity_pipeline_inner(array.iter().map(|v| v.into()).collect(), &pipeline_ctx)
                    .map(|(mut schema, mut rows)| {
                        for name in tag_column_names {
                            if let Some(index) = schema.index.get(&name) {
                                schema.schema[*index].semantic_type = SemanticType::Tag;
                            }
                        }

                        assert!(rows.len() == 1);
                        let rows = rows.remove(&ContextOpt::default()).unwrap();

                        Rows {
                            schema: schema.column_schemas().unwrap(),
                            rows,
                        }
                    });

            assert!(rows.is_ok());
            let rows = rows.unwrap();
            assert_eq!(rows.schema.len(), 8);
            assert_eq!(rows.rows.len(), 2);
            assert_eq!(8, rows.rows[0].values.len());
            assert_eq!(8, rows.rows[1].values.len());
            assert_eq!(
                rows.schema
                    .iter()
                    .find(|x| x.column_name == "name")
                    .unwrap()
                    .semantic_type,
                SemanticType::Tag as i32
            );
            assert_eq!(
                rows.schema
                    .iter()
                    .find(|x| x.column_name == "address")
                    .unwrap()
                    .semantic_type,
                SemanticType::Tag as i32
            );
            assert_eq!(
                rows.schema
                    .iter()
                    .filter(|x| x.semantic_type == SemanticType::Tag as i32)
                    .count(),
                2
            );
        }
    }
1152
    /// Nested objects flatten with dot-joined keys; arrays and maps at or
    /// beyond the depth limit are serialized to JSON strings.
    #[test]
    fn test_flatten() {
        let test_cases = vec![
            // Depth 10: everything flattens; arrays become JSON strings.
            (
                serde_json::json!(
                    {
                        "a": {
                            "b": {
                                "c": [1, 2, 3]
                            }
                        },
                        "d": [
                            "foo",
                            "bar"
                        ],
                        "e": {
                            "f": [7, 8, 9],
                            "g": {
                                "h": 123,
                                "i": "hello",
                                "j": {
                                    "k": true
                                }
                            }
                        }
                    }
                ),
                10,
                Some(serde_json::json!(
                    {
                        "a.b.c": "[1,2,3]",
                        "d": "[\"foo\",\"bar\"]",
                        "e.f": "[7,8,9]",
                        "e.g.h": 123,
                        "e.g.i": "hello",
                        "e.g.j.k": true
                    }
                )),
            ),
            // Depth 3: the level-3 map under "a.b.c" is serialized wholesale.
            (
                serde_json::json!(
                    {
                        "a": {
                            "b": {
                                "c": {
                                    "d": [1, 2, 3]
                                }
                            }
                        },
                        "e": [
                            "foo",
                            "bar"
                        ]
                    }
                ),
                3,
                Some(serde_json::json!(
                    {
                        "a.b.c": "{\"d\":[1,2,3]}",
                        "e": "[\"foo\",\"bar\"]"
                    }
                )),
            ),
        ];

        for (input, max_depth, expected) in test_cases {
            let input = input.into();
            let expected = expected.map(|e| e.into());

            let flattened_object = flatten_object(input, max_depth).ok();
            assert_eq!(flattened_object, expected);
        }
    }
1228
1229 use ahash::HashMap as AHashMap;
1230 #[test]
1231 fn test_values_to_rows_skip_error_handling() {
1232 let table_suffix_template: Option<crate::tablesuffix::TableSuffixTemplate> = None;
1233
1234 {
1236 let schema_info = &mut SchemaInfo::default();
1237 let input_array = vec![
1238 serde_json::json!({"name": "Alice", "age": 25}).into(),
1240 VrlValue::Bytes("invalid_string".into()),
1242 serde_json::json!({"name": "Bob", "age": 30}).into(),
1244 VrlValue::Integer(42),
1246 serde_json::json!({"name": "Charlie", "age": 35}).into(),
1248 ];
1249
1250 let params = GreptimePipelineParams::from_map(AHashMap::from_iter([(
1251 "skip_error".to_string(),
1252 "true".to_string(),
1253 )]));
1254
1255 let pipeline_ctx = PipelineContext::new(
1256 &PipelineDefinition::GreptimeIdentityPipeline(None),
1257 ¶ms,
1258 Channel::Unknown,
1259 );
1260
1261 let result = values_to_rows(
1262 schema_info,
1263 VrlValue::Array(input_array),
1264 &pipeline_ctx,
1265 None,
1266 true,
1267 table_suffix_template.as_ref(),
1268 );
1269
1270 assert!(result.is_ok());
1272 let rows_by_context = result.unwrap();
1273 let total_rows: usize = rows_by_context.values().map(|v| v.len()).sum();
1275 assert_eq!(total_rows, 3); }
1277
1278 {
1280 let schema_info = &mut SchemaInfo::default();
1281 let input_array = vec![
1282 serde_json::json!({"name": "Alice", "age": 25}).into(),
1283 VrlValue::Bytes("invalid_string".into()), ];
1285
1286 let params = GreptimePipelineParams::default(); let pipeline_ctx = PipelineContext::new(
1289 &PipelineDefinition::GreptimeIdentityPipeline(None),
1290 ¶ms,
1291 Channel::Unknown,
1292 );
1293
1294 let result = values_to_rows(
1295 schema_info,
1296 VrlValue::Array(input_array),
1297 &pipeline_ctx,
1298 None,
1299 true,
1300 table_suffix_template.as_ref(),
1301 );
1302
1303 assert!(result.is_err());
1305 let error_msg = result.unwrap_err().to_string();
1306 assert!(error_msg.contains("Array element at index 1 must be an object for one-to-many transformation, got string"));
1307 }
1308 }
1309
1310 #[test]
1312 fn test_values_to_rows_per_element_context_opt() {
1313 let table_suffix_template: Option<crate::tablesuffix::TableSuffixTemplate> = None;
1314 let schema_info = &mut SchemaInfo::default();
1315
1316 let input_array = vec![
1318 serde_json::json!({"name": "Alice", "greptime_ttl": "1h"}).into(),
1319 serde_json::json!({"name": "Bob", "greptime_ttl": "1h"}).into(),
1320 serde_json::json!({"name": "Charlie", "greptime_ttl": "24h"}).into(),
1321 ];
1322
1323 let params = GreptimePipelineParams::default();
1324 let pipeline_ctx = PipelineContext::new(
1325 &PipelineDefinition::GreptimeIdentityPipeline(None),
1326 ¶ms,
1327 Channel::Unknown,
1328 );
1329
1330 let result = values_to_rows(
1331 schema_info,
1332 VrlValue::Array(input_array),
1333 &pipeline_ctx,
1334 None,
1335 true,
1336 table_suffix_template.as_ref(),
1337 );
1338
1339 assert!(result.is_ok());
1340 let rows_by_context = result.unwrap();
1341
1342 assert_eq!(rows_by_context.len(), 2);
1344
1345 let total_rows: usize = rows_by_context.values().map(|v| v.len()).sum();
1347 assert_eq!(total_rows, 3);
1348
1349 let mut ttl_1h_count = 0;
1351 let mut ttl_24h_count = 0;
1352 for rows in rows_by_context.values() {
1353 if rows.len() == 2 {
1355 ttl_1h_count = rows.len();
1356 } else if rows.len() == 1 {
1357 ttl_24h_count = rows.len();
1358 }
1359 }
1360 assert_eq!(ttl_1h_count, 2); assert_eq!(ttl_24h_count, 1); }
1363}