1pub mod coerce;
16
17use std::borrow::Cow;
18use std::collections::{BTreeMap, HashSet};
19use std::sync::Arc;
20
21use ahash::{HashMap, HashMapExt};
22use api::helper::{ColumnDataTypeWrapper, encode_json_value};
23use api::v1::column_def::{collect_column_options, options_from_column_schema};
24use api::v1::value::ValueData;
25use api::v1::{ColumnDataType, SemanticType};
26use arrow_schema::extension::ExtensionType;
27use coerce::{coerce_columns, coerce_value};
28use common_query::prelude::{greptime_timestamp, greptime_value};
29use common_telemetry::warn;
30use datatypes::data_type::ConcreteDataType;
31use datatypes::extension::json::JsonExtensionType;
32use datatypes::value::Value;
33use greptime_proto::v1::{ColumnSchema, Row, Rows, Value as GreptimeValue};
34use itertools::Itertools;
35use jsonb::Number;
36use once_cell::sync::OnceCell;
37use serde_json as serde_json_crate;
38use session::context::Channel;
39use snafu::OptionExt;
40use table::Table;
41use vrl::prelude::{Bytes, VrlValueConvert};
42use vrl::value::value::StdError;
43use vrl::value::{KeyString, Value as VrlValue};
44
45use crate::error::{
46 ArrayElementMustBeObjectSnafu, CoerceIncompatibleTypesSnafu,
47 IdentifyPipelineColumnTypeMismatchSnafu, InvalidTimestampSnafu, Result,
48 TimeIndexMustBeNonNullSnafu, TransformColumnNameMustBeUniqueSnafu,
49 TransformMultipleTimestampIndexSnafu, TransformTimestampIndexCountSnafu, ValueMustBeMapSnafu,
50};
51use crate::etl::PipelineDocVersion;
52use crate::etl::ctx_req::ContextOpt;
53use crate::etl::field::{Field, Fields};
54use crate::etl::transform::index::Index;
55use crate::etl::transform::{Transform, Transforms};
56use crate::{PipelineContext, truthy, unwrap_or_continue_if_err};
57
/// Default for the `max_nested_levels` pipeline parameter: objects nested this deep are
/// serialized to a JSON string instead of being flattened any further.
const DEFAULT_MAX_NESTED_LEVELS_FOR_JSON_FLATTENING: usize = 10;

/// A converted row together with the table suffix resolved for it (if any).
pub type RowWithTableSuffix = (Row, Option<String>);
62
/// Turns pipeline output into GreptimeDB row values according to a fixed list of transforms.
#[derive(Debug, Clone)]
pub struct GreptimeTransformer {
    /// The transforms; their fields are emitted in this order as output columns.
    transforms: Transforms,
    /// Output column schemas derived from `transforms`, in the same order.
    schema: Vec<ColumnSchema>,
}
70
/// Pipeline options, built from a `key=value&key=value` string or from an existing map.
///
/// Individual flags are parsed lazily on first access and then cached.
#[derive(Debug, Default)]
pub struct GreptimePipelineParams {
    /// Raw option key/value pairs.
    options: HashMap<String, String>,

    /// Cached result of parsing the `skip_error` option.
    pub skip_error: OnceCell<bool>,
    /// Cached result of parsing the `max_nested_levels` option.
    pub max_nested_levels: OnceCell<usize>,
}
84
85impl GreptimePipelineParams {
86 pub fn from_params(params: Option<&str>) -> Self {
90 let options = Self::parse_header_str_to_map(params);
91
92 Self {
93 options,
94 skip_error: OnceCell::new(),
95 max_nested_levels: OnceCell::new(),
96 }
97 }
98
99 pub fn from_map(options: HashMap<String, String>) -> Self {
100 Self {
101 options,
102 skip_error: OnceCell::new(),
103 max_nested_levels: OnceCell::new(),
104 }
105 }
106
107 pub fn parse_header_str_to_map(params: Option<&str>) -> HashMap<String, String> {
108 if let Some(params) = params {
109 if params.is_empty() {
110 HashMap::new()
111 } else {
112 params
113 .split('&')
114 .filter_map(|s| s.split_once('='))
115 .map(|(k, v)| (k.to_string(), v.to_string()))
116 .collect::<HashMap<String, String>>()
117 }
118 } else {
119 HashMap::new()
120 }
121 }
122
123 pub fn skip_error(&self) -> bool {
125 *self
126 .skip_error
127 .get_or_init(|| self.options.get("skip_error").map(truthy).unwrap_or(false))
128 }
129
130 pub fn max_nested_levels(&self) -> usize {
133 *self.max_nested_levels.get_or_init(|| {
134 self.options
135 .get("max_nested_levels")
136 .and_then(|s| s.parse::<usize>().ok())
137 .filter(|v| *v > 0)
138 .unwrap_or(DEFAULT_MAX_NESTED_LEVELS_FOR_JSON_FLATTENING)
139 })
140 }
141}
142
143impl GreptimeTransformer {
144 fn add_greptime_timestamp_column(transforms: &mut Transforms) {
146 let type_ = ColumnDataType::TimestampNanosecond;
147 let default = None;
148
149 let transform = Transform {
150 fields: Fields::one(Field::new(greptime_timestamp().to_string(), None)),
151 type_,
152 default,
153 index: Some(Index::Time),
154 on_failure: Some(crate::etl::transform::OnFailure::Default),
155 tag: false,
156 };
157 transforms.push(transform);
158 }
159
160 fn init_schemas(transforms: &Transforms) -> Result<Vec<ColumnSchema>> {
162 let mut schema = vec![];
163 for transform in transforms.iter() {
164 schema.extend(coerce_columns(transform)?);
165 }
166 Ok(schema)
167 }
168}
169
170impl GreptimeTransformer {
171 pub fn new(mut transforms: Transforms, doc_version: &PipelineDocVersion) -> Result<Self> {
172 let mut column_names_set = HashSet::new();
174 let mut timestamp_columns = vec![];
175
176 for transform in transforms.iter() {
177 let target_fields_set = transform
178 .fields
179 .iter()
180 .map(|f| f.target_or_input_field())
181 .collect::<HashSet<_>>();
182
183 let intersections: Vec<_> = column_names_set.intersection(&target_fields_set).collect();
184 if !intersections.is_empty() {
185 let duplicates = intersections.iter().join(",");
186 return TransformColumnNameMustBeUniqueSnafu { duplicates }.fail();
187 }
188
189 column_names_set.extend(target_fields_set);
190
191 if let Some(idx) = transform.index
192 && idx == Index::Time
193 {
194 match transform.fields.len() {
195 1 => timestamp_columns.push(transform.fields.first().unwrap().input_field()),
197 _ => {
198 return TransformMultipleTimestampIndexSnafu {
199 columns: transform.fields.iter().map(|x| x.input_field()).join(", "),
200 }
201 .fail();
202 }
203 }
204 }
205 }
206
207 let schema = match timestamp_columns.len() {
208 0 if doc_version == &PipelineDocVersion::V1 => {
209 GreptimeTransformer::add_greptime_timestamp_column(&mut transforms);
211 GreptimeTransformer::init_schemas(&transforms)?
212 }
213 1 => GreptimeTransformer::init_schemas(&transforms)?,
214 count => {
215 let columns = timestamp_columns.iter().join(", ");
216 return TransformTimestampIndexCountSnafu { count, columns }.fail();
217 }
218 };
219 Ok(GreptimeTransformer { transforms, schema })
220 }
221
222 pub fn transform_mut(
223 &self,
224 pipeline_map: &mut VrlValue,
225 is_v1: bool,
226 ) -> Result<Vec<GreptimeValue>> {
227 let mut values = vec![GreptimeValue { value_data: None }; self.schema.len()];
228 let mut output_index = 0;
229 for transform in self.transforms.iter() {
230 for field in transform.fields.iter() {
231 let column_name = field.input_field();
232
233 let pipeline_map = pipeline_map.as_object_mut().context(ValueMustBeMapSnafu)?;
234 match pipeline_map.get(column_name) {
236 Some(v) => {
237 let value_data = coerce_value(v, transform)?;
238 values[output_index] = GreptimeValue { value_data };
240 }
241 None => {
242 let value_data = match transform.on_failure {
243 Some(crate::etl::transform::OnFailure::Default) => {
244 match transform.get_default() {
245 Some(default) => Some(default.clone()),
246 None => transform.get_default_value_when_data_is_none(),
247 }
248 }
249 Some(crate::etl::transform::OnFailure::Ignore) => None,
250 None => None,
251 };
252 if transform.is_timeindex() && value_data.is_none() {
253 return TimeIndexMustBeNonNullSnafu.fail();
254 }
255 values[output_index] = GreptimeValue { value_data };
256 }
257 }
258 output_index += 1;
259 if !is_v1 {
260 pipeline_map.remove(column_name);
263 }
264 }
265 }
266 Ok(values)
267 }
268
269 pub fn transforms(&self) -> &Transforms {
270 &self.transforms
271 }
272
273 pub fn schemas(&self) -> &Vec<greptime_proto::v1::ColumnSchema> {
274 &self.schema
275 }
276
277 pub fn transforms_mut(&mut self) -> &mut Transforms {
278 &mut self.transforms
279 }
280}
281
/// A column's `datatypes` schema paired with its semantic type (tag, field or timestamp).
#[derive(Clone)]
pub struct ColumnMetadata {
    column_schema: datatypes::schema::ColumnSchema,
    semantic_type: SemanticType,
}
287
288impl From<ColumnSchema> for ColumnMetadata {
289 fn from(value: ColumnSchema) -> Self {
290 let datatype = value.datatype();
291 let semantic_type = value.semantic_type();
292 let ColumnSchema {
293 column_name,
294 datatype: _,
295 semantic_type: _,
296 datatype_extension,
297 options,
298 } = value;
299
300 let column_schema = datatypes::schema::ColumnSchema::new(
301 column_name,
302 ColumnDataTypeWrapper::new(datatype, datatype_extension).into(),
303 semantic_type != SemanticType::Timestamp,
304 );
305
306 let metadata = collect_column_options(options.as_ref());
307 let column_schema = column_schema.with_metadata(metadata);
308
309 Self {
310 column_schema,
311 semantic_type,
312 }
313 }
314}
315
316impl TryFrom<ColumnMetadata> for ColumnSchema {
317 type Error = api::error::Error;
318
319 fn try_from(value: ColumnMetadata) -> std::result::Result<Self, Self::Error> {
320 let ColumnMetadata {
321 column_schema,
322 semantic_type,
323 } = value;
324
325 let options = options_from_column_schema(&column_schema);
326
327 let (datatype, datatype_extension) =
328 ColumnDataTypeWrapper::try_from(column_schema.data_type).map(|x| x.into_parts())?;
329
330 Ok(ColumnSchema {
331 column_name: column_schema.name,
332 datatype: datatype as _,
333 semantic_type: semantic_type as _,
334 datatype_extension,
335 options,
336 })
337 }
338}
339
/// Column schema accumulated while converting pipeline values into rows.
#[derive(Default)]
pub struct SchemaInfo {
    /// Columns, in the order their values appear in a row.
    pub schema: Vec<ColumnMetadata>,
    /// Maps a column name to its position in `schema`.
    pub index: HashMap<String, usize>,
    /// Optional target table, consulted for the schema of columns not yet in `schema`.
    table: Option<Arc<Table>>,
}
352
353impl SchemaInfo {
354 pub fn with_capacity(capacity: usize) -> Self {
355 Self {
356 schema: Vec::with_capacity(capacity),
357 index: HashMap::with_capacity(capacity),
358 table: None,
359 }
360 }
361
362 pub fn from_schema_list(schema_list: Vec<ColumnSchema>) -> Self {
363 let mut index = HashMap::new();
364 for (i, schema) in schema_list.iter().enumerate() {
365 index.insert(schema.column_name.clone(), i);
366 }
367 Self {
368 schema: schema_list.into_iter().map(Into::into).collect(),
369 index,
370 table: None,
371 }
372 }
373
374 pub fn set_table(&mut self, table: Option<Arc<Table>>) {
375 self.table = table;
376 }
377
378 fn find_column_schema_in_table(&self, column_name: &str) -> Option<ColumnMetadata> {
379 if let Some(table) = &self.table
380 && let Some(i) = table.schema_ref().column_index_by_name(column_name)
381 {
382 let column_schema = table.schema_ref().column_schemas()[i].clone();
383
384 let semantic_type = if column_schema.is_time_index() {
385 SemanticType::Timestamp
386 } else if table.table_info().meta.primary_key_indices.contains(&i) {
387 SemanticType::Tag
388 } else {
389 SemanticType::Field
390 };
391
392 Some(ColumnMetadata {
393 column_schema,
394 semantic_type,
395 })
396 } else {
397 None
398 }
399 }
400
401 pub fn column_schemas(&self) -> api::error::Result<Vec<ColumnSchema>> {
402 self.schema
403 .iter()
404 .map(|x| x.clone().try_into())
405 .collect::<api::error::Result<Vec<_>>>()
406 }
407}
408
409fn resolve_schema(
410 index: Option<usize>,
411 pipeline_context: &PipelineContext,
412 column: &str,
413 value_type: &ConcreteDataType,
414 schema_info: &mut SchemaInfo,
415) -> Result<()> {
416 if let Some(index) = index {
417 let column_type = &mut schema_info.schema[index].column_schema.data_type;
418 match (column_type, value_type) {
419 (column_type, value_type) if column_type == value_type => Ok(()),
420 (ConcreteDataType::Json(column_type), ConcreteDataType::Json(value_type))
421 if column_type.is_include(value_type) =>
422 {
423 Ok(())
424 }
425 (column_type, value_type) => IdentifyPipelineColumnTypeMismatchSnafu {
426 column,
427 expected: column_type.to_string(),
428 actual: value_type.to_string(),
429 }
430 .fail(),
431 }
432 } else {
433 let column_schema = schema_info
434 .find_column_schema_in_table(column)
435 .unwrap_or_else(|| {
436 let semantic_type = decide_semantic(pipeline_context, column);
437 let column_schema = datatypes::schema::ColumnSchema::new(
438 column,
439 value_type.clone(),
440 semantic_type != SemanticType::Timestamp,
441 );
442 ColumnMetadata {
443 column_schema,
444 semantic_type,
445 }
446 });
447 let key = column.to_string();
448 schema_info.schema.push(column_schema);
449 schema_info.index.insert(key, schema_info.schema.len() - 1);
450 Ok(())
451 }
452}
453
454fn calc_ts(p_ctx: &PipelineContext, values: &VrlValue) -> Result<Option<ValueData>> {
455 match p_ctx.channel {
456 Channel::Prometheus => {
457 let ts = values
458 .as_object()
459 .and_then(|m| m.get(greptime_timestamp()))
460 .and_then(|ts| ts.try_into_i64().ok())
461 .unwrap_or_default();
462 Ok(Some(ValueData::TimestampMillisecondValue(ts)))
463 }
464 _ => {
465 let custom_ts = p_ctx.pipeline_definition.get_custom_ts();
466 match custom_ts {
467 Some(ts) => {
468 let ts_field = values.as_object().and_then(|m| m.get(ts.get_column_name()));
469 Some(ts.get_timestamp_value(ts_field)).transpose()
470 }
471 None => Ok(Some(ValueData::TimestampNanosecondValue(
472 chrono::Utc::now().timestamp_nanos_opt().unwrap_or_default(),
473 ))),
474 }
475 }
476 }
477}
478
479pub(crate) fn values_to_rows(
488 schema_info: &mut SchemaInfo,
489 mut values: VrlValue,
490 pipeline_ctx: &PipelineContext<'_>,
491 row: Option<Vec<GreptimeValue>>,
492 need_calc_ts: bool,
493 tablesuffix_template: Option<&crate::tablesuffix::TableSuffixTemplate>,
494) -> Result<std::collections::HashMap<ContextOpt, Vec<RowWithTableSuffix>>> {
495 let skip_error = pipeline_ctx.pipeline_param.skip_error();
496 let VrlValue::Array(arr) = values else {
497 let mut result = std::collections::HashMap::new();
499
500 let mut opt = match ContextOpt::from_pipeline_map_to_opt(&mut values) {
501 Ok(r) => r,
502 Err(e) => return if skip_error { Ok(result) } else { Err(e) },
503 };
504
505 let table_suffix = opt.resolve_table_suffix(tablesuffix_template, &values);
506 let row = match values_to_row(schema_info, values, pipeline_ctx, row, need_calc_ts) {
507 Ok(r) => r,
508 Err(e) => return if skip_error { Ok(result) } else { Err(e) },
509 };
510 result.insert(opt, vec![(row, table_suffix)]);
511 return Ok(result);
512 };
513
514 let mut rows_by_context: std::collections::HashMap<ContextOpt, Vec<RowWithTableSuffix>> =
515 std::collections::HashMap::new();
516 for (index, mut value) in arr.into_iter().enumerate() {
517 if !value.is_object() {
518 unwrap_or_continue_if_err!(
519 ArrayElementMustBeObjectSnafu {
520 index,
521 actual_type: value.kind_str().to_string(),
522 }
523 .fail(),
524 skip_error
525 );
526 }
527
528 let mut opt = unwrap_or_continue_if_err!(
530 ContextOpt::from_pipeline_map_to_opt(&mut value),
531 skip_error
532 );
533 let table_suffix = opt.resolve_table_suffix(tablesuffix_template, &value);
534 let transformed_row = unwrap_or_continue_if_err!(
535 values_to_row(schema_info, value, pipeline_ctx, row.clone(), need_calc_ts),
536 skip_error
537 );
538 rows_by_context
539 .entry(opt)
540 .or_default()
541 .push((transformed_row, table_suffix));
542 }
543 Ok(rows_by_context)
544}
545
546pub(crate) fn values_to_row(
553 schema_info: &mut SchemaInfo,
554 values: VrlValue,
555 pipeline_ctx: &PipelineContext<'_>,
556 row: Option<Vec<GreptimeValue>>,
557 need_calc_ts: bool,
558) -> Result<Row> {
559 let mut row: Vec<GreptimeValue> =
560 row.unwrap_or_else(|| Vec::with_capacity(schema_info.schema.len()));
561 let custom_ts = pipeline_ctx.pipeline_definition.get_custom_ts();
562
563 if need_calc_ts {
564 let ts = calc_ts(pipeline_ctx, &values)?;
566 row.push(GreptimeValue { value_data: ts });
567 }
568
569 row.resize(schema_info.schema.len(), GreptimeValue { value_data: None });
570
571 let ts_column_name = custom_ts
573 .as_ref()
574 .map_or(greptime_timestamp(), |ts| ts.get_column_name());
575
576 let values = values.into_object().context(ValueMustBeMapSnafu)?;
577
578 for (column_name, value) in values {
579 if need_calc_ts && column_name.as_str() == ts_column_name {
580 continue;
581 }
582
583 resolve_value(
584 value,
585 column_name.into(),
586 &mut row,
587 schema_info,
588 pipeline_ctx,
589 )?;
590 }
591 Ok(Row { values: row })
592}
593
594fn decide_semantic(p_ctx: &PipelineContext, column_name: &str) -> SemanticType {
595 if p_ctx.channel == Channel::Prometheus && column_name != greptime_value() {
596 SemanticType::Tag
597 } else {
598 SemanticType::Field
599 }
600}
601
602fn resolve_value(
603 value: VrlValue,
604 column_name: String,
605 row: &mut Vec<GreptimeValue>,
606 schema_info: &mut SchemaInfo,
607 p_ctx: &PipelineContext,
608) -> Result<()> {
609 let index = schema_info.index.get(&column_name).copied();
610
611 let value_data = match value {
612 VrlValue::Null => None,
613
614 VrlValue::Integer(v) => {
615 resolve_schema(
617 index,
618 p_ctx,
619 &column_name,
620 &ConcreteDataType::int64_datatype(),
621 schema_info,
622 )?;
623 Some(ValueData::I64Value(v))
624 }
625
626 VrlValue::Float(v) => {
627 resolve_schema(
629 index,
630 p_ctx,
631 &column_name,
632 &ConcreteDataType::float64_datatype(),
633 schema_info,
634 )?;
635 Some(ValueData::F64Value(v.into()))
636 }
637
638 VrlValue::Boolean(v) => {
639 resolve_schema(
640 index,
641 p_ctx,
642 &column_name,
643 &ConcreteDataType::boolean_datatype(),
644 schema_info,
645 )?;
646 Some(ValueData::BoolValue(v))
647 }
648
649 VrlValue::Bytes(v) => {
650 resolve_schema(
651 index,
652 p_ctx,
653 &column_name,
654 &ConcreteDataType::string_datatype(),
655 schema_info,
656 )?;
657 Some(ValueData::StringValue(String::from_utf8_lossy_owned(
658 v.to_vec(),
659 )))
660 }
661
662 VrlValue::Regex(v) => {
663 warn!(
664 "Persisting regex value in the table, this should not happen, column_name: {}",
665 column_name
666 );
667 resolve_schema(
668 index,
669 p_ctx,
670 &column_name,
671 &ConcreteDataType::string_datatype(),
672 schema_info,
673 )?;
674 Some(ValueData::StringValue(v.to_string()))
675 }
676
677 VrlValue::Timestamp(ts) => {
678 let ns = ts.timestamp_nanos_opt().context(InvalidTimestampSnafu {
679 input: ts.to_rfc3339(),
680 })?;
681 resolve_schema(
682 index,
683 p_ctx,
684 &column_name,
685 &ConcreteDataType::timestamp_nanosecond_datatype(),
686 schema_info,
687 )?;
688 Some(ValueData::TimestampNanosecondValue(ns))
689 }
690
691 VrlValue::Array(_) | VrlValue::Object(_) => {
692 let is_json_native_type = schema_info
693 .find_column_schema_in_table(&column_name)
694 .is_some_and(|x| {
695 if let ConcreteDataType::Json(column_type) = &x.column_schema.data_type {
696 column_type.is_native_type()
697 } else {
698 false
699 }
700 });
701
702 let value = if is_json_native_type {
703 let json_extension_type: Option<JsonExtensionType> =
704 if let Some(x) = schema_info.find_column_schema_in_table(&column_name) {
705 x.column_schema.extension_type()?
706 } else {
707 None
708 };
709 let settings = json_extension_type
710 .and_then(|x| x.metadata().json_structure_settings.clone())
711 .unwrap_or_default();
712 let value: serde_json::Value = value.try_into().map_err(|e: StdError| {
713 CoerceIncompatibleTypesSnafu { msg: e.to_string() }.build()
714 })?;
715 let value = settings.encode(value)?;
716
717 resolve_schema(index, p_ctx, &column_name, &value.data_type(), schema_info)?;
718
719 let Value::Json(value) = value else {
720 unreachable!()
721 };
722 ValueData::JsonValue(encode_json_value(*value))
723 } else {
724 resolve_schema(
725 index,
726 p_ctx,
727 &column_name,
728 &ConcreteDataType::binary_datatype(),
729 schema_info,
730 )?;
731
732 let value = vrl_value_to_jsonb_value(&value);
733 ValueData::BinaryValue(value.to_vec())
734 };
735 Some(value)
736 }
737 };
738
739 let value = GreptimeValue { value_data };
740 if let Some(index) = index {
741 row[index] = value;
742 } else {
743 row.push(value);
744 }
745 Ok(())
746}
747
748fn vrl_value_to_jsonb_value<'a>(value: &'a VrlValue) -> jsonb::Value<'a> {
749 match value {
750 VrlValue::Bytes(bytes) => jsonb::Value::String(String::from_utf8_lossy(bytes)),
751 VrlValue::Regex(value_regex) => jsonb::Value::String(Cow::Borrowed(value_regex.as_str())),
752 VrlValue::Integer(i) => jsonb::Value::Number(Number::Int64(*i)),
753 VrlValue::Float(not_nan) => jsonb::Value::Number(Number::Float64(not_nan.into_inner())),
754 VrlValue::Boolean(b) => jsonb::Value::Bool(*b),
755 VrlValue::Timestamp(date_time) => jsonb::Value::String(Cow::Owned(date_time.to_rfc3339())),
756 VrlValue::Object(btree_map) => jsonb::Value::Object(
757 btree_map
758 .iter()
759 .map(|(key, value)| (key.to_string(), vrl_value_to_jsonb_value(value)))
760 .collect(),
761 ),
762 VrlValue::Array(values) => jsonb::Value::Array(
763 values
764 .iter()
765 .map(|value| vrl_value_to_jsonb_value(value))
766 .collect(),
767 ),
768 VrlValue::Null => jsonb::Value::Null,
769 }
770}
771
772fn identity_pipeline_inner(
773 pipeline_maps: Vec<VrlValue>,
774 pipeline_ctx: &PipelineContext<'_>,
775) -> Result<(SchemaInfo, HashMap<ContextOpt, Vec<Row>>)> {
776 let skip_error = pipeline_ctx.pipeline_param.skip_error();
777 let mut schema_info = SchemaInfo::default();
778 let custom_ts = pipeline_ctx.pipeline_definition.get_custom_ts();
779
780 let column_schema = datatypes::schema::ColumnSchema::new(
782 custom_ts
783 .map(|ts| ts.get_column_name().to_string())
784 .unwrap_or_else(|| greptime_timestamp().to_string()),
785 custom_ts
786 .map(|c| ConcreteDataType::from(ColumnDataTypeWrapper::new(c.get_datatype(), None)))
787 .unwrap_or_else(|| {
788 if pipeline_ctx.channel == Channel::Prometheus {
789 ConcreteDataType::timestamp_millisecond_datatype()
790 } else {
791 ConcreteDataType::timestamp_nanosecond_datatype()
792 }
793 }),
794 false,
795 );
796 schema_info.schema.push(ColumnMetadata {
797 column_schema,
798 semantic_type: SemanticType::Timestamp,
799 });
800
801 let mut opt_map = HashMap::new();
802 let len = pipeline_maps.len();
803
804 for mut pipeline_map in pipeline_maps {
805 let opt = unwrap_or_continue_if_err!(
806 ContextOpt::from_pipeline_map_to_opt(&mut pipeline_map),
807 skip_error
808 );
809 let row = unwrap_or_continue_if_err!(
810 values_to_row(&mut schema_info, pipeline_map, pipeline_ctx, None, true),
811 skip_error
812 );
813
814 opt_map
815 .entry(opt)
816 .or_insert_with(|| Vec::with_capacity(len))
817 .push(row);
818 }
819
820 let column_count = schema_info.schema.len();
821 for (_, row) in opt_map.iter_mut() {
822 for row in row.iter_mut() {
823 let diff = column_count - row.values.len();
824 for _ in 0..diff {
825 row.values.push(GreptimeValue { value_data: None });
826 }
827 }
828 }
829
830 Ok((schema_info, opt_map))
831}
832
833pub fn identity_pipeline(
842 array: Vec<VrlValue>,
843 table: Option<Arc<table::Table>>,
844 pipeline_ctx: &PipelineContext<'_>,
845) -> Result<HashMap<ContextOpt, Rows>> {
846 let skip_error = pipeline_ctx.pipeline_param.skip_error();
847 let max_nested_levels = pipeline_ctx.pipeline_param.max_nested_levels();
848 let mut input = Vec::with_capacity(array.len());
850 for item in array.into_iter() {
851 let result =
852 unwrap_or_continue_if_err!(flatten_object(item, max_nested_levels), skip_error);
853 input.push(result);
854 }
855
856 identity_pipeline_inner(input, pipeline_ctx).and_then(|(mut schema, opt_map)| {
857 if let Some(table) = table {
858 let table_info = table.table_info();
859 for tag_name in table_info.meta.row_key_column_names() {
860 if let Some(index) = schema.index.get(tag_name) {
861 schema.schema[*index].semantic_type = SemanticType::Tag;
862 }
863 }
864 }
865
866 let column_schemas = schema.column_schemas()?;
867 Ok(opt_map
868 .into_iter()
869 .map(|(opt, rows)| {
870 (
871 opt,
872 Rows {
873 schema: column_schemas.clone(),
874 rows,
875 },
876 )
877 })
878 .collect::<HashMap<ContextOpt, Rows>>())
879 })
880}
881
882pub fn flatten_object(object: VrlValue, max_nested_levels: usize) -> Result<VrlValue> {
888 let mut flattened = BTreeMap::new();
889 let object = object.into_object().context(ValueMustBeMapSnafu)?;
890
891 if !object.is_empty() {
892 do_flatten_object(&mut flattened, None, object, 1, max_nested_levels);
894 }
895
896 Ok(VrlValue::Object(flattened))
897}
898
899fn vrl_value_to_serde_json(value: &VrlValue) -> serde_json_crate::Value {
900 match value {
901 VrlValue::Null => serde_json_crate::Value::Null,
902 VrlValue::Boolean(b) => serde_json_crate::Value::Bool(*b),
903 VrlValue::Integer(i) => serde_json_crate::Value::Number((*i).into()),
904 VrlValue::Float(not_nan) => serde_json_crate::Number::from_f64(not_nan.into_inner())
905 .map(serde_json_crate::Value::Number)
906 .unwrap_or(serde_json_crate::Value::Null),
907 VrlValue::Bytes(bytes) => {
908 serde_json_crate::Value::String(String::from_utf8_lossy(bytes).into_owned())
909 }
910 VrlValue::Regex(re) => serde_json_crate::Value::String(re.as_str().to_string()),
911 VrlValue::Timestamp(ts) => serde_json_crate::Value::String(ts.to_rfc3339()),
912 VrlValue::Array(arr) => {
913 serde_json_crate::Value::Array(arr.iter().map(vrl_value_to_serde_json).collect())
914 }
915 VrlValue::Object(map) => serde_json_crate::Value::Object(
916 map.iter()
917 .map(|(k, v)| (k.to_string(), vrl_value_to_serde_json(v)))
918 .collect(),
919 ),
920 }
921}
922
923fn do_flatten_object(
924 dest: &mut BTreeMap<KeyString, VrlValue>,
925 base: Option<&str>,
926 object: BTreeMap<KeyString, VrlValue>,
927 current_level: usize,
928 max_nested_levels: usize,
929) {
930 for (key, value) in object {
931 let new_key = base.map_or_else(
932 || key.clone(),
933 |base_key| format!("{base_key}.{key}").into(),
934 );
935
936 match value {
937 VrlValue::Object(object) => {
938 if current_level >= max_nested_levels {
939 let json_string = serde_json_crate::to_string(&vrl_value_to_serde_json(
941 &VrlValue::Object(object),
942 ))
943 .unwrap_or_else(|_| String::from("{}"));
944 dest.insert(new_key, VrlValue::Bytes(Bytes::from(json_string)));
945 } else {
946 do_flatten_object(
947 dest,
948 Some(&new_key),
949 object,
950 current_level + 1,
951 max_nested_levels,
952 );
953 }
954 }
955 VrlValue::Array(_) => {
957 let json_string = serde_json_crate::to_string(&vrl_value_to_serde_json(&value))
958 .unwrap_or_else(|_| String::from("[]"));
959 dest.insert(new_key, VrlValue::Bytes(Bytes::from(json_string)));
960 }
961 _ => {
963 dest.insert(new_key, value);
964 }
965 }
966 }
967}
968
#[cfg(test)]
mod tests {
    use ahash::HashMap as AHashMap;
    use api::v1::SemanticType;

    use super::*;
    use crate::{PipelineDefinition, identity_pipeline};

    #[test]
    fn test_identify_pipeline() {
        let params = GreptimePipelineParams::default();
        let pipeline_ctx = PipelineContext::new(
            &PipelineDefinition::GreptimeIdentityPipeline(None),
            &params,
            Channel::Unknown,
        );
        {
            let array = [
                serde_json::json!({
                    "woshinull": null,
                    "name": "Alice",
                    "age": 20,
                    "is_student": true,
                    "score": 99.5,
                    "hobbies": "reading",
                    "address": "Beijing",
                }),
                serde_json::json!({
                    "name": "Bob",
                    "age": 21,
                    "is_student": false,
                    "score": "88.5",
                    "hobbies": "swimming",
                    "address": "Shanghai",
                    "gaga": "gaga"
                }),
            ];
            let array = array.iter().map(|v| v.into()).collect();
            let rows = identity_pipeline(array, None, &pipeline_ctx);
            assert!(rows.is_err());
            assert_eq!(
                rows.err().unwrap().to_string(),
                "Column datatype mismatch. For column: score, expected datatype: Float64, actual datatype: String".to_string(),
            );
        }
        {
            let array = [
                serde_json::json!({
                    "woshinull": null,
                    "name": "Alice",
                    "age": 20,
                    "is_student": true,
                    "score": 99.5,
                    "hobbies": "reading",
                    "address": "Beijing",
                }),
                serde_json::json!({
                    "name": "Bob",
                    "age": 21,
                    "is_student": false,
                    "score": 88,
                    "hobbies": "swimming",
                    "address": "Shanghai",
                    "gaga": "gaga"
                }),
            ];
            let array = array.iter().map(|v| v.into()).collect();
            let rows = identity_pipeline(array, None, &pipeline_ctx);
            assert!(rows.is_err());
            assert_eq!(
                rows.err().unwrap().to_string(),
                "Column datatype mismatch. For column: score, expected datatype: Float64, actual datatype: Int64".to_string(),
            );
        }
        {
            let array = [
                serde_json::json!({
                    "woshinull": null,
                    "name": "Alice",
                    "age": 20,
                    "is_student": true,
                    "score": 99.5,
                    "hobbies": "reading",
                    "address": "Beijing",
                }),
                serde_json::json!({
                    "name": "Bob",
                    "age": 21,
                    "is_student": false,
                    "score": 88.5,
                    "hobbies": "swimming",
                    "address": "Shanghai",
                    "gaga": "gaga"
                }),
            ];
            let array = array.iter().map(|v| v.into()).collect();
            let rows = identity_pipeline(array, None, &pipeline_ctx);
            assert!(rows.is_ok());
            let mut rows = rows.unwrap();
            assert!(rows.len() == 1);
            let rows = rows.remove(&ContextOpt::default()).unwrap();
            assert_eq!(rows.schema.len(), 8);
            assert_eq!(rows.rows.len(), 2);
            assert_eq!(8, rows.rows[0].values.len());
            assert_eq!(8, rows.rows[1].values.len());
        }
        {
            let array = [
                serde_json::json!({
                    "woshinull": null,
                    "name": "Alice",
                    "age": 20,
                    "is_student": true,
                    "score": 99.5,
                    "hobbies": "reading",
                    "address": "Beijing",
                }),
                serde_json::json!({
                    "name": "Bob",
                    "age": 21,
                    "is_student": false,
                    "score": 88.5,
                    "hobbies": "swimming",
                    "address": "Shanghai",
                    "gaga": "gaga"
                }),
            ];
            let tag_column_names = ["name".to_string(), "address".to_string()];

            let rows =
                identity_pipeline_inner(array.iter().map(|v| v.into()).collect(), &pipeline_ctx)
                    .map(|(mut schema, mut rows)| {
                        for name in tag_column_names {
                            if let Some(index) = schema.index.get(&name) {
                                schema.schema[*index].semantic_type = SemanticType::Tag;
                            }
                        }

                        assert!(rows.len() == 1);
                        let rows = rows.remove(&ContextOpt::default()).unwrap();

                        Rows {
                            schema: schema.column_schemas().unwrap(),
                            rows,
                        }
                    });

            assert!(rows.is_ok());
            let rows = rows.unwrap();
            assert_eq!(rows.schema.len(), 8);
            assert_eq!(rows.rows.len(), 2);
            assert_eq!(8, rows.rows[0].values.len());
            assert_eq!(8, rows.rows[1].values.len());
            assert_eq!(
                rows.schema
                    .iter()
                    .find(|x| x.column_name == "name")
                    .unwrap()
                    .semantic_type,
                SemanticType::Tag as i32
            );
            assert_eq!(
                rows.schema
                    .iter()
                    .find(|x| x.column_name == "address")
                    .unwrap()
                    .semantic_type,
                SemanticType::Tag as i32
            );
            assert_eq!(
                rows.schema
                    .iter()
                    .filter(|x| x.semantic_type == SemanticType::Tag as i32)
                    .count(),
                2
            );
        }
    }

    #[test]
    fn test_pipeline_params_parsing() {
        let params = GreptimePipelineParams::from_params(Some(
            "skip_error=true&max_nested_levels=3&malformed",
        ));
        assert!(params.skip_error());
        assert_eq!(params.max_nested_levels(), 3);

        // Missing, empty, zero and unparsable inputs all fall back to the defaults.
        for raw in [
            None,
            Some(""),
            Some("max_nested_levels=0"),
            Some("max_nested_levels=abc"),
        ] {
            let params = GreptimePipelineParams::from_params(raw);
            assert!(!params.skip_error());
            assert_eq!(
                params.max_nested_levels(),
                DEFAULT_MAX_NESTED_LEVELS_FOR_JSON_FLATTENING
            );
        }

        // Segments are split at the first `=`; segments without `=` are dropped.
        let map = GreptimePipelineParams::parse_header_str_to_map(Some("a=1&b=x=y&c"));
        assert_eq!(map.len(), 2);
        assert_eq!(map.get("a").map(String::as_str), Some("1"));
        assert_eq!(map.get("b").map(String::as_str), Some("x=y"));
    }

    #[test]
    fn test_flatten() {
        let test_cases = vec![
            (
                serde_json::json!(
                    {
                        "a": {
                            "b": {
                                "c": [1, 2, 3]
                            }
                        },
                        "d": [
                            "foo",
                            "bar"
                        ],
                        "e": {
                            "f": [7, 8, 9],
                            "g": {
                                "h": 123,
                                "i": "hello",
                                "j": {
                                    "k": true
                                }
                            }
                        }
                    }
                ),
                10,
                Some(serde_json::json!(
                    {
                        "a.b.c": "[1,2,3]",
                        "d": "[\"foo\",\"bar\"]",
                        "e.f": "[7,8,9]",
                        "e.g.h": 123,
                        "e.g.i": "hello",
                        "e.g.j.k": true
                    }
                )),
            ),
            (
                serde_json::json!(
                    {
                        "a": {
                            "b": {
                                "c": {
                                    "d": [1, 2, 3]
                                }
                            }
                        },
                        "e": [
                            "foo",
                            "bar"
                        ]
                    }
                ),
                3,
                Some(serde_json::json!(
                    {
                        "a.b.c": "{\"d\":[1,2,3]}",
                        "e": "[\"foo\",\"bar\"]"
                    }
                )),
            ),
        ];

        for (input, max_depth, expected) in test_cases {
            let input = input.into();
            let expected = expected.map(|e| e.into());

            let flattened_object = flatten_object(input, max_depth).ok();
            assert_eq!(flattened_object, expected);
        }
    }

    #[test]
    fn test_values_to_rows_skip_error_handling() {
        let table_suffix_template: Option<crate::tablesuffix::TableSuffixTemplate> = None;

        {
            // With skip_error, non-object elements are skipped and the rest still convert.
            let schema_info = &mut SchemaInfo::default();
            let input_array = vec![
                serde_json::json!({"name": "Alice", "age": 25}).into(),
                VrlValue::Bytes("invalid_string".into()),
                serde_json::json!({"name": "Bob", "age": 30}).into(),
                VrlValue::Integer(42),
                serde_json::json!({"name": "Charlie", "age": 35}).into(),
            ];

            let params = GreptimePipelineParams::from_map(AHashMap::from_iter([(
                "skip_error".to_string(),
                "true".to_string(),
            )]));

            let pipeline_ctx = PipelineContext::new(
                &PipelineDefinition::GreptimeIdentityPipeline(None),
                &params,
                Channel::Unknown,
            );

            let result = values_to_rows(
                schema_info,
                VrlValue::Array(input_array),
                &pipeline_ctx,
                None,
                true,
                table_suffix_template.as_ref(),
            );

            assert!(result.is_ok());
            let rows_by_context = result.unwrap();
            let total_rows: usize = rows_by_context.values().map(|v| v.len()).sum();
            assert_eq!(total_rows, 3);
        }

        {
            // Without skip_error, the first non-object element fails the whole call.
            let schema_info = &mut SchemaInfo::default();
            let input_array = vec![
                serde_json::json!({"name": "Alice", "age": 25}).into(),
                VrlValue::Bytes("invalid_string".into()),
            ];

            let params = GreptimePipelineParams::default();
            let pipeline_ctx = PipelineContext::new(
                &PipelineDefinition::GreptimeIdentityPipeline(None),
                &params,
                Channel::Unknown,
            );

            let result = values_to_rows(
                schema_info,
                VrlValue::Array(input_array),
                &pipeline_ctx,
                None,
                true,
                table_suffix_template.as_ref(),
            );

            assert!(result.is_err());
            let error_msg = result.unwrap_err().to_string();
            assert!(error_msg.contains("Array element at index 1 must be an object for one-to-many transformation, got string"));
        }
    }

    #[test]
    fn test_values_to_rows_per_element_context_opt() {
        let table_suffix_template: Option<crate::tablesuffix::TableSuffixTemplate> = None;
        let schema_info = &mut SchemaInfo::default();

        let input_array = vec![
            serde_json::json!({"name": "Alice", "greptime_ttl": "1h"}).into(),
            serde_json::json!({"name": "Bob", "greptime_ttl": "1h"}).into(),
            serde_json::json!({"name": "Charlie", "greptime_ttl": "24h"}).into(),
        ];

        let params = GreptimePipelineParams::default();
        let pipeline_ctx = PipelineContext::new(
            &PipelineDefinition::GreptimeIdentityPipeline(None),
            &params,
            Channel::Unknown,
        );

        let result = values_to_rows(
            schema_info,
            VrlValue::Array(input_array),
            &pipeline_ctx,
            None,
            true,
            table_suffix_template.as_ref(),
        );

        assert!(result.is_ok());
        let rows_by_context = result.unwrap();

        // Two distinct TTLs -> two context groups.
        assert_eq!(rows_by_context.len(), 2);

        let total_rows: usize = rows_by_context.values().map(|v| v.len()).sum();
        assert_eq!(total_rows, 3);

        let mut ttl_1h_count = 0;
        let mut ttl_24h_count = 0;
        for rows in rows_by_context.values() {
            if rows.len() == 2 {
                ttl_1h_count = rows.len();
            } else if rows.len() == 1 {
                ttl_24h_count = rows.len();
            }
        }
        assert_eq!(ttl_1h_count, 2);
        assert_eq!(ttl_24h_count, 1);
    }
}