1pub mod coerce;
16
17use std::borrow::Cow;
18use std::collections::{BTreeMap, HashSet};
19use std::sync::Arc;
20
21use ahash::{HashMap, HashMapExt};
22use api::helper::{ColumnDataTypeWrapper, encode_json_value};
23use api::v1::column_def::{collect_column_options, options_from_column_schema};
24use api::v1::value::ValueData;
25use api::v1::{ColumnDataType, SemanticType};
26use arrow_schema::extension::ExtensionType;
27use coerce::{coerce_columns, coerce_value};
28use common_query::prelude::{greptime_timestamp, greptime_value};
29use common_telemetry::warn;
30use datatypes::data_type::ConcreteDataType;
31use datatypes::extension::json::JsonExtensionType;
32use datatypes::value::Value;
33use greptime_proto::v1::{ColumnSchema, Row, Rows, Value as GreptimeValue};
34use itertools::Itertools;
35use jsonb::Number;
36use once_cell::sync::OnceCell;
37use serde_json as serde_json_crate;
38use session::context::Channel;
39use snafu::OptionExt;
40use table::Table;
41use vrl::prelude::{Bytes, VrlValueConvert};
42use vrl::value::value::StdError;
43use vrl::value::{KeyString, Value as VrlValue};
44
45use crate::error::{
46 ArrayElementMustBeObjectSnafu, CoerceIncompatibleTypesSnafu,
47 IdentifyPipelineColumnTypeMismatchSnafu, InvalidTimestampSnafu, Result,
48 TimeIndexMustBeNonNullSnafu, TransformColumnNameMustBeUniqueSnafu,
49 TransformMultipleTimestampIndexSnafu, TransformTimestampIndexCountSnafu, ValueMustBeMapSnafu,
50};
51use crate::etl::PipelineDocVersion;
52use crate::etl::ctx_req::ContextOpt;
53use crate::etl::field::{Field, Fields};
54use crate::etl::transform::index::Index;
55use crate::etl::transform::{Transform, Transforms};
56use crate::{PipelineContext, truthy, unwrap_or_continue_if_err};
57
/// Default depth budget used by `flatten_object` when the request does not
/// supply a `max_nested_levels` pipeline parameter.
const DEFAULT_MAX_NESTED_LEVELS_FOR_JSON_FLATTENING: usize = 10;
59
/// A converted row paired with the optional table-name suffix resolved for it.
pub type RowWithTableSuffix = (Row, Option<String>);
62
/// Applies a pipeline's transforms to incoming values, producing row values
/// aligned with a fixed gRPC column schema.
#[derive(Debug, Clone)]
pub struct GreptimeTransformer {
    // Ordered transform definitions from the pipeline document.
    transforms: Transforms,
    // Flattened output column schema derived from `transforms`.
    schema: Vec<ColumnSchema>,
}
70
/// Options controlling pipeline execution, typically parsed from the
/// `x-greptime-pipeline-params` request header.
#[derive(Debug, Default)]
pub struct GreptimePipelineParams {
    // Raw key/value options as parsed from the header string.
    options: HashMap<String, String>,

    // Lazily-parsed cache of the `skip_error` option.
    pub skip_error: OnceCell<bool>,
    // Lazily-parsed cache of the `max_nested_levels` option.
    pub max_nested_levels: OnceCell<usize>,
}
84
85impl GreptimePipelineParams {
86 pub fn from_params(params: Option<&str>) -> Self {
90 let options = Self::parse_header_str_to_map(params);
91
92 Self {
93 options,
94 skip_error: OnceCell::new(),
95 max_nested_levels: OnceCell::new(),
96 }
97 }
98
99 pub fn from_map(options: HashMap<String, String>) -> Self {
100 Self {
101 options,
102 skip_error: OnceCell::new(),
103 max_nested_levels: OnceCell::new(),
104 }
105 }
106
107 pub fn parse_header_str_to_map(params: Option<&str>) -> HashMap<String, String> {
108 if let Some(params) = params {
109 if params.is_empty() {
110 HashMap::new()
111 } else {
112 params
113 .split('&')
114 .filter_map(|s| s.split_once('='))
115 .map(|(k, v)| (k.to_string(), v.to_string()))
116 .collect::<HashMap<String, String>>()
117 }
118 } else {
119 HashMap::new()
120 }
121 }
122
123 pub fn skip_error(&self) -> bool {
125 *self
126 .skip_error
127 .get_or_init(|| self.options.get("skip_error").map(truthy).unwrap_or(false))
128 }
129
130 pub fn max_nested_levels(&self) -> usize {
133 *self.max_nested_levels.get_or_init(|| {
134 self.options
135 .get("max_nested_levels")
136 .and_then(|s| s.parse::<usize>().ok())
137 .filter(|v| *v > 0)
138 .unwrap_or(DEFAULT_MAX_NESTED_LEVELS_FOR_JSON_FLATTENING)
139 })
140 }
141}
142
143impl GreptimeTransformer {
144 fn add_greptime_timestamp_column(transforms: &mut Transforms) {
146 let type_ = ColumnDataType::TimestampNanosecond;
147 let default = None;
148
149 let transform = Transform {
150 fields: Fields::one(Field::new(greptime_timestamp().to_string(), None)),
151 type_,
152 default,
153 index: Some(Index::Time),
154 index_options: None,
155 on_failure: Some(crate::etl::transform::OnFailure::Default),
156 tag: false,
157 };
158 transforms.push(transform);
159 }
160
161 fn init_schemas(transforms: &Transforms) -> Result<Vec<ColumnSchema>> {
163 let mut schema = vec![];
164 for transform in transforms.iter() {
165 schema.extend(coerce_columns(transform)?);
166 }
167 Ok(schema)
168 }
169}
170
impl GreptimeTransformer {
    /// Validate the transforms and build a transformer.
    ///
    /// Enforces that output column names are unique across transforms and
    /// that exactly one time-index column exists; for v1 documents a default
    /// `greptime_timestamp` column is appended when none is declared.
    pub fn new(mut transforms: Transforms, doc_version: &PipelineDocVersion) -> Result<Self> {
        let mut column_names_set = HashSet::new();
        let mut timestamp_columns = vec![];

        for transform in transforms.iter() {
            let target_fields_set = transform
                .fields
                .iter()
                .map(|f| f.target_or_input_field())
                .collect::<HashSet<_>>();

            // Reject transforms whose output columns collide with earlier ones.
            let intersections: Vec<_> = column_names_set.intersection(&target_fields_set).collect();
            if !intersections.is_empty() {
                let duplicates = intersections.iter().join(",");
                return TransformColumnNameMustBeUniqueSnafu { duplicates }.fail();
            }

            column_names_set.extend(target_fields_set);

            // A time-index transform must target exactly one field.
            if let Some(idx) = transform.index
                && idx == Index::Time
            {
                match transform.fields.len() {
                    1 => timestamp_columns.push(transform.fields.first().unwrap().input_field()),
                    _ => {
                        return TransformMultipleTimestampIndexSnafu {
                            columns: transform.fields.iter().map(|x| x.input_field()).join(", "),
                        }
                        .fail();
                    }
                }
            }
        }

        let schema = match timestamp_columns.len() {
            // v1 documents get an implicit timestamp column when none exists.
            0 if doc_version == &PipelineDocVersion::V1 => {
                GreptimeTransformer::add_greptime_timestamp_column(&mut transforms);
                GreptimeTransformer::init_schemas(&transforms)?
            }
            1 => GreptimeTransformer::init_schemas(&transforms)?,
            count => {
                let columns = timestamp_columns.iter().join(", ");
                return TransformTimestampIndexCountSnafu { count, columns }.fail();
            }
        };
        Ok(GreptimeTransformer { transforms, schema })
    }

    /// Run all transforms against `pipeline_map`, producing one value per
    /// schema column in schema order.
    ///
    /// For v2 documents (`!is_v1`) each consumed key is removed from the map
    /// so remaining keys can be handled elsewhere. Errors if `pipeline_map`
    /// is not a map or a time-index value resolves to null.
    pub fn transform_mut(
        &self,
        pipeline_map: &mut VrlValue,
        is_v1: bool,
    ) -> Result<Vec<GreptimeValue>> {
        let mut values = vec![GreptimeValue { value_data: None }; self.schema.len()];
        let mut output_index = 0;
        for transform in self.transforms.iter() {
            for field in transform.fields.iter() {
                let column_name = field.input_field();

                let pipeline_map = pipeline_map.as_object_mut().context(ValueMustBeMapSnafu)?;
                match pipeline_map.get(column_name) {
                    Some(v) => {
                        let value_data = coerce_value(v, transform)?;
                        values[output_index] = GreptimeValue { value_data };
                    }
                    None => {
                        // Missing input: consult the transform's on-failure policy.
                        let value_data = match transform.on_failure {
                            Some(crate::etl::transform::OnFailure::Default) => {
                                match transform.get_default() {
                                    Some(default) => Some(default.clone()),
                                    None => transform.get_default_value_when_data_is_none(),
                                }
                            }
                            Some(crate::etl::transform::OnFailure::Ignore) => None,
                            None => None,
                        };
                        // The time index can never be null.
                        if transform.is_timeindex() && value_data.is_none() {
                            return TimeIndexMustBeNonNullSnafu.fail();
                        }
                        values[output_index] = GreptimeValue { value_data };
                    }
                }
                output_index += 1;
                if !is_v1 {
                    pipeline_map.remove(column_name);
                }
            }
        }
        Ok(values)
    }

    /// The pipeline's transform definitions.
    pub fn transforms(&self) -> &Transforms {
        &self.transforms
    }

    /// Output column schemas, aligned with `transform_mut`'s result order.
    pub fn schemas(&self) -> &Vec<greptime_proto::v1::ColumnSchema> {
        &self.schema
    }

    /// Mutable access to the transform definitions.
    pub fn transforms_mut(&mut self) -> &mut Transforms {
        &mut self.transforms
    }
}
282
/// A column schema paired with its semantic role within the row.
#[derive(Clone)]
pub struct ColumnMetadata {
    // The datatypes-level schema (name, type, nullability, metadata).
    column_schema: datatypes::schema::ColumnSchema,
    // Role of the column: Tag, Field or Timestamp.
    semantic_type: SemanticType,
}
288
289impl From<ColumnSchema> for ColumnMetadata {
290 fn from(value: ColumnSchema) -> Self {
291 let datatype = value.datatype();
292 let semantic_type = value.semantic_type();
293 let ColumnSchema {
294 column_name,
295 datatype: _,
296 semantic_type: _,
297 datatype_extension,
298 options,
299 } = value;
300
301 let column_schema = datatypes::schema::ColumnSchema::new(
302 column_name,
303 ColumnDataTypeWrapper::new(datatype, datatype_extension).into(),
304 semantic_type != SemanticType::Timestamp,
305 );
306
307 let metadata = collect_column_options(options.as_ref());
308 let column_schema = column_schema.with_metadata(metadata);
309
310 Self {
311 column_schema,
312 semantic_type,
313 }
314 }
315}
316
317impl TryFrom<ColumnMetadata> for ColumnSchema {
318 type Error = api::error::Error;
319
320 fn try_from(value: ColumnMetadata) -> std::result::Result<Self, Self::Error> {
321 let ColumnMetadata {
322 column_schema,
323 semantic_type,
324 } = value;
325
326 let options = options_from_column_schema(&column_schema);
327
328 let (datatype, datatype_extension) =
329 ColumnDataTypeWrapper::try_from(column_schema.data_type).map(|x| x.into_parts())?;
330
331 Ok(ColumnSchema {
332 column_name: column_schema.name,
333 datatype: datatype as _,
334 semantic_type: semantic_type as _,
335 datatype_extension,
336 options,
337 })
338 }
339}
340
/// Accumulated schema state while converting pipeline values into rows.
#[derive(Default)]
pub struct SchemaInfo {
    /// Columns discovered so far, in output order.
    pub schema: Vec<ColumnMetadata>,
    /// Column name -> position in `schema`.
    pub index: HashMap<String, usize>,
    // Optional physical table used to reuse existing column definitions.
    table: Option<Arc<Table>>,
}
353
354impl SchemaInfo {
355 pub fn with_capacity(capacity: usize) -> Self {
356 Self {
357 schema: Vec::with_capacity(capacity),
358 index: HashMap::with_capacity(capacity),
359 table: None,
360 }
361 }
362
363 pub fn from_schema_list(schema_list: Vec<ColumnSchema>) -> Self {
364 let mut index = HashMap::new();
365 for (i, schema) in schema_list.iter().enumerate() {
366 index.insert(schema.column_name.clone(), i);
367 }
368 Self {
369 schema: schema_list.into_iter().map(Into::into).collect(),
370 index,
371 table: None,
372 }
373 }
374
375 pub fn set_table(&mut self, table: Option<Arc<Table>>) {
376 self.table = table;
377 }
378
379 fn find_column_schema_in_table(&self, column_name: &str) -> Option<ColumnMetadata> {
380 if let Some(table) = &self.table
381 && let Some(i) = table.schema_ref().column_index_by_name(column_name)
382 {
383 let column_schema = table.schema_ref().column_schemas()[i].clone();
384
385 let semantic_type = if column_schema.is_time_index() {
386 SemanticType::Timestamp
387 } else if table.table_info().meta.primary_key_indices.contains(&i) {
388 SemanticType::Tag
389 } else {
390 SemanticType::Field
391 };
392
393 Some(ColumnMetadata {
394 column_schema,
395 semantic_type,
396 })
397 } else {
398 None
399 }
400 }
401
402 pub fn column_schemas(&self) -> api::error::Result<Vec<ColumnSchema>> {
403 self.schema
404 .iter()
405 .map(|x| x.clone().try_into())
406 .collect::<api::error::Result<Vec<_>>>()
407 }
408}
409
/// Reconcile a value's type with the schema collected so far.
///
/// When `column` already exists (`index` is `Some`), its recorded type must
/// equal `value_type` — except that two JSON2 columns are always compatible —
/// otherwise a type-mismatch error is returned. When the column is new, a
/// definition is appended (preferring the one found in the attached table)
/// and the name index is updated.
fn resolve_schema(
    index: Option<usize>,
    pipeline_context: &PipelineContext,
    column: &str,
    value_type: &ConcreteDataType,
    schema_info: &mut SchemaInfo,
) -> Result<()> {
    if let Some(index) = index {
        let column_type = &mut schema_info.schema[index].column_schema.data_type;
        match (column_type, value_type) {
            (column_type, value_type) if column_type == value_type => Ok(()),
            // JSON2 columns match regardless of their structure settings.
            (ConcreteDataType::Json(column_type), ConcreteDataType::Json(value_type))
                if column_type.is_json2() && value_type.is_json2() =>
            {
                Ok(())
            }
            (column_type, value_type) => IdentifyPipelineColumnTypeMismatchSnafu {
                column,
                expected: column_type.to_string(),
                actual: value_type.to_string(),
            }
            .fail(),
        }
    } else {
        // New column: reuse the table's definition when available, otherwise
        // infer nullability and semantic type from the pipeline context.
        let column_schema = schema_info
            .find_column_schema_in_table(column)
            .unwrap_or_else(|| {
                let semantic_type = decide_semantic(pipeline_context, column);
                let column_schema = datatypes::schema::ColumnSchema::new(
                    column,
                    value_type.clone(),
                    semantic_type != SemanticType::Timestamp,
                );
                ColumnMetadata {
                    column_schema,
                    semantic_type,
                }
            });
        let key = column.to_string();
        schema_info.schema.push(column_schema);
        schema_info.index.insert(key, schema_info.schema.len() - 1);
        Ok(())
    }
}
454
/// Compute the timestamp value for a row.
///
/// Prometheus rows read `greptime_timestamp` as milliseconds, defaulting to 0
/// when absent or non-integer. Other channels use the pipeline's custom
/// timestamp field when configured; otherwise the current wall-clock time in
/// nanoseconds is used.
fn calc_ts(p_ctx: &PipelineContext, values: &VrlValue) -> Result<Option<ValueData>> {
    match p_ctx.channel {
        Channel::Prometheus => {
            let ts = values
                .as_object()
                .and_then(|m| m.get(greptime_timestamp()))
                .and_then(|ts| ts.try_into_i64().ok())
                .unwrap_or_default();
            Ok(Some(ValueData::TimestampMillisecondValue(ts)))
        }
        _ => {
            let custom_ts = p_ctx.pipeline_definition.get_custom_ts();
            match custom_ts {
                Some(ts) => {
                    // Custom timestamp config decides how to parse the field.
                    let ts_field = values.as_object().and_then(|m| m.get(ts.get_column_name()));
                    Some(ts.get_timestamp_value(ts_field)).transpose()
                }
                None => Ok(Some(ValueData::TimestampNanosecondValue(
                    chrono::Utc::now().timestamp_nanos_opt().unwrap_or_default(),
                ))),
            }
        }
    }
}
479
/// Convert a pipeline output value (a single map or an array of maps) into
/// rows, grouped by the per-element [`ContextOpt`] extracted from each map.
///
/// `row` optionally seeds each output row (cloned per element),
/// `need_calc_ts` controls timestamp computation, and `tablesuffix_template`
/// resolves an optional per-row table suffix. With `skip_error` enabled,
/// failing elements are dropped instead of aborting the whole conversion.
pub(crate) fn values_to_rows(
    schema_info: &mut SchemaInfo,
    mut values: VrlValue,
    pipeline_ctx: &PipelineContext<'_>,
    row: Option<Vec<GreptimeValue>>,
    need_calc_ts: bool,
    tablesuffix_template: Option<&crate::tablesuffix::TableSuffixTemplate>,
) -> Result<std::collections::HashMap<ContextOpt, Vec<RowWithTableSuffix>>> {
    let skip_error = pipeline_ctx.pipeline_param.skip_error();
    // Single-map case: produce exactly one row (or an empty map on skipped error).
    let VrlValue::Array(arr) = values else {
        let mut result = std::collections::HashMap::new();

        let mut opt = match ContextOpt::from_pipeline_map_to_opt(&mut values) {
            Ok(r) => r,
            Err(e) => return if skip_error { Ok(result) } else { Err(e) },
        };

        let table_suffix = opt.resolve_table_suffix(tablesuffix_template, &values);
        let row = match values_to_row(schema_info, values, pipeline_ctx, row, need_calc_ts) {
            Ok(r) => r,
            Err(e) => return if skip_error { Ok(result) } else { Err(e) },
        };
        result.insert(opt, vec![(row, table_suffix)]);
        return Ok(result);
    };

    // Array case: each element must itself be a map; rows are grouped by the
    // ContextOpt extracted from each element.
    let mut rows_by_context: std::collections::HashMap<ContextOpt, Vec<RowWithTableSuffix>> =
        std::collections::HashMap::new();
    for (index, mut value) in arr.into_iter().enumerate() {
        if !value.is_object() {
            unwrap_or_continue_if_err!(
                ArrayElementMustBeObjectSnafu {
                    index,
                    actual_type: value.kind_str().to_string(),
                }
                .fail(),
                skip_error
            );
        }

        let mut opt = unwrap_or_continue_if_err!(
            ContextOpt::from_pipeline_map_to_opt(&mut value),
            skip_error
        );
        let table_suffix = opt.resolve_table_suffix(tablesuffix_template, &value);
        let transformed_row = unwrap_or_continue_if_err!(
            values_to_row(schema_info, value, pipeline_ctx, row.clone(), need_calc_ts),
            skip_error
        );
        rows_by_context
            .entry(opt)
            .or_default()
            .push((transformed_row, table_suffix));
    }
    Ok(rows_by_context)
}
546
/// Convert one pipeline map into a single row aligned with `schema_info`.
///
/// The row is seeded from `row` (or starts empty), the timestamp is
/// prepended when `need_calc_ts` is set, and every remaining key is resolved
/// into a column via [`resolve_value`], growing the schema as new columns
/// appear. Errors if `values` is not a map.
pub(crate) fn values_to_row(
    schema_info: &mut SchemaInfo,
    values: VrlValue,
    pipeline_ctx: &PipelineContext<'_>,
    row: Option<Vec<GreptimeValue>>,
    need_calc_ts: bool,
) -> Result<Row> {
    let mut row: Vec<GreptimeValue> =
        row.unwrap_or_else(|| Vec::with_capacity(schema_info.schema.len()));
    let custom_ts = pipeline_ctx.pipeline_definition.get_custom_ts();

    if need_calc_ts {
        let ts = calc_ts(pipeline_ctx, &values)?;
        row.push(GreptimeValue { value_data: ts });
    }

    // Pad with nulls so positional writes via the schema index are in bounds.
    row.resize(schema_info.schema.len(), GreptimeValue { value_data: None });

    let ts_column_name = custom_ts
        .as_ref()
        .map_or(greptime_timestamp(), |ts| ts.get_column_name());

    let values = values.into_object().context(ValueMustBeMapSnafu)?;

    for (column_name, value) in values {
        // The timestamp key was already consumed by calc_ts above.
        if need_calc_ts && column_name.as_str() == ts_column_name {
            continue;
        }

        resolve_value(
            value,
            column_name.into(),
            &mut row,
            schema_info,
            pipeline_ctx,
        )?;
    }
    Ok(Row { values: row })
}
594
595fn decide_semantic(p_ctx: &PipelineContext, column_name: &str) -> SemanticType {
596 if p_ctx.channel == Channel::Prometheus && column_name != greptime_value() {
597 SemanticType::Tag
598 } else {
599 SemanticType::Field
600 }
601}
602
/// Resolve a single key/value pair into its column slot in `row`, updating
/// `schema_info` with the column's inferred type.
///
/// Nulls are skipped entirely (no schema entry; the row keeps its padded
/// null). Scalars map to their natural column types. Arrays and objects are
/// stored either as JSON (when the target table declares a JSON2 column) or
/// as jsonb binary otherwise.
fn resolve_value(
    value: VrlValue,
    column_name: String,
    row: &mut Vec<GreptimeValue>,
    schema_info: &mut SchemaInfo,
    p_ctx: &PipelineContext,
) -> Result<()> {
    let index = schema_info.index.get(&column_name).copied();

    let value_data = match value {
        // Nulls neither create columns nor overwrite the padded null slot.
        VrlValue::Null => return Ok(()),

        VrlValue::Integer(v) => {
            resolve_schema(
                index,
                p_ctx,
                &column_name,
                &ConcreteDataType::int64_datatype(),
                schema_info,
            )?;
            Some(ValueData::I64Value(v))
        }

        VrlValue::Float(v) => {
            resolve_schema(
                index,
                p_ctx,
                &column_name,
                &ConcreteDataType::float64_datatype(),
                schema_info,
            )?;
            Some(ValueData::F64Value(v.into()))
        }

        VrlValue::Boolean(v) => {
            resolve_schema(
                index,
                p_ctx,
                &column_name,
                &ConcreteDataType::boolean_datatype(),
                schema_info,
            )?;
            Some(ValueData::BoolValue(v))
        }

        VrlValue::Bytes(v) => {
            resolve_schema(
                index,
                p_ctx,
                &column_name,
                &ConcreteDataType::string_datatype(),
                schema_info,
            )?;
            Some(ValueData::StringValue(String::from_utf8_lossy_owned(
                v.to_vec(),
            )))
        }

        // Regexes should not normally reach storage; stringified with a warning.
        VrlValue::Regex(v) => {
            warn!(
                "Persisting regex value in the table, this should not happen, column_name: {}",
                column_name
            );
            resolve_schema(
                index,
                p_ctx,
                &column_name,
                &ConcreteDataType::string_datatype(),
                schema_info,
            )?;
            Some(ValueData::StringValue(v.to_string()))
        }

        VrlValue::Timestamp(ts) => {
            // Fails for dates outside the representable nanosecond range.
            let ns = ts.timestamp_nanos_opt().context(InvalidTimestampSnafu {
                input: ts.to_rfc3339(),
            })?;
            resolve_schema(
                index,
                p_ctx,
                &column_name,
                &ConcreteDataType::timestamp_nanosecond_datatype(),
                schema_info,
            )?;
            Some(ValueData::TimestampNanosecondValue(ns))
        }

        VrlValue::Array(_) | VrlValue::Object(_) => {
            // JSON2 applies only when the target table declares the column as such.
            let is_json2 = schema_info
                .find_column_schema_in_table(&column_name)
                .is_some_and(|x| {
                    matches!(
                        &x.column_schema.data_type,
                        ConcreteDataType::Json(column_type) if column_type.is_json2()
                    )
                });

            let value = if is_json2 {
                // Encode via the column's JSON structure settings (if declared).
                let json_extension_type: Option<JsonExtensionType> =
                    if let Some(x) = schema_info.find_column_schema_in_table(&column_name) {
                        x.column_schema.extension_type()?
                    } else {
                        None
                    };
                let settings = json_extension_type
                    .and_then(|x| x.metadata().json_structure_settings.clone())
                    .unwrap_or_default();
                let value: serde_json::Value = value.try_into().map_err(|e: StdError| {
                    CoerceIncompatibleTypesSnafu { msg: e.to_string() }.build()
                })?;
                let value = settings.encode(value)?;

                resolve_schema(index, p_ctx, &column_name, &value.data_type(), schema_info)?;

                // `settings.encode` always yields a JSON value.
                let Value::Json(value) = value else {
                    unreachable!()
                };
                ValueData::JsonValue(encode_json_value(*value))
            } else {
                // Legacy path: store nested values as jsonb in a binary column.
                resolve_schema(
                    index,
                    p_ctx,
                    &column_name,
                    &ConcreteDataType::binary_datatype(),
                    schema_info,
                )?;

                let value = vrl_value_to_jsonb_value(&value);
                ValueData::BinaryValue(value.to_vec())
            };
            Some(value)
        }
    };

    let value = GreptimeValue { value_data };
    // Existing columns are written in place; new ones were just appended to
    // the schema, so the value goes to the row's tail.
    if let Some(index) = index {
        row[index] = value;
    } else {
        row.push(value);
    }
    Ok(())
}
747
748fn vrl_value_to_jsonb_value<'a>(value: &'a VrlValue) -> jsonb::Value<'a> {
749 match value {
750 VrlValue::Bytes(bytes) => jsonb::Value::String(String::from_utf8_lossy(bytes)),
751 VrlValue::Regex(value_regex) => jsonb::Value::String(Cow::Borrowed(value_regex.as_str())),
752 VrlValue::Integer(i) => jsonb::Value::Number(Number::Int64(*i)),
753 VrlValue::Float(not_nan) => jsonb::Value::Number(Number::Float64(not_nan.into_inner())),
754 VrlValue::Boolean(b) => jsonb::Value::Bool(*b),
755 VrlValue::Timestamp(date_time) => jsonb::Value::String(Cow::Owned(date_time.to_rfc3339())),
756 VrlValue::Object(btree_map) => jsonb::Value::Object(
757 btree_map
758 .iter()
759 .map(|(key, value)| (key.to_string(), vrl_value_to_jsonb_value(value)))
760 .collect(),
761 ),
762 VrlValue::Array(values) => jsonb::Value::Array(
763 values
764 .iter()
765 .map(|value| vrl_value_to_jsonb_value(value))
766 .collect(),
767 ),
768 VrlValue::Null => jsonb::Value::Null,
769 }
770}
771
/// Core of the identity pipeline: infer a schema from the maps themselves and
/// build rows grouped by each map's [`ContextOpt`].
///
/// The timestamp column is always first; its name and precision come from the
/// custom timestamp config when present, otherwise milliseconds for
/// Prometheus and nanoseconds for every other channel. Because the schema
/// grows as new keys appear, rows built early are back-filled with nulls to
/// the final column count at the end.
fn identity_pipeline_inner(
    pipeline_maps: Vec<VrlValue>,
    pipeline_ctx: &PipelineContext<'_>,
) -> Result<(SchemaInfo, HashMap<ContextOpt, Vec<Row>>)> {
    let skip_error = pipeline_ctx.pipeline_param.skip_error();
    let mut schema_info = SchemaInfo::default();
    let custom_ts = pipeline_ctx.pipeline_definition.get_custom_ts();

    // Seed the schema with the (non-nullable) timestamp column.
    let column_schema = datatypes::schema::ColumnSchema::new(
        custom_ts
            .map(|ts| ts.get_column_name().to_string())
            .unwrap_or_else(|| greptime_timestamp().to_string()),
        custom_ts
            .map(|c| ConcreteDataType::from(ColumnDataTypeWrapper::new(c.get_datatype(), None)))
            .unwrap_or_else(|| {
                if pipeline_ctx.channel == Channel::Prometheus {
                    ConcreteDataType::timestamp_millisecond_datatype()
                } else {
                    ConcreteDataType::timestamp_nanosecond_datatype()
                }
            }),
        false,
    );
    schema_info.schema.push(ColumnMetadata {
        column_schema,
        semantic_type: SemanticType::Timestamp,
    });

    let mut opt_map = HashMap::new();
    let len = pipeline_maps.len();

    for mut pipeline_map in pipeline_maps {
        let opt = unwrap_or_continue_if_err!(
            ContextOpt::from_pipeline_map_to_opt(&mut pipeline_map),
            skip_error
        );
        let row = unwrap_or_continue_if_err!(
            values_to_row(&mut schema_info, pipeline_map, pipeline_ctx, None, true),
            skip_error
        );

        opt_map
            .entry(opt)
            .or_insert_with(|| Vec::with_capacity(len))
            .push(row);
    }

    // Back-fill: earlier rows may be shorter than the final schema.
    let column_count = schema_info.schema.len();
    for (_, row) in opt_map.iter_mut() {
        for row in row.iter_mut() {
            assert!(
                column_count >= row.values.len(),
                "column_count: {}, row.values.len(): {}",
                column_count,
                row.values.len()
            );
            let diff = column_count - row.values.len();
            for _ in 0..diff {
                row.values.push(GreptimeValue { value_data: None });
            }
        }
    }

    Ok((schema_info, opt_map))
}
838
/// Entry point for the schema-less "identity" pipeline: flatten each map,
/// infer the schema, and group rows by [`ContextOpt`].
///
/// When a target `table` is supplied, inferred columns whose names match the
/// table's primary keys are re-marked as tags so inserts stay consistent
/// with the existing table definition.
pub fn identity_pipeline(
    array: Vec<VrlValue>,
    table: Option<Arc<table::Table>>,
    pipeline_ctx: &PipelineContext<'_>,
) -> Result<HashMap<ContextOpt, Rows>> {
    let skip_error = pipeline_ctx.pipeline_param.skip_error();
    let max_nested_levels = pipeline_ctx.pipeline_param.max_nested_levels();
    let mut input = Vec::with_capacity(array.len());
    for item in array.into_iter() {
        let result =
            unwrap_or_continue_if_err!(flatten_object(item, max_nested_levels), skip_error);
        input.push(result);
    }

    identity_pipeline_inner(input, pipeline_ctx).and_then(|(mut schema, opt_map)| {
        if let Some(table) = table {
            // Align semantic types with the existing table's primary keys.
            let table_info = table.table_info();
            for tag_name in table_info.meta.row_key_column_names() {
                if let Some(index) = schema.index.get(tag_name) {
                    schema.schema[*index].semantic_type = SemanticType::Tag;
                }
            }
        }

        // Every context group shares the same (cloned) schema.
        let column_schemas = schema.column_schemas()?;
        Ok(opt_map
            .into_iter()
            .map(|(opt, rows)| {
                (
                    opt,
                    Rows {
                        schema: column_schemas.clone(),
                        rows,
                    },
                )
            })
            .collect::<HashMap<ContextOpt, Rows>>())
    })
}
887
888pub fn flatten_object(object: VrlValue, max_nested_levels: usize) -> Result<VrlValue> {
894 let mut flattened = BTreeMap::new();
895 let object = object.into_object().context(ValueMustBeMapSnafu)?;
896
897 if !object.is_empty() {
898 do_flatten_object(&mut flattened, None, object, 1, max_nested_levels);
900 }
901
902 Ok(VrlValue::Object(flattened))
903}
904
905fn vrl_value_to_serde_json(value: &VrlValue) -> serde_json_crate::Value {
906 match value {
907 VrlValue::Null => serde_json_crate::Value::Null,
908 VrlValue::Boolean(b) => serde_json_crate::Value::Bool(*b),
909 VrlValue::Integer(i) => serde_json_crate::Value::Number((*i).into()),
910 VrlValue::Float(not_nan) => serde_json_crate::Number::from_f64(not_nan.into_inner())
911 .map(serde_json_crate::Value::Number)
912 .unwrap_or(serde_json_crate::Value::Null),
913 VrlValue::Bytes(bytes) => {
914 serde_json_crate::Value::String(String::from_utf8_lossy(bytes).into_owned())
915 }
916 VrlValue::Regex(re) => serde_json_crate::Value::String(re.as_str().to_string()),
917 VrlValue::Timestamp(ts) => serde_json_crate::Value::String(ts.to_rfc3339()),
918 VrlValue::Array(arr) => {
919 serde_json_crate::Value::Array(arr.iter().map(vrl_value_to_serde_json).collect())
920 }
921 VrlValue::Object(map) => serde_json_crate::Value::Object(
922 map.iter()
923 .map(|(k, v)| (k.to_string(), vrl_value_to_serde_json(v)))
924 .collect(),
925 ),
926 }
927}
928
929fn do_flatten_object(
930 dest: &mut BTreeMap<KeyString, VrlValue>,
931 base: Option<&str>,
932 object: BTreeMap<KeyString, VrlValue>,
933 current_level: usize,
934 max_nested_levels: usize,
935) {
936 for (key, value) in object {
937 let new_key = base.map_or_else(
938 || key.clone(),
939 |base_key| format!("{base_key}.{key}").into(),
940 );
941
942 match value {
943 VrlValue::Object(object) => {
944 if current_level >= max_nested_levels {
945 let json_string = serde_json_crate::to_string(&vrl_value_to_serde_json(
947 &VrlValue::Object(object),
948 ))
949 .unwrap_or_else(|_| String::from("{}"));
950 dest.insert(new_key, VrlValue::Bytes(Bytes::from(json_string)));
951 } else {
952 do_flatten_object(
953 dest,
954 Some(&new_key),
955 object,
956 current_level + 1,
957 max_nested_levels,
958 );
959 }
960 }
961 VrlValue::Array(_) => {
963 let json_string = serde_json_crate::to_string(&vrl_value_to_serde_json(&value))
964 .unwrap_or_else(|_| String::from("[]"));
965 dest.insert(new_key, VrlValue::Bytes(Bytes::from(json_string)));
966 }
967 _ => {
969 dest.insert(new_key, value);
970 }
971 }
972 }
973}
974
#[cfg(test)]
mod tests {
    use api::v1::SemanticType;

    use super::*;
    use crate::{PipelineDefinition, identity_pipeline};

    /// Identity pipeline over heterogeneous maps: conflicting column types
    /// across rows must error; consistent rows share one inferred schema.
    #[test]
    fn test_identify_pipeline() {
        let params = GreptimePipelineParams::default();
        let pipeline_ctx = PipelineContext::new(
            &PipelineDefinition::GreptimeIdentityPipeline(None),
            &params,
            Channel::Unknown,
        );
        {
            // Float (99.5) then string ("88.5") for `score` -> type mismatch.
            let array = [
                serde_json::json!({
                    "woshinull": null,
                    "name": "Alice",
                    "age": 20,
                    "is_student": true,
                    "score": 99.5,
                    "hobbies": "reading",
                    "address": "Beijing",
                }),
                serde_json::json!({
                    "name": "Bob",
                    "age": 21,
                    "is_student": false,
                    "score": "88.5",
                    "hobbies": "swimming",
                    "address": "Shanghai",
                    "gaga": "gaga"
                }),
            ];
            let array = array.iter().map(|v| v.into()).collect();
            let rows = identity_pipeline(array, None, &pipeline_ctx);
            assert!(rows.is_err());
            assert_eq!(
                rows.err().unwrap().to_string(),
                "Column datatype mismatch. For column: score, expected datatype: Float64, actual datatype: String".to_string(),
            );
        }
        {
            // Float then integer for `score` -> still a mismatch (no widening).
            let array = [
                serde_json::json!({
                    "woshinull": null,
                    "name": "Alice",
                    "age": 20,
                    "is_student": true,
                    "score": 99.5,
                    "hobbies": "reading",
                    "address": "Beijing",
                }),
                serde_json::json!({
                    "name": "Bob",
                    "age": 21,
                    "is_student": false,
                    "score": 88,
                    "hobbies": "swimming",
                    "address": "Shanghai",
                    "gaga": "gaga"
                }),
            ];
            let array = array.iter().map(|v| v.into()).collect();
            let rows = identity_pipeline(array, None, &pipeline_ctx);
            assert!(rows.is_err());
            assert_eq!(
                rows.err().unwrap().to_string(),
                "Column datatype mismatch. For column: score, expected datatype: Float64, actual datatype: Int64".to_string(),
            );
        }
        {
            // Consistent types: 7 data columns + timestamp = 8; rows padded.
            let array = [
                serde_json::json!({
                    "woshinull": null,
                    "name": "Alice",
                    "age": 20,
                    "is_student": true,
                    "score": 99.5,
                    "hobbies": "reading",
                    "address": "Beijing",
                }),
                serde_json::json!({
                    "name": "Bob",
                    "age": 21,
                    "is_student": false,
                    "score": 88.5,
                    "hobbies": "swimming",
                    "address": "Shanghai",
                    "gaga": "gaga"
                }),
            ];
            let array = array.iter().map(|v| v.into()).collect();
            let rows = identity_pipeline(array, None, &pipeline_ctx);
            assert!(rows.is_ok());
            let mut rows = rows.unwrap();
            assert!(rows.len() == 1);
            let rows = rows.remove(&ContextOpt::default()).unwrap();
            assert_eq!(rows.schema.len(), 8);
            assert_eq!(rows.rows.len(), 2);
            assert_eq!(8, rows.rows[0].values.len());
            assert_eq!(8, rows.rows[1].values.len());
        }
        {
            // Manually promote `name`/`address` to tags after inference.
            let array = [
                serde_json::json!({
                    "woshinull": null,
                    "name": "Alice",
                    "age": 20,
                    "is_student": true,
                    "score": 99.5,
                    "hobbies": "reading",
                    "address": "Beijing",
                }),
                serde_json::json!({
                    "name": "Bob",
                    "age": 21,
                    "is_student": false,
                    "score": 88.5,
                    "hobbies": "swimming",
                    "address": "Shanghai",
                    "gaga": "gaga"
                }),
            ];
            let tag_column_names = ["name".to_string(), "address".to_string()];

            let rows =
                identity_pipeline_inner(array.iter().map(|v| v.into()).collect(), &pipeline_ctx)
                    .map(|(mut schema, mut rows)| {
                        for name in tag_column_names {
                            if let Some(index) = schema.index.get(&name) {
                                schema.schema[*index].semantic_type = SemanticType::Tag;
                            }
                        }

                        assert!(rows.len() == 1);
                        let rows = rows.remove(&ContextOpt::default()).unwrap();

                        Rows {
                            schema: schema.column_schemas().unwrap(),
                            rows,
                        }
                    });

            assert!(rows.is_ok());
            let rows = rows.unwrap();
            assert_eq!(rows.schema.len(), 8);
            assert_eq!(rows.rows.len(), 2);
            assert_eq!(8, rows.rows[0].values.len());
            assert_eq!(8, rows.rows[1].values.len());
            assert_eq!(
                rows.schema
                    .iter()
                    .find(|x| x.column_name == "name")
                    .unwrap()
                    .semantic_type,
                SemanticType::Tag as i32
            );
            assert_eq!(
                rows.schema
                    .iter()
                    .find(|x| x.column_name == "address")
                    .unwrap()
                    .semantic_type,
                SemanticType::Tag as i32
            );
            assert_eq!(
                rows.schema
                    .iter()
                    .filter(|x| x.semantic_type == SemanticType::Tag as i32)
                    .count(),
                2
            );
        }
    }

    /// Nested maps flatten to dotted keys; arrays and objects beyond the
    /// depth limit are serialized to JSON strings.
    #[test]
    fn test_flatten() {
        let test_cases = vec![
            (
                serde_json::json!(
                    {
                        "a": {
                            "b": {
                                "c": [1, 2, 3]
                            }
                        },
                        "d": [
                            "foo",
                            "bar"
                        ],
                        "e": {
                            "f": [7, 8, 9],
                            "g": {
                                "h": 123,
                                "i": "hello",
                                "j": {
                                    "k": true
                                }
                            }
                        }
                    }
                ),
                10,
                Some(serde_json::json!(
                    {
                        "a.b.c": "[1,2,3]",
                        "d": "[\"foo\",\"bar\"]",
                        "e.f": "[7,8,9]",
                        "e.g.h": 123,
                        "e.g.i": "hello",
                        "e.g.j.k": true
                    }
                )),
            ),
            (
                serde_json::json!(
                    {
                        "a": {
                            "b": {
                                "c": {
                                    "d": [1, 2, 3]
                                }
                            }
                        },
                        "e": [
                            "foo",
                            "bar"
                        ]
                    }
                ),
                3,
                Some(serde_json::json!(
                    {
                        "a.b.c": "{\"d\":[1,2,3]}",
                        "e": "[\"foo\",\"bar\"]"
                    }
                )),
            ),
        ];

        for (input, max_depth, expected) in test_cases {
            let input = input.into();
            let expected = expected.map(|e| e.into());

            let flattened_object = flatten_object(input, max_depth).ok();
            assert_eq!(flattened_object, expected);
        }
    }

    use ahash::HashMap as AHashMap;
    /// `skip_error=true` drops non-object array elements; the default
    /// (false) makes the first bad element abort the conversion.
    #[test]
    fn test_values_to_rows_skip_error_handling() {
        let table_suffix_template: Option<crate::tablesuffix::TableSuffixTemplate> = None;

        {
            // Mixed input with skip_error: only the 3 object rows survive.
            let schema_info = &mut SchemaInfo::default();
            let input_array = vec![
                serde_json::json!({"name": "Alice", "age": 25}).into(),
                VrlValue::Bytes("invalid_string".into()),
                serde_json::json!({"name": "Bob", "age": 30}).into(),
                VrlValue::Integer(42),
                serde_json::json!({"name": "Charlie", "age": 35}).into(),
            ];

            let params = GreptimePipelineParams::from_map(AHashMap::from_iter([(
                "skip_error".to_string(),
                "true".to_string(),
            )]));

            let pipeline_ctx = PipelineContext::new(
                &PipelineDefinition::GreptimeIdentityPipeline(None),
                &params,
                Channel::Unknown,
            );

            let result = values_to_rows(
                schema_info,
                VrlValue::Array(input_array),
                &pipeline_ctx,
                None,
                true,
                table_suffix_template.as_ref(),
            );

            assert!(result.is_ok());
            let rows_by_context = result.unwrap();
            let total_rows: usize = rows_by_context.values().map(|v| v.len()).sum();
            assert_eq!(total_rows, 3);
        }

        {
            // Default params (skip_error off): the bad element is an error.
            let schema_info = &mut SchemaInfo::default();
            let input_array = vec![
                serde_json::json!({"name": "Alice", "age": 25}).into(),
                VrlValue::Bytes("invalid_string".into()),
            ];

            let params = GreptimePipelineParams::default();
            let pipeline_ctx = PipelineContext::new(
                &PipelineDefinition::GreptimeIdentityPipeline(None),
                &params,
                Channel::Unknown,
            );

            let result = values_to_rows(
                schema_info,
                VrlValue::Array(input_array),
                &pipeline_ctx,
                None,
                true,
                table_suffix_template.as_ref(),
            );

            assert!(result.is_err());
            let error_msg = result.unwrap_err().to_string();
            assert!(error_msg.contains("Array element at index 1 must be an object for one-to-many transformation, got string"));
        }
    }

    /// Rows are grouped by the per-element ContextOpt (here: greptime_ttl).
    #[test]
    fn test_values_to_rows_per_element_context_opt() {
        let table_suffix_template: Option<crate::tablesuffix::TableSuffixTemplate> = None;
        let schema_info = &mut SchemaInfo::default();

        let input_array = vec![
            serde_json::json!({"name": "Alice", "greptime_ttl": "1h"}).into(),
            serde_json::json!({"name": "Bob", "greptime_ttl": "1h"}).into(),
            serde_json::json!({"name": "Charlie", "greptime_ttl": "24h"}).into(),
        ];

        let params = GreptimePipelineParams::default();
        let pipeline_ctx = PipelineContext::new(
            &PipelineDefinition::GreptimeIdentityPipeline(None),
            &params,
            Channel::Unknown,
        );

        let result = values_to_rows(
            schema_info,
            VrlValue::Array(input_array),
            &pipeline_ctx,
            None,
            true,
            table_suffix_template.as_ref(),
        );

        assert!(result.is_ok());
        let rows_by_context = result.unwrap();

        // Two distinct TTLs -> two context groups.
        assert_eq!(rows_by_context.len(), 2);

        let total_rows: usize = rows_by_context.values().map(|v| v.len()).sum();
        assert_eq!(total_rows, 3);

        let mut ttl_1h_count = 0;
        let mut ttl_24h_count = 0;
        for rows in rows_by_context.values() {
            if rows.len() == 2 {
                ttl_1h_count = rows.len();
            } else if rows.len() == 1 {
                ttl_24h_count = rows.len();
            }
        }
        assert_eq!(ttl_1h_count, 2);
        assert_eq!(ttl_24h_count, 1);
    }
}