pipeline/etl/transform/transformer/greptime.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

pub mod coerce;

use std::borrow::Cow;
use std::collections::{BTreeMap, HashSet};
use std::sync::Arc;

use ahash::{HashMap, HashMapExt};
use api::helper::{ColumnDataTypeWrapper, encode_json_value};
use api::v1::column_def::{collect_column_options, options_from_column_schema};
use api::v1::value::ValueData;
use api::v1::{ColumnDataType, SemanticType};
use arrow_schema::extension::ExtensionType;
use coerce::{coerce_columns, coerce_value};
use common_query::prelude::{greptime_timestamp, greptime_value};
use common_telemetry::warn;
use datatypes::data_type::ConcreteDataType;
use datatypes::extension::json::JsonExtensionType;
use datatypes::value::Value;
use greptime_proto::v1::{ColumnSchema, Row, Rows, Value as GreptimeValue};
use itertools::Itertools;
use jsonb::Number;
use once_cell::sync::OnceCell;
use serde_json as serde_json_crate;
use session::context::Channel;
use snafu::OptionExt;
use table::Table;
use vrl::prelude::{Bytes, VrlValueConvert};
use vrl::value::value::StdError;
use vrl::value::{KeyString, Value as VrlValue};

use crate::error::{
    ArrayElementMustBeObjectSnafu, CoerceIncompatibleTypesSnafu,
    IdentifyPipelineColumnTypeMismatchSnafu, InvalidTimestampSnafu, Result,
    TimeIndexMustBeNonNullSnafu, TransformColumnNameMustBeUniqueSnafu,
    TransformMultipleTimestampIndexSnafu, TransformTimestampIndexCountSnafu, ValueMustBeMapSnafu,
};
use crate::etl::PipelineDocVersion;
use crate::etl::ctx_req::ContextOpt;
use crate::etl::field::{Field, Fields};
use crate::etl::transform::index::Index;
use crate::etl::transform::{Transform, Transforms};
use crate::{PipelineContext, truthy, unwrap_or_continue_if_err};

const DEFAULT_MAX_NESTED_LEVELS_FOR_JSON_FLATTENING: usize = 10;

/// Row with potentially designated table suffix.
pub type RowWithTableSuffix = (Row, Option<String>);

/// Fields not present in the schema columns will be discarded
/// to prevent automatic column creation in GreptimeDB.
#[derive(Debug, Clone)]
pub struct GreptimeTransformer {
    transforms: Transforms,
    schema: Vec<ColumnSchema>,
}

/// Parameters that can be used to configure the greptime pipelines.
#[derive(Debug, Default)]
pub struct GreptimePipelineParams {
    /// The original options for configuring the greptime pipelines.
    /// This should not be used directly; instead, use the parsed shortcut option values.
    options: HashMap<String, String>,

    /// Whether to skip error when processing the pipeline.
    pub skip_error: OnceCell<bool>,
    /// Max nested levels when flattening JSON object. Defaults to
    /// `DEFAULT_MAX_NESTED_LEVELS_FOR_JSON_FLATTENING` when not provided.
    pub max_nested_levels: OnceCell<usize>,
}

impl GreptimePipelineParams {
    /// Create a `GreptimePipelineParams` from the params string carried in the HTTP header
    /// `x-greptime-pipeline-params`. The params are in the format `key1=value1&key2=value2`,
    /// for example: `x-greptime-pipeline-params: max_nested_levels=5`.
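    ///
    /// A minimal usage sketch (relying on the shortcut getters below):
    ///
    /// ```ignore
    /// let params = GreptimePipelineParams::from_params(Some("skip_error=true&max_nested_levels=5"));
    /// assert!(params.skip_error());
    /// assert_eq!(params.max_nested_levels(), 5);
    /// ```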
    pub fn from_params(params: Option<&str>) -> Self {
        let options = Self::parse_header_str_to_map(params);

        Self {
            options,
            skip_error: OnceCell::new(),
            max_nested_levels: OnceCell::new(),
        }
    }

    pub fn from_map(options: HashMap<String, String>) -> Self {
        Self {
            options,
            skip_error: OnceCell::new(),
            max_nested_levels: OnceCell::new(),
        }
    }

    pub fn parse_header_str_to_map(params: Option<&str>) -> HashMap<String, String> {
        if let Some(params) = params {
            if params.is_empty() {
                HashMap::new()
            } else {
                params
                    .split('&')
                    .filter_map(|s| s.split_once('='))
                    .map(|(k, v)| (k.to_string(), v.to_string()))
                    .collect::<HashMap<String, String>>()
            }
        } else {
            HashMap::new()
        }
    }

    /// Whether to skip error when processing the pipeline.
    pub fn skip_error(&self) -> bool {
        *self
            .skip_error
            .get_or_init(|| self.options.get("skip_error").map(truthy).unwrap_or(false))
    }

    /// Max nested levels for JSON flattening. If not provided or invalid,
    /// falls back to `DEFAULT_MAX_NESTED_LEVELS_FOR_JSON_FLATTENING`.
    pub fn max_nested_levels(&self) -> usize {
        *self.max_nested_levels.get_or_init(|| {
            self.options
                .get("max_nested_levels")
                .and_then(|s| s.parse::<usize>().ok())
                .filter(|v| *v > 0)
                .unwrap_or(DEFAULT_MAX_NESTED_LEVELS_FOR_JSON_FLATTENING)
        })
    }
}

impl GreptimeTransformer {
    /// Add a default timestamp column to the transforms
    fn add_greptime_timestamp_column(transforms: &mut Transforms) {
        let type_ = ColumnDataType::TimestampNanosecond;
        let default = None;

        let transform = Transform {
            fields: Fields::one(Field::new(greptime_timestamp().to_string(), None)),
            type_,
            default,
            index: Some(Index::Time),
            on_failure: Some(crate::etl::transform::OnFailure::Default),
            tag: false,
        };
        transforms.push(transform);
    }

    /// Generate the schema for the GreptimeTransformer
    fn init_schemas(transforms: &Transforms) -> Result<Vec<ColumnSchema>> {
        let mut schema = vec![];
        for transform in transforms.iter() {
            schema.extend(coerce_columns(transform)?);
        }
        Ok(schema)
    }
}

impl GreptimeTransformer {
    pub fn new(mut transforms: Transforms, doc_version: &PipelineDocVersion) -> Result<Self> {
        // empty check is done in the caller
        let mut column_names_set = HashSet::new();
        let mut timestamp_columns = vec![];

        for transform in transforms.iter() {
            let target_fields_set = transform
                .fields
                .iter()
                .map(|f| f.target_or_input_field())
                .collect::<HashSet<_>>();

            let intersections: Vec<_> = column_names_set.intersection(&target_fields_set).collect();
            if !intersections.is_empty() {
                let duplicates = intersections.iter().join(",");
                return TransformColumnNameMustBeUniqueSnafu { duplicates }.fail();
            }

            column_names_set.extend(target_fields_set);

            if let Some(idx) = transform.index
                && idx == Index::Time
            {
                match transform.fields.len() {
                    // Safety: unwrap is fine here because we have checked that `fields` has exactly one element
                    1 => timestamp_columns.push(transform.fields.first().unwrap().input_field()),
                    _ => {
                        return TransformMultipleTimestampIndexSnafu {
                            columns: transform.fields.iter().map(|x| x.input_field()).join(", "),
                        }
                        .fail();
                    }
                }
            }
        }

        let schema = match timestamp_columns.len() {
            0 if doc_version == &PipelineDocVersion::V1 => {
                // compatible with v1, add a default timestamp column
                GreptimeTransformer::add_greptime_timestamp_column(&mut transforms);
                GreptimeTransformer::init_schemas(&transforms)?
            }
            1 => GreptimeTransformer::init_schemas(&transforms)?,
            count => {
                let columns = timestamp_columns.iter().join(", ");
                return TransformTimestampIndexCountSnafu { count, columns }.fail();
            }
        };
        Ok(GreptimeTransformer { transforms, schema })
    }

    pub fn transform_mut(
        &self,
        pipeline_map: &mut VrlValue,
        is_v1: bool,
    ) -> Result<Vec<GreptimeValue>> {
        let mut values = vec![GreptimeValue { value_data: None }; self.schema.len()];
        let mut output_index = 0;
        for transform in self.transforms.iter() {
            for field in transform.fields.iter() {
                let column_name = field.input_field();

                let pipeline_map = pipeline_map.as_object_mut().context(ValueMustBeMapSnafu)?;
                // let us keep using `get` here to stay compatible with v1
                match pipeline_map.get(column_name) {
                    Some(v) => {
                        let value_data = coerce_value(v, transform)?;
                        // every transform field has exactly one output field
                        values[output_index] = GreptimeValue { value_data };
                    }
                    None => {
                        let value_data = match transform.on_failure {
                            Some(crate::etl::transform::OnFailure::Default) => {
                                match transform.get_default() {
                                    Some(default) => Some(default.clone()),
                                    None => transform.get_default_value_when_data_is_none(),
                                }
                            }
                            Some(crate::etl::transform::OnFailure::Ignore) => None,
                            None => None,
                        };
                        if transform.is_timeindex() && value_data.is_none() {
                            return TimeIndexMustBeNonNullSnafu.fail();
                        }
                        values[output_index] = GreptimeValue { value_data };
                    }
                }
                output_index += 1;
                if !is_v1 {
                    // remove the column from the pipeline_map
                    // so that the auto-transform can use the rest of the fields
                    pipeline_map.remove(column_name);
                }
            }
        }
        Ok(values)
    }

    pub fn transforms(&self) -> &Transforms {
        &self.transforms
    }

    pub fn schemas(&self) -> &Vec<greptime_proto::v1::ColumnSchema> {
        &self.schema
    }

    pub fn transforms_mut(&mut self) -> &mut Transforms {
        &mut self.transforms
    }
}

#[derive(Clone)]
pub struct ColumnMetadata {
    column_schema: datatypes::schema::ColumnSchema,
    semantic_type: SemanticType,
}

impl From<ColumnSchema> for ColumnMetadata {
    fn from(value: ColumnSchema) -> Self {
        let datatype = value.datatype();
        let semantic_type = value.semantic_type();
        let ColumnSchema {
            column_name,
            datatype: _,
            semantic_type: _,
            datatype_extension,
            options,
        } = value;

        let column_schema = datatypes::schema::ColumnSchema::new(
            column_name,
            ColumnDataTypeWrapper::new(datatype, datatype_extension).into(),
            semantic_type != SemanticType::Timestamp,
        );

        let metadata = collect_column_options(options.as_ref());
        let column_schema = column_schema.with_metadata(metadata);

        Self {
            column_schema,
            semantic_type,
        }
    }
}

impl TryFrom<ColumnMetadata> for ColumnSchema {
    type Error = api::error::Error;

    fn try_from(value: ColumnMetadata) -> std::result::Result<Self, Self::Error> {
        let ColumnMetadata {
            column_schema,
            semantic_type,
        } = value;

        let options = options_from_column_schema(&column_schema);

        let (datatype, datatype_extension) =
            ColumnDataTypeWrapper::try_from(column_schema.data_type).map(|x| x.into_parts())?;

        Ok(ColumnSchema {
            column_name: column_schema.name,
            datatype: datatype as _,
            semantic_type: semantic_type as _,
            datatype_extension,
            options,
        })
    }
}

/// Records the schema information accumulated so far, plus a sequential cache of field names.
/// It changes as the user's input JSON is traversed,
/// ending up as a superset of all user input schemas.
#[derive(Default)]
pub struct SchemaInfo {
    /// schema info
    pub schema: Vec<ColumnMetadata>,
    /// index of the column name
    pub index: HashMap<String, usize>,
    /// The pipeline's corresponding table (if already created). Useful to retrieve column schemas.
    table: Option<Arc<Table>>,
}

impl SchemaInfo {
    pub fn with_capacity(capacity: usize) -> Self {
        Self {
            schema: Vec::with_capacity(capacity),
            index: HashMap::with_capacity(capacity),
            table: None,
        }
    }

    pub fn from_schema_list(schema_list: Vec<ColumnSchema>) -> Self {
        let mut index = HashMap::new();
        for (i, schema) in schema_list.iter().enumerate() {
            index.insert(schema.column_name.clone(), i);
        }
        Self {
            schema: schema_list.into_iter().map(Into::into).collect(),
            index,
            table: None,
        }
    }

    pub fn set_table(&mut self, table: Option<Arc<Table>>) {
        self.table = table;
    }

    fn find_column_schema_in_table(&self, column_name: &str) -> Option<ColumnMetadata> {
        if let Some(table) = &self.table
            && let Some(i) = table.schema_ref().column_index_by_name(column_name)
        {
            let column_schema = table.schema_ref().column_schemas()[i].clone();

            let semantic_type = if column_schema.is_time_index() {
                SemanticType::Timestamp
            } else if table.table_info().meta.primary_key_indices.contains(&i) {
                SemanticType::Tag
            } else {
                SemanticType::Field
            };

            Some(ColumnMetadata {
                column_schema,
                semantic_type,
            })
        } else {
            None
        }
    }

    pub fn column_schemas(&self) -> api::error::Result<Vec<ColumnSchema>> {
        self.schema
            .iter()
            .map(|x| x.clone().try_into())
            .collect::<api::error::Result<Vec<_>>>()
    }
}

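/// Ensures `schema_info` has a column named `column` whose type is compatible with
/// `value_type`: an existing column must match exactly (or, for JSON columns,
/// include the incoming JSON type), while an unknown column is appended, reusing
/// the table's schema for it when available.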
fn resolve_schema(
    index: Option<usize>,
    pipeline_context: &PipelineContext,
    column: &str,
    value_type: &ConcreteDataType,
    schema_info: &mut SchemaInfo,
) -> Result<()> {
    if let Some(index) = index {
        let column_type = &mut schema_info.schema[index].column_schema.data_type;
        match (column_type, value_type) {
            (column_type, value_type) if column_type == value_type => Ok(()),
            (ConcreteDataType::Json(column_type), ConcreteDataType::Json(value_type))
                if column_type.is_include(value_type) =>
            {
                Ok(())
            }
            (column_type, value_type) => IdentifyPipelineColumnTypeMismatchSnafu {
                column,
                expected: column_type.to_string(),
                actual: value_type.to_string(),
            }
            .fail(),
        }
    } else {
        let column_schema = schema_info
            .find_column_schema_in_table(column)
            .unwrap_or_else(|| {
                let semantic_type = decide_semantic(pipeline_context, column);
                let column_schema = datatypes::schema::ColumnSchema::new(
                    column,
                    value_type.clone(),
                    semantic_type != SemanticType::Timestamp,
                );
                ColumnMetadata {
                    column_schema,
                    semantic_type,
                }
            });
        let key = column.to_string();
        schema_info.schema.push(column_schema);
        schema_info.index.insert(key, schema_info.schema.len() - 1);
        Ok(())
    }
}

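/// Computes the timestamp cell for a row. Prometheus channels read
/// `greptime_timestamp` as milliseconds; other channels use the pipeline's custom
/// timestamp when defined, falling back to the current time in nanoseconds.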
fn calc_ts(p_ctx: &PipelineContext, values: &VrlValue) -> Result<Option<ValueData>> {
    match p_ctx.channel {
        Channel::Prometheus => {
            let ts = values
                .as_object()
                .and_then(|m| m.get(greptime_timestamp()))
                .and_then(|ts| ts.try_into_i64().ok())
                .unwrap_or_default();
            Ok(Some(ValueData::TimestampMillisecondValue(ts)))
        }
        _ => {
            let custom_ts = p_ctx.pipeline_definition.get_custom_ts();
            match custom_ts {
                Some(ts) => {
                    let ts_field = values.as_object().and_then(|m| m.get(ts.get_column_name()));
                    Some(ts.get_timestamp_value(ts_field)).transpose()
                }
                None => Ok(Some(ValueData::TimestampNanosecondValue(
                    chrono::Utc::now().timestamp_nanos_opt().unwrap_or_default(),
                ))),
            }
        }
    }
}

/// Converts VRL values to Greptime rows grouped by their ContextOpt.
///
/// # Returns
/// A HashMap where keys are `ContextOpt` and values are vectors of (row, table_suffix) pairs.
/// Single object input produces one ContextOpt group with one row.
/// Array input groups rows by their per-element ContextOpt values.
///
/// # Errors
/// - `ArrayElementMustBeObject` if an array element is not an object
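///
/// # Example
/// A sketch mirroring the unit tests below (schema/context setup elided):
///
/// ```ignore
/// let input = VrlValue::Array(vec![
///     serde_json::json!({"name": "a", "greptime_ttl": "1h"}).into(),
///     serde_json::json!({"name": "b", "greptime_ttl": "24h"}).into(),
/// ]);
/// let grouped = values_to_rows(&mut schema_info, input, &pipeline_ctx, None, true, None)?;
/// assert_eq!(grouped.len(), 2); // one group per distinct ContextOpt
/// ```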
pub(crate) fn values_to_rows(
    schema_info: &mut SchemaInfo,
    mut values: VrlValue,
    pipeline_ctx: &PipelineContext<'_>,
    row: Option<Vec<GreptimeValue>>,
    need_calc_ts: bool,
    tablesuffix_template: Option<&crate::tablesuffix::TableSuffixTemplate>,
) -> Result<std::collections::HashMap<ContextOpt, Vec<RowWithTableSuffix>>> {
    let skip_error = pipeline_ctx.pipeline_param.skip_error();
    let VrlValue::Array(arr) = values else {
        // Single object: extract ContextOpt and table_suffix
        let mut result = std::collections::HashMap::new();

        let mut opt = match ContextOpt::from_pipeline_map_to_opt(&mut values) {
            Ok(r) => r,
            Err(e) => return if skip_error { Ok(result) } else { Err(e) },
        };

        let table_suffix = opt.resolve_table_suffix(tablesuffix_template, &values);
        let row = match values_to_row(schema_info, values, pipeline_ctx, row, need_calc_ts) {
            Ok(r) => r,
            Err(e) => return if skip_error { Ok(result) } else { Err(e) },
        };
        result.insert(opt, vec![(row, table_suffix)]);
        return Ok(result);
    };

    let mut rows_by_context: std::collections::HashMap<ContextOpt, Vec<RowWithTableSuffix>> =
        std::collections::HashMap::new();
    for (index, mut value) in arr.into_iter().enumerate() {
        if !value.is_object() {
            unwrap_or_continue_if_err!(
                ArrayElementMustBeObjectSnafu {
                    index,
                    actual_type: value.kind_str().to_string(),
                }
                .fail(),
                skip_error
            );
        }

        // Extract ContextOpt and table_suffix for this element
        let mut opt = unwrap_or_continue_if_err!(
            ContextOpt::from_pipeline_map_to_opt(&mut value),
            skip_error
        );
        let table_suffix = opt.resolve_table_suffix(tablesuffix_template, &value);
        let transformed_row = unwrap_or_continue_if_err!(
            values_to_row(schema_info, value, pipeline_ctx, row.clone(), need_calc_ts),
            skip_error
        );
        rows_by_context
            .entry(opt)
            .or_default()
            .push((transformed_row, table_suffix));
    }
    Ok(rows_by_context)
}

/// `need_calc_ts` is set in two cases:
/// 1. full greptime_identity
/// 2. auto-transform without a transformer
///
/// If a transform is present in a custom pipeline in v2 mode,
/// we don't need to calc the ts again, nor do we need to check the ts column name.
pub(crate) fn values_to_row(
    schema_info: &mut SchemaInfo,
    values: VrlValue,
    pipeline_ctx: &PipelineContext<'_>,
    row: Option<Vec<GreptimeValue>>,
    need_calc_ts: bool,
) -> Result<Row> {
    let mut row: Vec<GreptimeValue> =
        row.unwrap_or_else(|| Vec::with_capacity(schema_info.schema.len()));
    let custom_ts = pipeline_ctx.pipeline_definition.get_custom_ts();

    if need_calc_ts {
        // calculate timestamp value based on the channel
        let ts = calc_ts(pipeline_ctx, &values)?;
        row.push(GreptimeValue { value_data: ts });
    }

    row.resize(schema_info.schema.len(), GreptimeValue { value_data: None });

    // skip ts column
    let ts_column_name = custom_ts
        .as_ref()
        .map_or(greptime_timestamp(), |ts| ts.get_column_name());

    let values = values.into_object().context(ValueMustBeMapSnafu)?;

    for (column_name, value) in values {
        if need_calc_ts && column_name.as_str() == ts_column_name {
            continue;
        }

        resolve_value(
            value,
            column_name.into(),
            &mut row,
            schema_info,
            pipeline_ctx,
        )?;
    }
    Ok(Row { values: row })
}

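/// Decides the semantic type of a new column: on the Prometheus channel every
/// column except the value column becomes a tag; otherwise it is a field.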
fn decide_semantic(p_ctx: &PipelineContext, column_name: &str) -> SemanticType {
    if p_ctx.channel == Channel::Prometheus && column_name != greptime_value() {
        SemanticType::Tag
    } else {
        SemanticType::Field
    }
}

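/// Coerces a single VRL value into a Greptime value, updating `schema_info` as
/// needed and writing the result into `row`: in place when the column already
/// exists, appended otherwise. Null values leave the row untouched.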
fn resolve_value(
    value: VrlValue,
    column_name: String,
    row: &mut Vec<GreptimeValue>,
    schema_info: &mut SchemaInfo,
    p_ctx: &PipelineContext,
) -> Result<()> {
    let index = schema_info.index.get(&column_name).copied();

    let value_data = match value {
        VrlValue::Null => return Ok(()),

        VrlValue::Integer(v) => {
            // safe to convert after the type has been matched
            resolve_schema(
                index,
                p_ctx,
                &column_name,
                &ConcreteDataType::int64_datatype(),
                schema_info,
            )?;
            Some(ValueData::I64Value(v))
        }

        VrlValue::Float(v) => {
            // safe to convert after the type has been matched
            resolve_schema(
                index,
                p_ctx,
                &column_name,
                &ConcreteDataType::float64_datatype(),
                schema_info,
            )?;
            Some(ValueData::F64Value(v.into()))
        }

        VrlValue::Boolean(v) => {
            resolve_schema(
                index,
                p_ctx,
                &column_name,
                &ConcreteDataType::boolean_datatype(),
                schema_info,
            )?;
            Some(ValueData::BoolValue(v))
        }

        VrlValue::Bytes(v) => {
            resolve_schema(
                index,
                p_ctx,
                &column_name,
                &ConcreteDataType::string_datatype(),
                schema_info,
            )?;
            Some(ValueData::StringValue(String::from_utf8_lossy_owned(
                v.to_vec(),
            )))
        }

        VrlValue::Regex(v) => {
            warn!(
                "Persisting regex value in the table, this should not happen, column_name: {}",
                column_name
            );
            resolve_schema(
                index,
                p_ctx,
                &column_name,
                &ConcreteDataType::string_datatype(),
                schema_info,
            )?;
            Some(ValueData::StringValue(v.to_string()))
        }

        VrlValue::Timestamp(ts) => {
            let ns = ts.timestamp_nanos_opt().context(InvalidTimestampSnafu {
                input: ts.to_rfc3339(),
            })?;
            resolve_schema(
                index,
                p_ctx,
                &column_name,
                &ConcreteDataType::timestamp_nanosecond_datatype(),
                schema_info,
            )?;
            Some(ValueData::TimestampNanosecondValue(ns))
        }

        VrlValue::Array(_) | VrlValue::Object(_) => {
            let is_json_native_type = schema_info
                .find_column_schema_in_table(&column_name)
                .is_some_and(|x| {
                    if let ConcreteDataType::Json(column_type) = &x.column_schema.data_type {
                        column_type.is_native_type()
                    } else {
                        false
                    }
                });

            let value = if is_json_native_type {
                let json_extension_type: Option<JsonExtensionType> =
                    if let Some(x) = schema_info.find_column_schema_in_table(&column_name) {
                        x.column_schema.extension_type()?
                    } else {
                        None
                    };
                let settings = json_extension_type
                    .and_then(|x| x.metadata().json_structure_settings.clone())
                    .unwrap_or_default();
                let value: serde_json::Value = value.try_into().map_err(|e: StdError| {
                    CoerceIncompatibleTypesSnafu { msg: e.to_string() }.build()
                })?;
                let value = settings.encode(value)?;

                resolve_schema(index, p_ctx, &column_name, &value.data_type(), schema_info)?;

                let Value::Json(value) = value else {
                    unreachable!()
                };
                ValueData::JsonValue(encode_json_value(*value))
            } else {
                resolve_schema(
                    index,
                    p_ctx,
                    &column_name,
                    &ConcreteDataType::binary_datatype(),
                    schema_info,
                )?;

                let value = vrl_value_to_jsonb_value(&value);
                ValueData::BinaryValue(value.to_vec())
            };
            Some(value)
        }
    };

    let value = GreptimeValue { value_data };
    if let Some(index) = index {
        row[index] = value;
    } else {
        row.push(value);
    }
    Ok(())
}

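/// Recursively converts a VRL value into a `jsonb` value for binary JSON storage.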
fn vrl_value_to_jsonb_value<'a>(value: &'a VrlValue) -> jsonb::Value<'a> {
    match value {
        VrlValue::Bytes(bytes) => jsonb::Value::String(String::from_utf8_lossy(bytes)),
        VrlValue::Regex(value_regex) => jsonb::Value::String(Cow::Borrowed(value_regex.as_str())),
        VrlValue::Integer(i) => jsonb::Value::Number(Number::Int64(*i)),
        VrlValue::Float(not_nan) => jsonb::Value::Number(Number::Float64(not_nan.into_inner())),
        VrlValue::Boolean(b) => jsonb::Value::Bool(*b),
        VrlValue::Timestamp(date_time) => jsonb::Value::String(Cow::Owned(date_time.to_rfc3339())),
        VrlValue::Object(btree_map) => jsonb::Value::Object(
            btree_map
                .iter()
                .map(|(key, value)| (key.to_string(), vrl_value_to_jsonb_value(value)))
                .collect(),
        ),
        VrlValue::Array(values) => jsonb::Value::Array(
            values
                .iter()
                .map(|value| vrl_value_to_jsonb_value(value))
                .collect(),
        ),
        VrlValue::Null => jsonb::Value::Null,
    }
}

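/// Shared core of the identity pipeline: seeds the time index column first, then
/// converts each input map into a row grouped by its `ContextOpt`, and finally
/// pads every row to the merged column count.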
fn identity_pipeline_inner(
    pipeline_maps: Vec<VrlValue>,
    pipeline_ctx: &PipelineContext<'_>,
) -> Result<(SchemaInfo, HashMap<ContextOpt, Vec<Row>>)> {
    let skip_error = pipeline_ctx.pipeline_param.skip_error();
    let mut schema_info = SchemaInfo::default();
    let custom_ts = pipeline_ctx.pipeline_definition.get_custom_ts();

    // set time index column schema first
    let column_schema = datatypes::schema::ColumnSchema::new(
        custom_ts
            .map(|ts| ts.get_column_name().to_string())
            .unwrap_or_else(|| greptime_timestamp().to_string()),
        custom_ts
            .map(|c| ConcreteDataType::from(ColumnDataTypeWrapper::new(c.get_datatype(), None)))
            .unwrap_or_else(|| {
                if pipeline_ctx.channel == Channel::Prometheus {
                    ConcreteDataType::timestamp_millisecond_datatype()
                } else {
                    ConcreteDataType::timestamp_nanosecond_datatype()
                }
            }),
        false,
    );
    schema_info.schema.push(ColumnMetadata {
        column_schema,
        semantic_type: SemanticType::Timestamp,
    });

    let mut opt_map = HashMap::new();
    let len = pipeline_maps.len();

    for mut pipeline_map in pipeline_maps {
        let opt = unwrap_or_continue_if_err!(
            ContextOpt::from_pipeline_map_to_opt(&mut pipeline_map),
            skip_error
        );
        let row = unwrap_or_continue_if_err!(
            values_to_row(&mut schema_info, pipeline_map, pipeline_ctx, None, true),
            skip_error
        );

        opt_map
            .entry(opt)
            .or_insert_with(|| Vec::with_capacity(len))
            .push(row);
    }

    let column_count = schema_info.schema.len();
    for (_, row) in opt_map.iter_mut() {
        for row in row.iter_mut() {
            assert!(
                column_count >= row.values.len(),
                "column_count: {}, row.values.len(): {}",
                column_count,
                row.values.len()
            );
            let diff = column_count - row.values.len();
            for _ in 0..diff {
                row.values.push(GreptimeValue { value_data: None });
            }
        }
    }

    Ok((schema_info, opt_map))
}

/// Identity pipeline for Greptime.
/// This pipeline converts the input JSON array to Greptime Rows.
/// The `table` param is used to set the semantic type of the row key columns to Tag.
/// 1. The pipeline adds a default timestamp column to the schema.
/// 2. The pipeline does not resolve NULL values.
/// 3. The pipeline assumes that the JSON format is fixed.
/// 4. The pipeline returns an error if the same column's datatype is mismatched.
/// 5. The pipeline analyzes the schema of each JSON record and merges them to get the final schema.
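///
/// A usage sketch mirroring the tests below (pipeline context setup elided):
///
/// ```ignore
/// let array: Vec<VrlValue> = vec![serde_json::json!({"host": "a", "cpu": 0.5}).into()];
/// let rows_by_opt = identity_pipeline(array, None, &pipeline_ctx)?;
/// let rows = rows_by_opt.get(&ContextOpt::default()).unwrap();
/// ```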
pub fn identity_pipeline(
    array: Vec<VrlValue>,
    table: Option<Arc<table::Table>>,
    pipeline_ctx: &PipelineContext<'_>,
) -> Result<HashMap<ContextOpt, Rows>> {
    let skip_error = pipeline_ctx.pipeline_param.skip_error();
    let max_nested_levels = pipeline_ctx.pipeline_param.max_nested_levels();
    // Always flatten JSON objects and stringify arrays
    let mut input = Vec::with_capacity(array.len());
    for item in array.into_iter() {
        let result =
            unwrap_or_continue_if_err!(flatten_object(item, max_nested_levels), skip_error);
        input.push(result);
    }

    identity_pipeline_inner(input, pipeline_ctx).and_then(|(mut schema, opt_map)| {
        if let Some(table) = table {
            let table_info = table.table_info();
            for tag_name in table_info.meta.row_key_column_names() {
                if let Some(index) = schema.index.get(tag_name) {
                    schema.schema[*index].semantic_type = SemanticType::Tag;
                }
            }
        }

        let column_schemas = schema.column_schemas()?;
        Ok(opt_map
            .into_iter()
            .map(|(opt, rows)| {
                (
                    opt,
                    Rows {
                        schema: column_schemas.clone(),
                        rows,
                    },
                )
            })
            .collect::<HashMap<ContextOpt, Rows>>())
    })
}

/// Consumes the JSON object and flattens it into a single-level object.
///
/// The `max_nested_levels` parameter is used to limit how deep to flatten nested JSON objects.
/// When the maximum level is reached, the remaining nested structure is serialized to a JSON
/// string and stored at the current flattened key.
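///
/// An illustrative sketch, mirroring the unit tests below:
///
/// ```ignore
/// let input: VrlValue = serde_json::json!({"a": {"b": 1}, "c": [2, 3]}).into();
/// let flat = flatten_object(input, 10).unwrap();
/// // => {"a.b": 1, "c": "[2,3]"}: nested keys joined with '.', arrays stringified.
/// ```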
pub fn flatten_object(object: VrlValue, max_nested_levels: usize) -> Result<VrlValue> {
    let mut flattened = BTreeMap::new();
    let object = object.into_object().context(ValueMustBeMapSnafu)?;

    if !object.is_empty() {
        // it will use recursion to flatten the object.
        do_flatten_object(&mut flattened, None, object, 1, max_nested_levels);
    }

    Ok(VrlValue::Object(flattened))
}

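/// Recursively converts a VRL value into a `serde_json` value, used when
/// stringifying nested structures during flattening.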
fn vrl_value_to_serde_json(value: &VrlValue) -> serde_json_crate::Value {
    match value {
        VrlValue::Null => serde_json_crate::Value::Null,
        VrlValue::Boolean(b) => serde_json_crate::Value::Bool(*b),
        VrlValue::Integer(i) => serde_json_crate::Value::Number((*i).into()),
        VrlValue::Float(not_nan) => serde_json_crate::Number::from_f64(not_nan.into_inner())
            .map(serde_json_crate::Value::Number)
            .unwrap_or(serde_json_crate::Value::Null),
        VrlValue::Bytes(bytes) => {
            serde_json_crate::Value::String(String::from_utf8_lossy(bytes).into_owned())
        }
        VrlValue::Regex(re) => serde_json_crate::Value::String(re.as_str().to_string()),
        VrlValue::Timestamp(ts) => serde_json_crate::Value::String(ts.to_rfc3339()),
        VrlValue::Array(arr) => {
            serde_json_crate::Value::Array(arr.iter().map(vrl_value_to_serde_json).collect())
        }
        VrlValue::Object(map) => serde_json_crate::Value::Object(
            map.iter()
                .map(|(k, v)| (k.to_string(), vrl_value_to_serde_json(v)))
                .collect(),
        ),
    }
}

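/// Recursively flattens `object` into `dest`, joining nested keys with `.`.
/// Once `max_nested_levels` is reached, remaining objects (and arrays at any
/// level) are stringified as JSON.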
fn do_flatten_object(
    dest: &mut BTreeMap<KeyString, VrlValue>,
    base: Option<&str>,
    object: BTreeMap<KeyString, VrlValue>,
    current_level: usize,
    max_nested_levels: usize,
) {
    for (key, value) in object {
        let new_key = base.map_or_else(
            || key.clone(),
            |base_key| format!("{base_key}.{key}").into(),
        );

        match value {
            VrlValue::Object(object) => {
                if current_level >= max_nested_levels {
                    // Reached the maximum level; stringify the remaining object.
                    let json_string = serde_json_crate::to_string(&vrl_value_to_serde_json(
                        &VrlValue::Object(object),
                    ))
                    .unwrap_or_else(|_| String::from("{}"));
                    dest.insert(new_key, VrlValue::Bytes(Bytes::from(json_string)));
                } else {
                    do_flatten_object(
                        dest,
                        Some(&new_key),
                        object,
                        current_level + 1,
                        max_nested_levels,
                    );
                }
            }
            // Arrays are stringified to ensure no JSON column types in the result.
            VrlValue::Array(_) => {
                let json_string = serde_json_crate::to_string(&vrl_value_to_serde_json(&value))
                    .unwrap_or_else(|_| String::from("[]"));
                dest.insert(new_key, VrlValue::Bytes(Bytes::from(json_string)));
            }
            // Other leaf types are inserted as-is.
            _ => {
                dest.insert(new_key, value);
            }
        }
    }
}

974
975#[cfg(test)]
976mod tests {
977    use api::v1::SemanticType;
978
979    use super::*;
980    use crate::{PipelineDefinition, identity_pipeline};
981
982    #[test]
983    fn test_identify_pipeline() {
984        let params = GreptimePipelineParams::default();
985        let pipeline_ctx = PipelineContext::new(
986            &PipelineDefinition::GreptimeIdentityPipeline(None),
987            &params,
988            Channel::Unknown,
989        );
990        {
991            let array = [
992                serde_json::json!({
993                    "woshinull": null,
994                    "name": "Alice",
995                    "age": 20,
996                    "is_student": true,
997                    "score": 99.5,
998                    "hobbies": "reading",
999                    "address": "Beijing",
1000                }),
1001                serde_json::json!({
1002                    "name": "Bob",
1003                    "age": 21,
1004                    "is_student": false,
1005                    "score": "88.5",
1006                    "hobbies": "swimming",
1007                    "address": "Shanghai",
1008                    "gaga": "gaga"
1009                }),
1010            ];
1011            let array = array.iter().map(|v| v.into()).collect();
1012            let rows = identity_pipeline(array, None, &pipeline_ctx);
1013            assert!(rows.is_err());
1014            assert_eq!(
1015                rows.err().unwrap().to_string(),
1016                "Column datatype mismatch. For column: score, expected datatype: Float64, actual datatype: String".to_string(),
1017            );
1018        }
1019        {
1020            let array = [
1021                serde_json::json!({
1022                    "woshinull": null,
1023                    "name": "Alice",
1024                    "age": 20,
1025                    "is_student": true,
1026                    "score": 99.5,
1027                    "hobbies": "reading",
1028                    "address": "Beijing",
1029                }),
1030                serde_json::json!({
1031                    "name": "Bob",
1032                    "age": 21,
1033                    "is_student": false,
1034                    "score": 88,
1035                    "hobbies": "swimming",
1036                    "address": "Shanghai",
1037                    "gaga": "gaga"
1038                }),
1039            ];
1040            let array = array.iter().map(|v| v.into()).collect();
1041            let rows = identity_pipeline(array, None, &pipeline_ctx);
1042            assert!(rows.is_err());
1043            assert_eq!(
1044                rows.err().unwrap().to_string(),
1045                "Column datatype mismatch. For column: score, expected datatype: Float64, actual datatype: Int64".to_string(),
1046            );
1047        }
1048        {
1049            let array = [
1050                serde_json::json!({
1051                    "woshinull": null,
1052                    "name": "Alice",
1053                    "age": 20,
1054                    "is_student": true,
1055                    "score": 99.5,
1056                    "hobbies": "reading",
1057                    "address": "Beijing",
1058                }),
1059                serde_json::json!({
1060                    "name": "Bob",
1061                    "age": 21,
1062                    "is_student": false,
1063                    "score": 88.5,
1064                    "hobbies": "swimming",
1065                    "address": "Shanghai",
1066                    "gaga": "gaga"
1067                }),
1068            ];
1069            let array = array.iter().map(|v| v.into()).collect();
1070            let rows = identity_pipeline(array, None, &pipeline_ctx);
1071            assert!(rows.is_ok());
1072            let mut rows = rows.unwrap();
1073            assert!(rows.len() == 1);
1074            let rows = rows.remove(&ContextOpt::default()).unwrap();
1075            assert_eq!(rows.schema.len(), 8);
1076            assert_eq!(rows.rows.len(), 2);
1077            assert_eq!(8, rows.rows[0].values.len());
1078            assert_eq!(8, rows.rows[1].values.len());
1079        }
1080        {
1081            let array = [
1082                serde_json::json!({
1083                    "woshinull": null,
1084                    "name": "Alice",
1085                    "age": 20,
1086                    "is_student": true,
1087                    "score": 99.5,
1088                    "hobbies": "reading",
1089                    "address": "Beijing",
1090                }),
1091                serde_json::json!({
1092                    "name": "Bob",
1093                    "age": 21,
1094                    "is_student": false,
1095                    "score": 88.5,
1096                    "hobbies": "swimming",
1097                    "address": "Shanghai",
1098                    "gaga": "gaga"
1099                }),
1100            ];
1101            let tag_column_names = ["name".to_string(), "address".to_string()];
1102
1103            let rows =
1104                identity_pipeline_inner(array.iter().map(|v| v.into()).collect(), &pipeline_ctx)
1105                    .map(|(mut schema, mut rows)| {
1106                        for name in tag_column_names {
1107                            if let Some(index) = schema.index.get(&name) {
1108                                schema.schema[*index].semantic_type = SemanticType::Tag;
1109                            }
1110                        }
1111
1112                        assert!(rows.len() == 1);
1113                        let rows = rows.remove(&ContextOpt::default()).unwrap();
1114
1115                        Rows {
1116                            schema: schema.column_schemas().unwrap(),
1117                            rows,
1118                        }
1119                    });
1120
1121            assert!(rows.is_ok());
1122            let rows = rows.unwrap();
1123            assert_eq!(rows.schema.len(), 8);
1124            assert_eq!(rows.rows.len(), 2);
1125            assert_eq!(8, rows.rows[0].values.len());
1126            assert_eq!(8, rows.rows[1].values.len());
1127            assert_eq!(
1128                rows.schema
1129                    .iter()
1130                    .find(|x| x.column_name == "name")
1131                    .unwrap()
1132                    .semantic_type,
1133                SemanticType::Tag as i32
1134            );
1135            assert_eq!(
1136                rows.schema
1137                    .iter()
1138                    .find(|x| x.column_name == "address")
1139                    .unwrap()
1140                    .semantic_type,
1141                SemanticType::Tag as i32
1142            );
1143            assert_eq!(
1144                rows.schema
1145                    .iter()
1146                    .filter(|x| x.semantic_type == SemanticType::Tag as i32)
1147                    .count(),
1148                2
1149            );
1150        }
1151    }
1152
1153    #[test]
1154    fn test_flatten() {
1155        let test_cases = vec![
1156            // Basic case.
1157            (
1158                serde_json::json!(
1159                    {
1160                        "a": {
1161                            "b": {
1162                                "c": [1, 2, 3]
1163                            }
1164                        },
1165                        "d": [
1166                            "foo",
1167                            "bar"
1168                        ],
1169                        "e": {
1170                            "f": [7, 8, 9],
1171                            "g": {
1172                                "h": 123,
1173                                "i": "hello",
1174                                "j": {
1175                                    "k": true
1176                                }
1177                            }
1178                        }
1179                    }
1180                ),
1181                10,
1182                Some(serde_json::json!(
1183                    {
1184                        "a.b.c": "[1,2,3]",
1185                        "d": "[\"foo\",\"bar\"]",
1186                        "e.f": "[7,8,9]",
1187                        "e.g.h": 123,
1188                        "e.g.i": "hello",
1189                        "e.g.j.k": true
1190                    }
1191                )),
1192            ),
1193            // Test the case where the object has more than 3 nested levels.
1194            (
1195                serde_json::json!(
1196                    {
1197                        "a": {
1198                            "b": {
1199                                "c": {
1200                                    "d": [1, 2, 3]
1201                                }
1202                            }
1203                        },
1204                        "e": [
1205                            "foo",
1206                            "bar"
1207                        ]
1208                    }
1209                ),
1210                3,
1211                Some(serde_json::json!(
1212                    {
1213                        "a.b.c": "{\"d\":[1,2,3]}",
1214                        "e": "[\"foo\",\"bar\"]"
1215                    }
1216                )),
1217            ),
1218        ];
1219
1220        for (input, max_depth, expected) in test_cases {
1221            let input = input.into();
1222            let expected = expected.map(|e| e.into());
1223
1224            let flattened_object = flatten_object(input, max_depth).ok();
1225            assert_eq!(flattened_object, expected);
1226        }
1227    }
1228
1229    use ahash::HashMap as AHashMap;
1230    #[test]
1231    fn test_values_to_rows_skip_error_handling() {
1232        let table_suffix_template: Option<crate::tablesuffix::TableSuffixTemplate> = None;
1233
1234        // Case 1: skip_error=true, mixed valid/invalid elements
1235        {
1236            let schema_info = &mut SchemaInfo::default();
1237            let input_array = vec![
1238                // Valid object
1239                serde_json::json!({"name": "Alice", "age": 25}).into(),
1240                // Invalid element (string)
1241                VrlValue::Bytes("invalid_string".into()),
1242                // Valid object
1243                serde_json::json!({"name": "Bob", "age": 30}).into(),
1244                // Invalid element (number)
1245                VrlValue::Integer(42),
1246                // Valid object
1247                serde_json::json!({"name": "Charlie", "age": 35}).into(),
1248            ];
1249
1250            let params = GreptimePipelineParams::from_map(AHashMap::from_iter([(
1251                "skip_error".to_string(),
1252                "true".to_string(),
1253            )]));
1254
1255            let pipeline_ctx = PipelineContext::new(
1256                &PipelineDefinition::GreptimeIdentityPipeline(None),
1257                &params,
1258                Channel::Unknown,
1259            );
1260
1261            let result = values_to_rows(
1262                schema_info,
1263                VrlValue::Array(input_array),
1264                &pipeline_ctx,
1265                None,
1266                true,
1267                table_suffix_template.as_ref(),
1268            );
1269
1270            // Should succeed and only process valid objects
1271            assert!(result.is_ok());
1272            let rows_by_context = result.unwrap();
1273            // Count total rows across all ContextOpt groups
1274            let total_rows: usize = rows_by_context.values().map(|v| v.len()).sum();
1275            assert_eq!(total_rows, 3); // Only 3 valid objects
1276        }
1277
1278        // Case 2: skip_error=false, invalid elements present
1279        {
1280            let schema_info = &mut SchemaInfo::default();
1281            let input_array = vec![
1282                serde_json::json!({"name": "Alice", "age": 25}).into(),
1283                VrlValue::Bytes("invalid_string".into()), // This should cause error
1284            ];
1285
1286            let params = GreptimePipelineParams::default(); // skip_error = false
1287
1288            let pipeline_ctx = PipelineContext::new(
1289                &PipelineDefinition::GreptimeIdentityPipeline(None),
1290                &params,
1291                Channel::Unknown,
1292            );
1293
1294            let result = values_to_rows(
1295                schema_info,
1296                VrlValue::Array(input_array),
1297                &pipeline_ctx,
1298                None,
1299                true,
1300                table_suffix_template.as_ref(),
1301            );
1302
1303            // Should fail with ArrayElementMustBeObject error
1304            assert!(result.is_err());
1305            let error_msg = result.unwrap_err().to_string();
1306            assert!(error_msg.contains("Array element at index 1 must be an object for one-to-many transformation, got string"));
1307        }
1308    }
1309
1310    /// Test that values_to_rows correctly groups rows by per-element ContextOpt
1311    #[test]
1312    fn test_values_to_rows_per_element_context_opt() {
1313        let table_suffix_template: Option<crate::tablesuffix::TableSuffixTemplate> = None;
1314        let schema_info = &mut SchemaInfo::default();
1315
1316        // Create array with elements having different TTL values (ContextOpt)
1317        let input_array = vec![
1318            serde_json::json!({"name": "Alice", "greptime_ttl": "1h"}).into(),
1319            serde_json::json!({"name": "Bob", "greptime_ttl": "1h"}).into(),
1320            serde_json::json!({"name": "Charlie", "greptime_ttl": "24h"}).into(),
1321        ];
1322
1323        let params = GreptimePipelineParams::default();
1324        let pipeline_ctx = PipelineContext::new(
1325            &PipelineDefinition::GreptimeIdentityPipeline(None),
1326            &params,
1327            Channel::Unknown,
1328        );
1329
1330        let result = values_to_rows(
1331            schema_info,
1332            VrlValue::Array(input_array),
1333            &pipeline_ctx,
1334            None,
1335            true,
1336            table_suffix_template.as_ref(),
1337        );
1338
1339        assert!(result.is_ok());
1340        let rows_by_context = result.unwrap();
1341
1342        // Should have 2 different ContextOpt groups (1h TTL and 24h TTL)
1343        assert_eq!(rows_by_context.len(), 2);
1344
1345        // Count rows per group
1346        let total_rows: usize = rows_by_context.values().map(|v| v.len()).sum();
1347        assert_eq!(total_rows, 3);
1348
1349        // Verify that rows are correctly grouped by TTL
1350        let mut ttl_1h_count = 0;
1351        let mut ttl_24h_count = 0;
1352        for rows in rows_by_context.values() {
1353            // ContextOpt doesn't expose ttl directly, but we can count by group size
1354            if rows.len() == 2 {
1355                ttl_1h_count = rows.len();
1356            } else if rows.len() == 1 {
1357                ttl_24h_count = rows.len();
1358            }
1359        }
1360        assert_eq!(ttl_1h_count, 2); // Alice and Bob with 1h TTL
1361        assert_eq!(ttl_24h_count, 1); // Charlie with 24h TTL
1362    }
1363}