1pub mod coerce;
16
17use std::borrow::Cow;
18use std::collections::{BTreeMap, HashSet};
19use std::sync::Arc;
20
21use ahash::{HashMap, HashMapExt};
22use api::helper::{ColumnDataTypeWrapper, encode_json_value};
23use api::v1::column_def::{collect_column_options, options_from_column_schema};
24use api::v1::value::ValueData;
25use api::v1::{ColumnDataType, SemanticType};
26use arrow_schema::extension::ExtensionType;
27use coerce::{coerce_columns, coerce_value};
28use common_query::prelude::{greptime_timestamp, greptime_value};
29use common_telemetry::warn;
30use datatypes::data_type::ConcreteDataType;
31use datatypes::extension::json::JsonExtensionType;
32use datatypes::value::Value;
33use greptime_proto::v1::{ColumnSchema, Row, Rows, Value as GreptimeValue};
34use itertools::Itertools;
35use jsonb::Number;
36use once_cell::sync::OnceCell;
37use serde_json as serde_json_crate;
38use session::context::Channel;
39use snafu::OptionExt;
40use table::Table;
41use vrl::prelude::{Bytes, VrlValueConvert};
42use vrl::value::value::StdError;
43use vrl::value::{KeyString, Value as VrlValue};
44
45use crate::error::{
46 ArrayElementMustBeObjectSnafu, CoerceIncompatibleTypesSnafu,
47 IdentifyPipelineColumnTypeMismatchSnafu, InvalidTimestampSnafu, Result,
48 TimeIndexMustBeNonNullSnafu, TransformColumnNameMustBeUniqueSnafu,
49 TransformMultipleTimestampIndexSnafu, TransformTimestampIndexCountSnafu, ValueMustBeMapSnafu,
50};
51use crate::etl::PipelineDocVersion;
52use crate::etl::ctx_req::ContextOpt;
53use crate::etl::field::{Field, Fields};
54use crate::etl::transform::index::Index;
55use crate::etl::transform::{Transform, Transforms};
56use crate::{PipelineContext, truthy, unwrap_or_continue_if_err};
57
/// Nesting depth used by [`flatten_object`] when the `max_nested_levels`
/// pipeline parameter is absent, not a valid `usize`, or zero.
const DEFAULT_MAX_NESTED_LEVELS_FOR_JSON_FLATTENING: usize = 10;

/// A transformed row paired with the optional table suffix resolved for it
/// from the table-suffix template.
pub type RowWithTableSuffix = (Row, Option<String>);
62
/// Converts pipeline maps into GreptimeDB values using a fixed list of
/// [`Transforms`].
#[derive(Debug, Clone)]
pub struct GreptimeTransformer {
    /// The configured transforms; their fields define the output columns in order.
    transforms: Transforms,
    /// Output column schemas, one per transform field, built by `init_schemas`.
    schema: Vec<ColumnSchema>,
}
70
/// Per-request pipeline parameters, either parsed from a `k1=v1&k2=v2` string
/// (see `parse_header_str_to_map`) or supplied directly as a map.
#[derive(Debug, Default)]
pub struct GreptimePipelineParams {
    /// Raw parameter key/value pairs.
    options: HashMap<String, String>,

    /// Lazily parsed and cached `skip_error` option.
    pub skip_error: OnceCell<bool>,
    /// Lazily parsed and cached `max_nested_levels` option.
    pub max_nested_levels: OnceCell<usize>,
}
84
85impl GreptimePipelineParams {
86 pub fn from_params(params: Option<&str>) -> Self {
90 let options = Self::parse_header_str_to_map(params);
91
92 Self {
93 options,
94 skip_error: OnceCell::new(),
95 max_nested_levels: OnceCell::new(),
96 }
97 }
98
99 pub fn from_map(options: HashMap<String, String>) -> Self {
100 Self {
101 options,
102 skip_error: OnceCell::new(),
103 max_nested_levels: OnceCell::new(),
104 }
105 }
106
107 pub fn parse_header_str_to_map(params: Option<&str>) -> HashMap<String, String> {
108 if let Some(params) = params {
109 if params.is_empty() {
110 HashMap::new()
111 } else {
112 params
113 .split('&')
114 .filter_map(|s| s.split_once('='))
115 .map(|(k, v)| (k.to_string(), v.to_string()))
116 .collect::<HashMap<String, String>>()
117 }
118 } else {
119 HashMap::new()
120 }
121 }
122
123 pub fn skip_error(&self) -> bool {
125 *self
126 .skip_error
127 .get_or_init(|| self.options.get("skip_error").map(truthy).unwrap_or(false))
128 }
129
130 pub fn max_nested_levels(&self) -> usize {
133 *self.max_nested_levels.get_or_init(|| {
134 self.options
135 .get("max_nested_levels")
136 .and_then(|s| s.parse::<usize>().ok())
137 .filter(|v| *v > 0)
138 .unwrap_or(DEFAULT_MAX_NESTED_LEVELS_FOR_JSON_FLATTENING)
139 })
140 }
141}
142
143impl GreptimeTransformer {
144 fn add_greptime_timestamp_column(transforms: &mut Transforms) {
146 let type_ = ColumnDataType::TimestampNanosecond;
147 let default = None;
148
149 let transform = Transform {
150 fields: Fields::one(Field::new(greptime_timestamp().to_string(), None)),
151 type_,
152 default,
153 index: Some(Index::Time),
154 index_options: None,
155 on_failure: Some(crate::etl::transform::OnFailure::Default),
156 tag: false,
157 };
158 transforms.push(transform);
159 }
160
161 fn init_schemas(transforms: &Transforms) -> Result<Vec<ColumnSchema>> {
163 let mut schema = vec![];
164 for transform in transforms.iter() {
165 schema.extend(coerce_columns(transform)?);
166 }
167 Ok(schema)
168 }
169}
170
171impl GreptimeTransformer {
172 pub fn new(mut transforms: Transforms, doc_version: &PipelineDocVersion) -> Result<Self> {
173 let mut column_names_set = HashSet::new();
175 let mut timestamp_columns = vec![];
176
177 for transform in transforms.iter() {
178 let target_fields_set = transform
179 .fields
180 .iter()
181 .map(|f| f.target_or_input_field())
182 .collect::<HashSet<_>>();
183
184 let intersections: Vec<_> = column_names_set.intersection(&target_fields_set).collect();
185 if !intersections.is_empty() {
186 let duplicates = intersections.iter().join(",");
187 return TransformColumnNameMustBeUniqueSnafu { duplicates }.fail();
188 }
189
190 column_names_set.extend(target_fields_set);
191
192 if let Some(idx) = transform.index
193 && idx == Index::Time
194 {
195 match transform.fields.len() {
196 1 => timestamp_columns.push(transform.fields.first().unwrap().input_field()),
198 _ => {
199 return TransformMultipleTimestampIndexSnafu {
200 columns: transform.fields.iter().map(|x| x.input_field()).join(", "),
201 }
202 .fail();
203 }
204 }
205 }
206 }
207
208 let schema = match timestamp_columns.len() {
209 0 if doc_version == &PipelineDocVersion::V1 => {
210 GreptimeTransformer::add_greptime_timestamp_column(&mut transforms);
212 GreptimeTransformer::init_schemas(&transforms)?
213 }
214 1 => GreptimeTransformer::init_schemas(&transforms)?,
215 count => {
216 let columns = timestamp_columns.iter().join(", ");
217 return TransformTimestampIndexCountSnafu { count, columns }.fail();
218 }
219 };
220 Ok(GreptimeTransformer { transforms, schema })
221 }
222
223 pub fn transform_mut(
224 &self,
225 pipeline_map: &mut VrlValue,
226 is_v1: bool,
227 ) -> Result<Vec<GreptimeValue>> {
228 let mut values = vec![GreptimeValue { value_data: None }; self.schema.len()];
229 let mut output_index = 0;
230 for transform in self.transforms.iter() {
231 for field in transform.fields.iter() {
232 let column_name = field.input_field();
233
234 let pipeline_map = pipeline_map.as_object_mut().context(ValueMustBeMapSnafu)?;
235 match pipeline_map.get(column_name) {
237 Some(v) => {
238 let value_data = coerce_value(v, transform)?;
239 values[output_index] = GreptimeValue { value_data };
241 }
242 None => {
243 let value_data = match transform.on_failure {
244 Some(crate::etl::transform::OnFailure::Default) => {
245 match transform.get_default() {
246 Some(default) => Some(default.clone()),
247 None => transform.get_default_value_when_data_is_none(),
248 }
249 }
250 Some(crate::etl::transform::OnFailure::Ignore) => None,
251 None => None,
252 };
253 if transform.is_timeindex() && value_data.is_none() {
254 return TimeIndexMustBeNonNullSnafu.fail();
255 }
256 values[output_index] = GreptimeValue { value_data };
257 }
258 }
259 output_index += 1;
260 if !is_v1 {
261 pipeline_map.remove(column_name);
264 }
265 }
266 }
267 Ok(values)
268 }
269
270 pub fn transforms(&self) -> &Transforms {
271 &self.transforms
272 }
273
274 pub fn schemas(&self) -> &Vec<greptime_proto::v1::ColumnSchema> {
275 &self.schema
276 }
277
278 pub fn transforms_mut(&mut self) -> &mut Transforms {
279 &mut self.transforms
280 }
281}
282
/// A column's `datatypes` schema together with its semantic role
/// (tag, field or timestamp).
#[derive(Clone)]
pub struct ColumnMetadata {
    /// Name, data type, nullability and metadata of the column.
    column_schema: datatypes::schema::ColumnSchema,
    /// Semantic role of the column.
    semantic_type: SemanticType,
}
288
289impl From<ColumnSchema> for ColumnMetadata {
290 fn from(value: ColumnSchema) -> Self {
291 let datatype = value.datatype();
292 let semantic_type = value.semantic_type();
293 let ColumnSchema {
294 column_name,
295 datatype: _,
296 semantic_type: _,
297 datatype_extension,
298 options,
299 } = value;
300
301 let column_schema = datatypes::schema::ColumnSchema::new(
302 column_name,
303 ColumnDataTypeWrapper::new(datatype, datatype_extension).into(),
304 semantic_type != SemanticType::Timestamp,
305 );
306
307 let metadata = collect_column_options(options.as_ref());
308 let column_schema = column_schema.with_metadata(metadata);
309
310 Self {
311 column_schema,
312 semantic_type,
313 }
314 }
315}
316
317impl TryFrom<ColumnMetadata> for ColumnSchema {
318 type Error = api::error::Error;
319
320 fn try_from(value: ColumnMetadata) -> std::result::Result<Self, Self::Error> {
321 let ColumnMetadata {
322 column_schema,
323 semantic_type,
324 } = value;
325
326 let options = options_from_column_schema(&column_schema);
327
328 let (datatype, datatype_extension) =
329 ColumnDataTypeWrapper::try_from(column_schema.data_type).map(|x| x.into_parts())?;
330
331 Ok(ColumnSchema {
332 column_name: column_schema.name,
333 datatype: datatype as _,
334 semantic_type: semantic_type as _,
335 datatype_extension,
336 options,
337 })
338 }
339}
340
/// Schema accumulated while rows are converted.
///
/// `schema` and `index` are kept in sync by the functions in this module.
/// `table`, when set, supplies definitions for columns that already exist in
/// the target table.
#[derive(Default)]
pub struct SchemaInfo {
    /// Column definitions, in the same order as the values of each row.
    pub schema: Vec<ColumnMetadata>,
    /// Column name to position in `schema`.
    pub index: HashMap<String, usize>,
    /// Target table, if known.
    table: Option<Arc<Table>>,
}
353
354impl SchemaInfo {
355 pub fn with_capacity(capacity: usize) -> Self {
356 Self {
357 schema: Vec::with_capacity(capacity),
358 index: HashMap::with_capacity(capacity),
359 table: None,
360 }
361 }
362
363 pub fn from_schema_list(schema_list: Vec<ColumnSchema>) -> Self {
364 let mut index = HashMap::new();
365 for (i, schema) in schema_list.iter().enumerate() {
366 index.insert(schema.column_name.clone(), i);
367 }
368 Self {
369 schema: schema_list.into_iter().map(Into::into).collect(),
370 index,
371 table: None,
372 }
373 }
374
375 pub fn set_table(&mut self, table: Option<Arc<Table>>) {
376 self.table = table;
377 }
378
379 fn find_column_schema_in_table(&self, column_name: &str) -> Option<ColumnMetadata> {
380 if let Some(table) = &self.table
381 && let Some(i) = table.schema_ref().column_index_by_name(column_name)
382 {
383 let column_schema = table.schema_ref().column_schemas()[i].clone();
384
385 let semantic_type = if column_schema.is_time_index() {
386 SemanticType::Timestamp
387 } else if table.table_info().meta.primary_key_indices.contains(&i) {
388 SemanticType::Tag
389 } else {
390 SemanticType::Field
391 };
392
393 Some(ColumnMetadata {
394 column_schema,
395 semantic_type,
396 })
397 } else {
398 None
399 }
400 }
401
402 pub fn column_schemas(&self) -> api::error::Result<Vec<ColumnSchema>> {
403 self.schema
404 .iter()
405 .map(|x| x.clone().try_into())
406 .collect::<api::error::Result<Vec<_>>>()
407 }
408}
409
410fn resolve_schema(
411 index: Option<usize>,
412 pipeline_context: &PipelineContext,
413 column: &str,
414 value_type: &ConcreteDataType,
415 schema_info: &mut SchemaInfo,
416) -> Result<()> {
417 if let Some(index) = index {
418 let column_type = &mut schema_info.schema[index].column_schema.data_type;
419 match (column_type, value_type) {
420 (column_type, value_type) if column_type == value_type => Ok(()),
421 (ConcreteDataType::Json(column_type), ConcreteDataType::Json(value_type))
422 if column_type.is_json2() && value_type.is_json2() =>
423 {
424 Ok(())
425 }
426 (column_type, value_type) => IdentifyPipelineColumnTypeMismatchSnafu {
427 column,
428 expected: column_type.to_string(),
429 actual: value_type.to_string(),
430 }
431 .fail(),
432 }
433 } else {
434 let column_schema = schema_info
435 .find_column_schema_in_table(column)
436 .unwrap_or_else(|| {
437 let semantic_type = decide_semantic(pipeline_context, column);
438 let column_schema = datatypes::schema::ColumnSchema::new(
439 column,
440 value_type.clone(),
441 semantic_type != SemanticType::Timestamp,
442 );
443 ColumnMetadata {
444 column_schema,
445 semantic_type,
446 }
447 });
448 let key = column.to_string();
449 schema_info.schema.push(column_schema);
450 schema_info.index.insert(key, schema_info.schema.len() - 1);
451 Ok(())
452 }
453}
454
455fn calc_ts(p_ctx: &PipelineContext, values: &VrlValue) -> Result<Option<ValueData>> {
456 match p_ctx.channel {
457 Channel::Prometheus => {
458 let ts = values
459 .as_object()
460 .and_then(|m| m.get(greptime_timestamp()))
461 .and_then(|ts| ts.try_into_i64().ok())
462 .unwrap_or_default();
463 Ok(Some(ValueData::TimestampMillisecondValue(ts)))
464 }
465 _ => {
466 let custom_ts = p_ctx.pipeline_definition.get_custom_ts();
467 match custom_ts {
468 Some(ts) => {
469 let ts_field = values.as_object().and_then(|m| m.get(ts.get_column_name()));
470 Some(ts.get_timestamp_value(ts_field)).transpose()
471 }
472 None => Ok(Some(ValueData::TimestampNanosecondValue(
473 chrono::Utc::now().timestamp_nanos_opt().unwrap_or_default(),
474 ))),
475 }
476 }
477 }
478}
479
480pub(crate) fn values_to_rows(
489 schema_info: &mut SchemaInfo,
490 mut values: VrlValue,
491 pipeline_ctx: &PipelineContext<'_>,
492 row: Option<Vec<GreptimeValue>>,
493 need_calc_ts: bool,
494 tablesuffix_template: Option<&crate::tablesuffix::TableSuffixTemplate>,
495) -> Result<std::collections::HashMap<ContextOpt, Vec<RowWithTableSuffix>>> {
496 let skip_error = pipeline_ctx.pipeline_param.skip_error();
497 let VrlValue::Array(arr) = values else {
498 let mut result = std::collections::HashMap::new();
500
501 let mut opt = match ContextOpt::from_pipeline_map_to_opt(&mut values) {
502 Ok(r) => r,
503 Err(e) => return if skip_error { Ok(result) } else { Err(e) },
504 };
505
506 let table_suffix = opt.resolve_table_suffix(tablesuffix_template, &values);
507 let row = match values_to_row(schema_info, values, pipeline_ctx, row, need_calc_ts) {
508 Ok(r) => r,
509 Err(e) => return if skip_error { Ok(result) } else { Err(e) },
510 };
511 result.insert(opt, vec![(row, table_suffix)]);
512 return Ok(result);
513 };
514
515 let mut rows_by_context: std::collections::HashMap<ContextOpt, Vec<RowWithTableSuffix>> =
516 std::collections::HashMap::new();
517 for (index, mut value) in arr.into_iter().enumerate() {
518 if !value.is_object() {
519 unwrap_or_continue_if_err!(
520 ArrayElementMustBeObjectSnafu {
521 index,
522 actual_type: value.kind_str().to_string(),
523 }
524 .fail(),
525 skip_error
526 );
527 }
528
529 let mut opt = unwrap_or_continue_if_err!(
531 ContextOpt::from_pipeline_map_to_opt(&mut value),
532 skip_error
533 );
534 let table_suffix = opt.resolve_table_suffix(tablesuffix_template, &value);
535 let transformed_row = unwrap_or_continue_if_err!(
536 values_to_row(schema_info, value, pipeline_ctx, row.clone(), need_calc_ts),
537 skip_error
538 );
539 rows_by_context
540 .entry(opt)
541 .or_default()
542 .push((transformed_row, table_suffix));
543 }
544 Ok(rows_by_context)
545}
546
547pub(crate) fn values_to_row(
554 schema_info: &mut SchemaInfo,
555 values: VrlValue,
556 pipeline_ctx: &PipelineContext<'_>,
557 row: Option<Vec<GreptimeValue>>,
558 need_calc_ts: bool,
559) -> Result<Row> {
560 let mut row: Vec<GreptimeValue> =
561 row.unwrap_or_else(|| Vec::with_capacity(schema_info.schema.len()));
562 let custom_ts = pipeline_ctx.pipeline_definition.get_custom_ts();
563
564 if need_calc_ts {
565 let ts = calc_ts(pipeline_ctx, &values)?;
567 row.push(GreptimeValue { value_data: ts });
568 }
569
570 row.resize(schema_info.schema.len(), GreptimeValue { value_data: None });
571
572 let ts_column_name = custom_ts
574 .as_ref()
575 .map_or(greptime_timestamp(), |ts| ts.get_column_name());
576
577 let values = values.into_object().context(ValueMustBeMapSnafu)?;
578
579 for (column_name, value) in values {
580 if need_calc_ts && column_name.as_str() == ts_column_name {
581 continue;
582 }
583
584 resolve_value(
585 value,
586 column_name.into(),
587 &mut row,
588 schema_info,
589 pipeline_ctx,
590 )?;
591 }
592 Ok(Row { values: row })
593}
594
595fn decide_semantic(p_ctx: &PipelineContext, column_name: &str) -> SemanticType {
596 if p_ctx.channel == Channel::Prometheus && column_name != greptime_value() {
597 SemanticType::Tag
598 } else {
599 SemanticType::Field
600 }
601}
602
603fn resolve_value(
604 value: VrlValue,
605 column_name: String,
606 row: &mut Vec<GreptimeValue>,
607 schema_info: &mut SchemaInfo,
608 p_ctx: &PipelineContext,
609) -> Result<()> {
610 let index = schema_info.index.get(&column_name).copied();
611
612 let value_data = match value {
613 VrlValue::Null => return Ok(()),
614
615 VrlValue::Integer(v) => {
616 resolve_schema(
618 index,
619 p_ctx,
620 &column_name,
621 &ConcreteDataType::int64_datatype(),
622 schema_info,
623 )?;
624 Some(ValueData::I64Value(v))
625 }
626
627 VrlValue::Float(v) => {
628 resolve_schema(
630 index,
631 p_ctx,
632 &column_name,
633 &ConcreteDataType::float64_datatype(),
634 schema_info,
635 )?;
636 Some(ValueData::F64Value(v.into()))
637 }
638
639 VrlValue::Boolean(v) => {
640 resolve_schema(
641 index,
642 p_ctx,
643 &column_name,
644 &ConcreteDataType::boolean_datatype(),
645 schema_info,
646 )?;
647 Some(ValueData::BoolValue(v))
648 }
649
650 VrlValue::Bytes(v) => {
651 resolve_schema(
652 index,
653 p_ctx,
654 &column_name,
655 &ConcreteDataType::string_datatype(),
656 schema_info,
657 )?;
658 Some(ValueData::StringValue(String::from_utf8_lossy_owned(
659 v.to_vec(),
660 )))
661 }
662
663 VrlValue::Regex(v) => {
664 warn!(
665 "Persisting regex value in the table, this should not happen, column_name: {}",
666 column_name
667 );
668 resolve_schema(
669 index,
670 p_ctx,
671 &column_name,
672 &ConcreteDataType::string_datatype(),
673 schema_info,
674 )?;
675 Some(ValueData::StringValue(v.to_string()))
676 }
677
678 VrlValue::Timestamp(ts) => {
679 let ns = ts.timestamp_nanos_opt().context(InvalidTimestampSnafu {
680 input: ts.to_rfc3339(),
681 })?;
682 resolve_schema(
683 index,
684 p_ctx,
685 &column_name,
686 &ConcreteDataType::timestamp_nanosecond_datatype(),
687 schema_info,
688 )?;
689 Some(ValueData::TimestampNanosecondValue(ns))
690 }
691
692 VrlValue::Array(_) | VrlValue::Object(_) => {
693 let is_json2 = schema_info
694 .find_column_schema_in_table(&column_name)
695 .is_some_and(|x| {
696 matches!(
697 &x.column_schema.data_type,
698 ConcreteDataType::Json(column_type) if column_type.is_json2()
699 )
700 });
701
702 let value = if is_json2 {
703 let json_extension_type: Option<JsonExtensionType> =
704 if let Some(x) = schema_info.find_column_schema_in_table(&column_name) {
705 x.column_schema.extension_type()?
706 } else {
707 None
708 };
709 let settings = json_extension_type
710 .and_then(|x| x.metadata().json_settings.clone())
711 .unwrap_or_default();
712 let value: serde_json::Value = value.try_into().map_err(|e: StdError| {
713 CoerceIncompatibleTypesSnafu { msg: e.to_string() }.build()
714 })?;
715 let value = settings.encode(value)?;
716
717 resolve_schema(index, p_ctx, &column_name, &value.data_type(), schema_info)?;
718
719 let Value::Json(value) = value else {
720 unreachable!()
721 };
722 ValueData::JsonValue(encode_json_value(*value))
723 } else {
724 resolve_schema(
725 index,
726 p_ctx,
727 &column_name,
728 &ConcreteDataType::binary_datatype(),
729 schema_info,
730 )?;
731
732 let value = vrl_value_to_jsonb_value(&value);
733 ValueData::BinaryValue(value.to_vec())
734 };
735 Some(value)
736 }
737 };
738
739 let value = GreptimeValue { value_data };
740 if let Some(index) = index {
741 row[index] = value;
742 } else {
743 row.push(value);
744 }
745 Ok(())
746}
747
748fn vrl_value_to_jsonb_value<'a>(value: &'a VrlValue) -> jsonb::Value<'a> {
749 match value {
750 VrlValue::Bytes(bytes) => jsonb::Value::String(String::from_utf8_lossy(bytes)),
751 VrlValue::Regex(value_regex) => jsonb::Value::String(Cow::Borrowed(value_regex.as_str())),
752 VrlValue::Integer(i) => jsonb::Value::Number(Number::Int64(*i)),
753 VrlValue::Float(not_nan) => jsonb::Value::Number(Number::Float64(not_nan.into_inner())),
754 VrlValue::Boolean(b) => jsonb::Value::Bool(*b),
755 VrlValue::Timestamp(date_time) => jsonb::Value::String(Cow::Owned(date_time.to_rfc3339())),
756 VrlValue::Object(btree_map) => jsonb::Value::Object(
757 btree_map
758 .iter()
759 .map(|(key, value)| (key.to_string(), vrl_value_to_jsonb_value(value)))
760 .collect(),
761 ),
762 VrlValue::Array(values) => jsonb::Value::Array(
763 values
764 .iter()
765 .map(|value| vrl_value_to_jsonb_value(value))
766 .collect(),
767 ),
768 VrlValue::Null => jsonb::Value::Null,
769 }
770}
771
772fn identity_pipeline_inner(
773 pipeline_maps: Vec<VrlValue>,
774 pipeline_ctx: &PipelineContext<'_>,
775 max_nested_levels: usize,
776) -> Result<(SchemaInfo, HashMap<ContextOpt, Vec<Row>>)> {
777 let skip_error = pipeline_ctx.pipeline_param.skip_error();
778 let mut schema_info = SchemaInfo::default();
779 let custom_ts = pipeline_ctx.pipeline_definition.get_custom_ts();
780
781 let column_schema = datatypes::schema::ColumnSchema::new(
783 custom_ts
784 .map(|ts| ts.get_column_name().to_string())
785 .unwrap_or_else(|| greptime_timestamp().to_string()),
786 custom_ts
787 .map(|c| ConcreteDataType::from(ColumnDataTypeWrapper::new(c.get_datatype(), None)))
788 .unwrap_or_else(|| {
789 if pipeline_ctx.channel == Channel::Prometheus {
790 ConcreteDataType::timestamp_millisecond_datatype()
791 } else {
792 ConcreteDataType::timestamp_nanosecond_datatype()
793 }
794 }),
795 false,
796 );
797 schema_info.schema.push(ColumnMetadata {
798 column_schema,
799 semantic_type: SemanticType::Timestamp,
800 });
801
802 let mut opt_map = HashMap::new();
803 let len = pipeline_maps.len();
804
805 for pipeline_map in pipeline_maps {
806 let mut pipeline_map =
807 unwrap_or_continue_if_err!(flatten_object(pipeline_map, max_nested_levels), skip_error);
808 let opt = unwrap_or_continue_if_err!(
809 ContextOpt::from_pipeline_map_to_opt(&mut pipeline_map),
810 skip_error
811 );
812 let row = unwrap_or_continue_if_err!(
813 values_to_row(&mut schema_info, pipeline_map, pipeline_ctx, None, true),
814 skip_error
815 );
816
817 opt_map
818 .entry(opt)
819 .or_insert_with(|| Vec::with_capacity(len))
820 .push(row);
821 }
822
823 let column_count = schema_info.schema.len();
824 for (_, row) in opt_map.iter_mut() {
825 for row in row.iter_mut() {
826 assert!(
827 column_count >= row.values.len(),
828 "column_count: {}, row.values.len(): {}",
829 column_count,
830 row.values.len()
831 );
832 row.values
833 .resize(column_count, GreptimeValue { value_data: None });
834 }
835 }
836
837 Ok((schema_info, opt_map))
838}
839
840pub fn identity_pipeline(
849 array: Vec<VrlValue>,
850 table: Option<Arc<table::Table>>,
851 pipeline_ctx: &PipelineContext<'_>,
852) -> Result<HashMap<ContextOpt, Rows>> {
853 let max_nested_levels = pipeline_ctx.pipeline_param.max_nested_levels();
854
855 let (mut schema, opt_map) = identity_pipeline_inner(array, pipeline_ctx, max_nested_levels)?;
856 if let Some(table) = table {
857 let table_info = table.table_info();
858 for tag_name in table_info.meta.row_key_column_names() {
859 if let Some(index) = schema.index.get(tag_name) {
860 schema.schema[*index].semantic_type = SemanticType::Tag;
861 }
862 }
863 }
864
865 let column_schemas = schema.column_schemas()?;
866 Ok(opt_map
867 .into_iter()
868 .map(|(opt, rows)| {
869 (
870 opt,
871 Rows {
872 schema: column_schemas.clone(),
873 rows,
874 },
875 )
876 })
877 .collect::<HashMap<ContextOpt, Rows>>())
878}
879
880pub fn flatten_object(object: VrlValue, max_nested_levels: usize) -> Result<VrlValue> {
886 let mut flattened = BTreeMap::new();
887 let object = object.into_object().context(ValueMustBeMapSnafu)?;
888
889 if !object.is_empty() {
890 do_flatten_object(&mut flattened, None, object, 1, max_nested_levels);
892 }
893
894 Ok(VrlValue::Object(flattened))
895}
896
897fn vrl_value_to_serde_json(value: &VrlValue) -> serde_json_crate::Value {
898 match value {
899 VrlValue::Null => serde_json_crate::Value::Null,
900 VrlValue::Boolean(b) => serde_json_crate::Value::Bool(*b),
901 VrlValue::Integer(i) => serde_json_crate::Value::Number((*i).into()),
902 VrlValue::Float(not_nan) => serde_json_crate::Number::from_f64(not_nan.into_inner())
903 .map(serde_json_crate::Value::Number)
904 .unwrap_or(serde_json_crate::Value::Null),
905 VrlValue::Bytes(bytes) => {
906 serde_json_crate::Value::String(String::from_utf8_lossy(bytes).into_owned())
907 }
908 VrlValue::Regex(re) => serde_json_crate::Value::String(re.as_str().to_string()),
909 VrlValue::Timestamp(ts) => serde_json_crate::Value::String(ts.to_rfc3339()),
910 VrlValue::Array(arr) => {
911 serde_json_crate::Value::Array(arr.iter().map(vrl_value_to_serde_json).collect())
912 }
913 VrlValue::Object(map) => serde_json_crate::Value::Object(
914 map.iter()
915 .map(|(k, v)| (k.to_string(), vrl_value_to_serde_json(v)))
916 .collect(),
917 ),
918 }
919}
920
921fn do_flatten_object(
922 dest: &mut BTreeMap<KeyString, VrlValue>,
923 base: Option<&str>,
924 object: BTreeMap<KeyString, VrlValue>,
925 current_level: usize,
926 max_nested_levels: usize,
927) {
928 for (key, value) in object {
929 let new_key = base.map_or_else(
930 || key.clone(),
931 |base_key| format!("{base_key}.{key}").into(),
932 );
933
934 match value {
935 VrlValue::Object(object) => {
936 if current_level >= max_nested_levels {
937 let json_string = serde_json_crate::to_string(&vrl_value_to_serde_json(
939 &VrlValue::Object(object),
940 ))
941 .unwrap_or_else(|_| String::from("{}"));
942 dest.insert(new_key, VrlValue::Bytes(Bytes::from(json_string)));
943 } else {
944 do_flatten_object(
945 dest,
946 Some(&new_key),
947 object,
948 current_level + 1,
949 max_nested_levels,
950 );
951 }
952 }
953 VrlValue::Array(_) => {
955 let json_string = serde_json_crate::to_string(&vrl_value_to_serde_json(&value))
956 .unwrap_or_else(|_| String::from("[]"));
957 dest.insert(new_key, VrlValue::Bytes(Bytes::from(json_string)));
958 }
959 _ => {
961 dest.insert(new_key, value);
962 }
963 }
964 }
965}
966
#[cfg(test)]
mod tests {
    // Unit tests for the identity pipeline, flattening, and row conversion.
    use api::v1::SemanticType;

    use super::*;
    use crate::{PipelineDefinition, identity_pipeline};

    #[test]
    fn test_identify_pipeline() {
        let params = GreptimePipelineParams::default();
        let pipeline_ctx = PipelineContext::new(
            &PipelineDefinition::GreptimeIdentityPipeline(None),
            &params,
            Channel::Unknown,
        );
        // A string in a column first seen as Float64 is rejected.
        {
            let array = [
                serde_json::json!({
                    "woshinull": null,
                    "name": "Alice",
                    "age": 20,
                    "is_student": true,
                    "score": 99.5,
                    "hobbies": "reading",
                    "address": "Beijing",
                }),
                serde_json::json!({
                    "name": "Bob",
                    "age": 21,
                    "is_student": false,
                    "score": "88.5",
                    "hobbies": "swimming",
                    "address": "Shanghai",
                    "gaga": "gaga"
                }),
            ];
            let array = array.iter().map(|v| v.into()).collect();
            let rows = identity_pipeline(array, None, &pipeline_ctx);
            assert!(rows.is_err());
            assert_eq!(
                rows.err().unwrap().to_string(),
                "Column datatype mismatch. For column: score, expected datatype: Float64, actual datatype: String".to_string(),
            );
        }
        // An integer in a column first seen as Float64 is rejected.
        {
            let array = [
                serde_json::json!({
                    "woshinull": null,
                    "name": "Alice",
                    "age": 20,
                    "is_student": true,
                    "score": 99.5,
                    "hobbies": "reading",
                    "address": "Beijing",
                }),
                serde_json::json!({
                    "name": "Bob",
                    "age": 21,
                    "is_student": false,
                    "score": 88,
                    "hobbies": "swimming",
                    "address": "Shanghai",
                    "gaga": "gaga"
                }),
            ];
            let array = array.iter().map(|v| v.into()).collect();
            let rows = identity_pipeline(array, None, &pipeline_ctx);
            assert!(rows.is_err());
            assert_eq!(
                rows.err().unwrap().to_string(),
                "Column datatype mismatch. For column: score, expected datatype: Float64, actual datatype: Int64".to_string(),
            );
        }
        // Consistent types succeed; every row is padded to the full schema width.
        {
            let array = [
                serde_json::json!({
                    "woshinull": null,
                    "name": "Alice",
                    "age": 20,
                    "is_student": true,
                    "score": 99.5,
                    "hobbies": "reading",
                    "address": "Beijing",
                }),
                serde_json::json!({
                    "name": "Bob",
                    "age": 21,
                    "is_student": false,
                    "score": 88.5,
                    "hobbies": "swimming",
                    "address": "Shanghai",
                    "gaga": "gaga"
                }),
            ];
            let array = array.iter().map(|v| v.into()).collect();
            let rows = identity_pipeline(array, None, &pipeline_ctx);
            assert!(rows.is_ok());
            let mut rows = rows.unwrap();
            assert!(rows.len() == 1);
            let rows = rows.remove(&ContextOpt::default()).unwrap();
            assert_eq!(rows.schema.len(), 8);
            assert_eq!(rows.rows.len(), 2);
            assert_eq!(8, rows.rows[0].values.len());
            assert_eq!(8, rows.rows[1].values.len());
        }
        // Columns marked as tags keep that semantic type in the output schema.
        {
            let array = [
                serde_json::json!({
                    "woshinull": null,
                    "name": "Alice",
                    "age": 20,
                    "is_student": true,
                    "score": 99.5,
                    "hobbies": "reading",
                    "address": "Beijing",
                }),
                serde_json::json!({
                    "name": "Bob",
                    "age": 21,
                    "is_student": false,
                    "score": 88.5,
                    "hobbies": "swimming",
                    "address": "Shanghai",
                    "gaga": "gaga"
                }),
            ];
            let tag_column_names = ["name".to_string(), "address".to_string()];

            let rows = identity_pipeline_inner(
                array.iter().map(|v| v.into()).collect(),
                &pipeline_ctx,
                pipeline_ctx.pipeline_param.max_nested_levels(),
            )
            .map(|(mut schema, mut rows)| {
                for name in tag_column_names {
                    if let Some(index) = schema.index.get(&name) {
                        schema.schema[*index].semantic_type = SemanticType::Tag;
                    }
                }

                assert!(rows.len() == 1);
                let rows = rows.remove(&ContextOpt::default()).unwrap();

                Rows {
                    schema: schema.column_schemas().unwrap(),
                    rows,
                }
            });

            assert!(rows.is_ok());
            let rows = rows.unwrap();
            assert_eq!(rows.schema.len(), 8);
            assert_eq!(rows.rows.len(), 2);
            assert_eq!(8, rows.rows[0].values.len());
            assert_eq!(8, rows.rows[1].values.len());
            assert_eq!(
                rows.schema
                    .iter()
                    .find(|x| x.column_name == "name")
                    .unwrap()
                    .semantic_type,
                SemanticType::Tag as i32
            );
            assert_eq!(
                rows.schema
                    .iter()
                    .find(|x| x.column_name == "address")
                    .unwrap()
                    .semantic_type,
                SemanticType::Tag as i32
            );
            assert_eq!(
                rows.schema
                    .iter()
                    .filter(|x| x.semantic_type == SemanticType::Tag as i32)
                    .count(),
                2
            );
        }
    }

    #[test]
    fn test_flatten() {
        let test_cases = vec![
            // Within the depth limit: objects expand, arrays become JSON strings.
            (
                serde_json::json!(
                    {
                        "a": {
                            "b": {
                                "c": [1, 2, 3]
                            }
                        },
                        "d": [
                            "foo",
                            "bar"
                        ],
                        "e": {
                            "f": [7, 8, 9],
                            "g": {
                                "h": 123,
                                "i": "hello",
                                "j": {
                                    "k": true
                                }
                            }
                        }
                    }
                ),
                10,
                Some(serde_json::json!(
                    {
                        "a.b.c": "[1,2,3]",
                        "d": "[\"foo\",\"bar\"]",
                        "e.f": "[7,8,9]",
                        "e.g.h": 123,
                        "e.g.i": "hello",
                        "e.g.j.k": true
                    }
                )),
            ),
            // At the depth limit the remaining object is kept as a JSON string.
            (
                serde_json::json!(
                    {
                        "a": {
                            "b": {
                                "c": {
                                    "d": [1, 2, 3]
                                }
                            }
                        },
                        "e": [
                            "foo",
                            "bar"
                        ]
                    }
                ),
                3,
                Some(serde_json::json!(
                    {
                        "a.b.c": "{\"d\":[1,2,3]}",
                        "e": "[\"foo\",\"bar\"]"
                    }
                )),
            ),
        ];

        for (input, max_depth, expected) in test_cases {
            let input = input.into();
            let expected = expected.map(|e| e.into());

            let flattened_object = flatten_object(input, max_depth).ok();
            assert_eq!(flattened_object, expected);
        }
    }

    #[test]
    fn test_identity_pipeline_skip_error_flattens_valid_rows() {
        let params = GreptimePipelineParams::from_map(ahash::HashMap::from_iter([(
            "skip_error".to_string(),
            "true".to_string(),
        )]));
        let pipeline_def = PipelineDefinition::GreptimeIdentityPipeline(None);
        let pipeline_ctx = PipelineContext::new(&pipeline_def, &params, Channel::Unknown);
        let array = vec![
            serde_json::json!({
                "service": "frontend",
                "nested": {
                    "status": 200,
                    "path": "/v1/ingest"
                },
                "labels": ["pipeline", "identity"]
            })
            .into(),
            VrlValue::Bytes("invalid_string".into()),
            serde_json::json!({
                "service": "frontend",
                "nested": {
                    "status": 201,
                    "path": "/v1/ingest"
                },
                "labels": ["pipeline", "identity"]
            })
            .into(),
        ];

        let mut rows_by_opt = identity_pipeline(array, None, &pipeline_ctx).unwrap();
        let rows = rows_by_opt.remove(&ContextOpt::default()).unwrap();

        assert_eq!(rows.rows.len(), 2);
        assert_eq!(rows.schema.len(), rows.rows[0].values.len());
        assert!(rows.schema.iter().any(|s| s.column_name == "nested.status"));
        assert!(rows.schema.iter().any(|s| s.column_name == "nested.path"));
        assert!(rows.schema.iter().any(|s| s.column_name == "labels"));
    }

    use ahash::HashMap as AHashMap;
    #[test]
    fn test_values_to_rows_skip_error_handling() {
        let table_suffix_template: Option<crate::tablesuffix::TableSuffixTemplate> = None;

        // With skip_error, non-object elements are dropped and the rest converted.
        {
            let schema_info = &mut SchemaInfo::default();
            let input_array = vec![
                serde_json::json!({"name": "Alice", "age": 25}).into(),
                VrlValue::Bytes("invalid_string".into()),
                serde_json::json!({"name": "Bob", "age": 30}).into(),
                VrlValue::Integer(42),
                serde_json::json!({"name": "Charlie", "age": 35}).into(),
            ];

            let params = GreptimePipelineParams::from_map(AHashMap::from_iter([(
                "skip_error".to_string(),
                "true".to_string(),
            )]));

            let pipeline_ctx = PipelineContext::new(
                &PipelineDefinition::GreptimeIdentityPipeline(None),
                &params,
                Channel::Unknown,
            );

            let result = values_to_rows(
                schema_info,
                VrlValue::Array(input_array),
                &pipeline_ctx,
                None,
                true,
                table_suffix_template.as_ref(),
            );

            assert!(result.is_ok());
            let rows_by_context = result.unwrap();
            let total_rows: usize = rows_by_context.values().map(|v| v.len()).sum();
            assert_eq!(total_rows, 3);
        }

        // Without skip_error, the first non-object element is an error.
        {
            let schema_info = &mut SchemaInfo::default();
            let input_array = vec![
                serde_json::json!({"name": "Alice", "age": 25}).into(),
                VrlValue::Bytes("invalid_string".into()),
            ];

            let params = GreptimePipelineParams::default();
            let pipeline_ctx = PipelineContext::new(
                &PipelineDefinition::GreptimeIdentityPipeline(None),
                &params,
                Channel::Unknown,
            );

            let result = values_to_rows(
                schema_info,
                VrlValue::Array(input_array),
                &pipeline_ctx,
                None,
                true,
                table_suffix_template.as_ref(),
            );

            assert!(result.is_err());
            let error_msg = result.unwrap_err().to_string();
            assert!(error_msg.contains("Array element at index 1 must be an object for one-to-many transformation, got string"));
        }
    }

    #[test]
    fn test_values_to_rows_per_element_context_opt() {
        let table_suffix_template: Option<crate::tablesuffix::TableSuffixTemplate> = None;
        let schema_info = &mut SchemaInfo::default();

        // Elements with different TTL options must land in different groups.
        let input_array = vec![
            serde_json::json!({"name": "Alice", "greptime_ttl": "1h"}).into(),
            serde_json::json!({"name": "Bob", "greptime_ttl": "1h"}).into(),
            serde_json::json!({"name": "Charlie", "greptime_ttl": "24h"}).into(),
        ];

        let params = GreptimePipelineParams::default();
        let pipeline_ctx = PipelineContext::new(
            &PipelineDefinition::GreptimeIdentityPipeline(None),
            &params,
            Channel::Unknown,
        );

        let result = values_to_rows(
            schema_info,
            VrlValue::Array(input_array),
            &pipeline_ctx,
            None,
            true,
            table_suffix_template.as_ref(),
        );

        assert!(result.is_ok());
        let rows_by_context = result.unwrap();

        assert_eq!(rows_by_context.len(), 2);

        let total_rows: usize = rows_by_context.values().map(|v| v.len()).sum();
        assert_eq!(total_rows, 3);

        let mut ttl_1h_count = 0;
        let mut ttl_24h_count = 0;
        for rows in rows_by_context.values() {
            if rows.len() == 2 {
                ttl_1h_count = rows.len();
            } else if rows.len() == 1 {
                ttl_24h_count = rows.len();
            }
        }
        assert_eq!(ttl_1h_count, 2);
        assert_eq!(ttl_24h_count, 1);
    }
}