pipeline/etl/transform/transformer/
greptime.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub mod coerce;
16
17use std::borrow::Cow;
18use std::collections::{BTreeMap, HashSet};
19use std::sync::Arc;
20
21use ahash::{HashMap, HashMapExt};
22use api::helper::{ColumnDataTypeWrapper, encode_json_value};
23use api::v1::column_def::{collect_column_options, options_from_column_schema};
24use api::v1::value::ValueData;
25use api::v1::{ColumnDataType, SemanticType};
26use arrow_schema::extension::ExtensionType;
27use coerce::{coerce_columns, coerce_value};
28use common_query::prelude::{greptime_timestamp, greptime_value};
29use common_telemetry::warn;
30use datatypes::data_type::ConcreteDataType;
31use datatypes::extension::json::JsonExtensionType;
32use datatypes::value::Value;
33use greptime_proto::v1::{ColumnSchema, Row, Rows, Value as GreptimeValue};
34use itertools::Itertools;
35use jsonb::Number;
36use once_cell::sync::OnceCell;
37use serde_json as serde_json_crate;
38use session::context::Channel;
39use snafu::OptionExt;
40use table::Table;
41use vrl::prelude::{Bytes, VrlValueConvert};
42use vrl::value::value::StdError;
43use vrl::value::{KeyString, Value as VrlValue};
44
45use crate::error::{
46    ArrayElementMustBeObjectSnafu, CoerceIncompatibleTypesSnafu,
47    IdentifyPipelineColumnTypeMismatchSnafu, InvalidTimestampSnafu, Result,
48    TimeIndexMustBeNonNullSnafu, TransformColumnNameMustBeUniqueSnafu,
49    TransformMultipleTimestampIndexSnafu, TransformTimestampIndexCountSnafu, ValueMustBeMapSnafu,
50};
51use crate::etl::PipelineDocVersion;
52use crate::etl::ctx_req::ContextOpt;
53use crate::etl::field::{Field, Fields};
54use crate::etl::transform::index::Index;
55use crate::etl::transform::{Transform, Transforms};
56use crate::{PipelineContext, truthy, unwrap_or_continue_if_err};
57
/// Nesting depth limit used for JSON flattening when the `max_nested_levels`
/// pipeline parameter is absent, unparsable, or zero.
const DEFAULT_MAX_NESTED_LEVELS_FOR_JSON_FLATTENING: usize = 10;
59
60/// Row with potentially designated table suffix.
61pub type RowWithTableSuffix = (Row, Option<String>);
62
63/// fields not in the columns will be discarded
64/// to prevent automatic column creation in GreptimeDB
65#[derive(Debug, Clone)]
66pub struct GreptimeTransformer {
67    transforms: Transforms,
68    schema: Vec<ColumnSchema>,
69}
70
71/// Parameters that can be used to configure the greptime pipelines.
72#[derive(Debug, Default)]
73pub struct GreptimePipelineParams {
74    /// The original options for configuring the greptime pipelines.
75    /// This should not be used directly, instead, use the parsed shortcut option values.
76    options: HashMap<String, String>,
77
78    /// Whether to skip error when processing the pipeline.
79    pub skip_error: OnceCell<bool>,
80    /// Max nested levels when flattening JSON object. Defaults to
81    /// `DEFAULT_MAX_NESTED_LEVELS_FOR_JSON_FLATTENING` when not provided.
82    pub max_nested_levels: OnceCell<usize>,
83}
84
85impl GreptimePipelineParams {
86    /// Create a `GreptimePipelineParams` from params string which is from the http header with key `x-greptime-pipeline-params`
87    /// The params is in the format of `key1=value1&key2=value2`,for example:
88    /// x-greptime-pipeline-params: max_nested_levels=5
89    pub fn from_params(params: Option<&str>) -> Self {
90        let options = Self::parse_header_str_to_map(params);
91
92        Self {
93            options,
94            skip_error: OnceCell::new(),
95            max_nested_levels: OnceCell::new(),
96        }
97    }
98
99    pub fn from_map(options: HashMap<String, String>) -> Self {
100        Self {
101            options,
102            skip_error: OnceCell::new(),
103            max_nested_levels: OnceCell::new(),
104        }
105    }
106
107    pub fn parse_header_str_to_map(params: Option<&str>) -> HashMap<String, String> {
108        if let Some(params) = params {
109            if params.is_empty() {
110                HashMap::new()
111            } else {
112                params
113                    .split('&')
114                    .filter_map(|s| s.split_once('='))
115                    .map(|(k, v)| (k.to_string(), v.to_string()))
116                    .collect::<HashMap<String, String>>()
117            }
118        } else {
119            HashMap::new()
120        }
121    }
122
123    /// Whether to skip error when processing the pipeline.
124    pub fn skip_error(&self) -> bool {
125        *self
126            .skip_error
127            .get_or_init(|| self.options.get("skip_error").map(truthy).unwrap_or(false))
128    }
129
130    /// Max nested levels for JSON flattening. If not provided or invalid,
131    /// falls back to `DEFAULT_MAX_NESTED_LEVELS_FOR_JSON_FLATTENING`.
132    pub fn max_nested_levels(&self) -> usize {
133        *self.max_nested_levels.get_or_init(|| {
134            self.options
135                .get("max_nested_levels")
136                .and_then(|s| s.parse::<usize>().ok())
137                .filter(|v| *v > 0)
138                .unwrap_or(DEFAULT_MAX_NESTED_LEVELS_FOR_JSON_FLATTENING)
139        })
140    }
141}
142
143impl GreptimeTransformer {
144    /// Add a default timestamp column to the transforms
145    fn add_greptime_timestamp_column(transforms: &mut Transforms) {
146        let type_ = ColumnDataType::TimestampNanosecond;
147        let default = None;
148
149        let transform = Transform {
150            fields: Fields::one(Field::new(greptime_timestamp().to_string(), None)),
151            type_,
152            default,
153            index: Some(Index::Time),
154            on_failure: Some(crate::etl::transform::OnFailure::Default),
155            tag: false,
156        };
157        transforms.push(transform);
158    }
159
160    /// Generate the schema for the GreptimeTransformer
161    fn init_schemas(transforms: &Transforms) -> Result<Vec<ColumnSchema>> {
162        let mut schema = vec![];
163        for transform in transforms.iter() {
164            schema.extend(coerce_columns(transform)?);
165        }
166        Ok(schema)
167    }
168}
169
170impl GreptimeTransformer {
171    pub fn new(mut transforms: Transforms, doc_version: &PipelineDocVersion) -> Result<Self> {
172        // empty check is done in the caller
173        let mut column_names_set = HashSet::new();
174        let mut timestamp_columns = vec![];
175
176        for transform in transforms.iter() {
177            let target_fields_set = transform
178                .fields
179                .iter()
180                .map(|f| f.target_or_input_field())
181                .collect::<HashSet<_>>();
182
183            let intersections: Vec<_> = column_names_set.intersection(&target_fields_set).collect();
184            if !intersections.is_empty() {
185                let duplicates = intersections.iter().join(",");
186                return TransformColumnNameMustBeUniqueSnafu { duplicates }.fail();
187            }
188
189            column_names_set.extend(target_fields_set);
190
191            if let Some(idx) = transform.index
192                && idx == Index::Time
193            {
194                match transform.fields.len() {
195                    //Safety unwrap is fine here because we have checked the length of real_fields
196                    1 => timestamp_columns.push(transform.fields.first().unwrap().input_field()),
197                    _ => {
198                        return TransformMultipleTimestampIndexSnafu {
199                            columns: transform.fields.iter().map(|x| x.input_field()).join(", "),
200                        }
201                        .fail();
202                    }
203                }
204            }
205        }
206
207        let schema = match timestamp_columns.len() {
208            0 if doc_version == &PipelineDocVersion::V1 => {
209                // compatible with v1, add a default timestamp column
210                GreptimeTransformer::add_greptime_timestamp_column(&mut transforms);
211                GreptimeTransformer::init_schemas(&transforms)?
212            }
213            1 => GreptimeTransformer::init_schemas(&transforms)?,
214            count => {
215                let columns = timestamp_columns.iter().join(", ");
216                return TransformTimestampIndexCountSnafu { count, columns }.fail();
217            }
218        };
219        Ok(GreptimeTransformer { transforms, schema })
220    }
221
222    pub fn transform_mut(
223        &self,
224        pipeline_map: &mut VrlValue,
225        is_v1: bool,
226    ) -> Result<Vec<GreptimeValue>> {
227        let mut values = vec![GreptimeValue { value_data: None }; self.schema.len()];
228        let mut output_index = 0;
229        for transform in self.transforms.iter() {
230            for field in transform.fields.iter() {
231                let column_name = field.input_field();
232
233                let pipeline_map = pipeline_map.as_object_mut().context(ValueMustBeMapSnafu)?;
234                // let keep us `get` here to be compatible with v1
235                match pipeline_map.get(column_name) {
236                    Some(v) => {
237                        let value_data = coerce_value(v, transform)?;
238                        // every transform fields has only one output field
239                        values[output_index] = GreptimeValue { value_data };
240                    }
241                    None => {
242                        let value_data = match transform.on_failure {
243                            Some(crate::etl::transform::OnFailure::Default) => {
244                                match transform.get_default() {
245                                    Some(default) => Some(default.clone()),
246                                    None => transform.get_default_value_when_data_is_none(),
247                                }
248                            }
249                            Some(crate::etl::transform::OnFailure::Ignore) => None,
250                            None => None,
251                        };
252                        if transform.is_timeindex() && value_data.is_none() {
253                            return TimeIndexMustBeNonNullSnafu.fail();
254                        }
255                        values[output_index] = GreptimeValue { value_data };
256                    }
257                }
258                output_index += 1;
259                if !is_v1 {
260                    // remove the column from the pipeline_map
261                    // so that the auto-transform can use the rest fields
262                    pipeline_map.remove(column_name);
263                }
264            }
265        }
266        Ok(values)
267    }
268
269    pub fn transforms(&self) -> &Transforms {
270        &self.transforms
271    }
272
273    pub fn schemas(&self) -> &Vec<greptime_proto::v1::ColumnSchema> {
274        &self.schema
275    }
276
277    pub fn transforms_mut(&mut self) -> &mut Transforms {
278        &mut self.transforms
279    }
280}
281
282#[derive(Clone)]
283pub struct ColumnMetadata {
284    column_schema: datatypes::schema::ColumnSchema,
285    semantic_type: SemanticType,
286}
287
288impl From<ColumnSchema> for ColumnMetadata {
289    fn from(value: ColumnSchema) -> Self {
290        let datatype = value.datatype();
291        let semantic_type = value.semantic_type();
292        let ColumnSchema {
293            column_name,
294            datatype: _,
295            semantic_type: _,
296            datatype_extension,
297            options,
298        } = value;
299
300        let column_schema = datatypes::schema::ColumnSchema::new(
301            column_name,
302            ColumnDataTypeWrapper::new(datatype, datatype_extension).into(),
303            semantic_type != SemanticType::Timestamp,
304        );
305
306        let metadata = collect_column_options(options.as_ref());
307        let column_schema = column_schema.with_metadata(metadata);
308
309        Self {
310            column_schema,
311            semantic_type,
312        }
313    }
314}
315
316impl TryFrom<ColumnMetadata> for ColumnSchema {
317    type Error = api::error::Error;
318
319    fn try_from(value: ColumnMetadata) -> std::result::Result<Self, Self::Error> {
320        let ColumnMetadata {
321            column_schema,
322            semantic_type,
323        } = value;
324
325        let options = options_from_column_schema(&column_schema);
326
327        let (datatype, datatype_extension) =
328            ColumnDataTypeWrapper::try_from(column_schema.data_type).map(|x| x.into_parts())?;
329
330        Ok(ColumnSchema {
331            column_name: column_schema.name,
332            datatype: datatype as _,
333            semantic_type: semantic_type as _,
334            datatype_extension,
335            options,
336        })
337    }
338}
339
340/// This is used to record the current state schema information and a sequential cache of field names.
341/// As you traverse the user input JSON, this will change.
342/// It will record a superset of all user input schemas.
343#[derive(Default)]
344pub struct SchemaInfo {
345    /// schema info
346    pub schema: Vec<ColumnMetadata>,
347    /// index of the column name
348    pub index: HashMap<String, usize>,
349    /// The pipeline's corresponding table (if already created). Useful to retrieve column schemas.
350    table: Option<Arc<Table>>,
351}
352
353impl SchemaInfo {
354    pub fn with_capacity(capacity: usize) -> Self {
355        Self {
356            schema: Vec::with_capacity(capacity),
357            index: HashMap::with_capacity(capacity),
358            table: None,
359        }
360    }
361
362    pub fn from_schema_list(schema_list: Vec<ColumnSchema>) -> Self {
363        let mut index = HashMap::new();
364        for (i, schema) in schema_list.iter().enumerate() {
365            index.insert(schema.column_name.clone(), i);
366        }
367        Self {
368            schema: schema_list.into_iter().map(Into::into).collect(),
369            index,
370            table: None,
371        }
372    }
373
374    pub fn set_table(&mut self, table: Option<Arc<Table>>) {
375        self.table = table;
376    }
377
378    fn find_column_schema_in_table(&self, column_name: &str) -> Option<ColumnMetadata> {
379        if let Some(table) = &self.table
380            && let Some(i) = table.schema_ref().column_index_by_name(column_name)
381        {
382            let column_schema = table.schema_ref().column_schemas()[i].clone();
383
384            let semantic_type = if column_schema.is_time_index() {
385                SemanticType::Timestamp
386            } else if table.table_info().meta.primary_key_indices.contains(&i) {
387                SemanticType::Tag
388            } else {
389                SemanticType::Field
390            };
391
392            Some(ColumnMetadata {
393                column_schema,
394                semantic_type,
395            })
396        } else {
397            None
398        }
399    }
400
401    pub fn column_schemas(&self) -> api::error::Result<Vec<ColumnSchema>> {
402        self.schema
403            .iter()
404            .map(|x| x.clone().try_into())
405            .collect::<api::error::Result<Vec<_>>>()
406    }
407}
408
409fn resolve_schema(
410    index: Option<usize>,
411    pipeline_context: &PipelineContext,
412    column: &str,
413    value_type: &ConcreteDataType,
414    schema_info: &mut SchemaInfo,
415) -> Result<()> {
416    if let Some(index) = index {
417        let column_type = &mut schema_info.schema[index].column_schema.data_type;
418        match (column_type, value_type) {
419            (column_type, value_type) if column_type == value_type => Ok(()),
420            (ConcreteDataType::Json(column_type), ConcreteDataType::Json(value_type))
421                if column_type.is_include(value_type) =>
422            {
423                Ok(())
424            }
425            (column_type, value_type) => IdentifyPipelineColumnTypeMismatchSnafu {
426                column,
427                expected: column_type.to_string(),
428                actual: value_type.to_string(),
429            }
430            .fail(),
431        }
432    } else {
433        let column_schema = schema_info
434            .find_column_schema_in_table(column)
435            .unwrap_or_else(|| {
436                let semantic_type = decide_semantic(pipeline_context, column);
437                let column_schema = datatypes::schema::ColumnSchema::new(
438                    column,
439                    value_type.clone(),
440                    semantic_type != SemanticType::Timestamp,
441                );
442                ColumnMetadata {
443                    column_schema,
444                    semantic_type,
445                }
446            });
447        let key = column.to_string();
448        schema_info.schema.push(column_schema);
449        schema_info.index.insert(key, schema_info.schema.len() - 1);
450        Ok(())
451    }
452}
453
454fn calc_ts(p_ctx: &PipelineContext, values: &VrlValue) -> Result<Option<ValueData>> {
455    match p_ctx.channel {
456        Channel::Prometheus => {
457            let ts = values
458                .as_object()
459                .and_then(|m| m.get(greptime_timestamp()))
460                .and_then(|ts| ts.try_into_i64().ok())
461                .unwrap_or_default();
462            Ok(Some(ValueData::TimestampMillisecondValue(ts)))
463        }
464        _ => {
465            let custom_ts = p_ctx.pipeline_definition.get_custom_ts();
466            match custom_ts {
467                Some(ts) => {
468                    let ts_field = values.as_object().and_then(|m| m.get(ts.get_column_name()));
469                    Some(ts.get_timestamp_value(ts_field)).transpose()
470                }
471                None => Ok(Some(ValueData::TimestampNanosecondValue(
472                    chrono::Utc::now().timestamp_nanos_opt().unwrap_or_default(),
473                ))),
474            }
475        }
476    }
477}
478
479/// Converts VRL values to Greptime rows grouped by their ContextOpt.
480/// # Returns
481/// A HashMap where keys are `ContextOpt` and values are vectors of (row, table_suffix) pairs.
482/// Single object input produces one ContextOpt group with one row.
483/// Array input groups rows by their per-element ContextOpt values.
484///
485/// # Errors
486/// - `ArrayElementMustBeObject` if an array element is not an object
487pub(crate) fn values_to_rows(
488    schema_info: &mut SchemaInfo,
489    mut values: VrlValue,
490    pipeline_ctx: &PipelineContext<'_>,
491    row: Option<Vec<GreptimeValue>>,
492    need_calc_ts: bool,
493    tablesuffix_template: Option<&crate::tablesuffix::TableSuffixTemplate>,
494) -> Result<std::collections::HashMap<ContextOpt, Vec<RowWithTableSuffix>>> {
495    let skip_error = pipeline_ctx.pipeline_param.skip_error();
496    let VrlValue::Array(arr) = values else {
497        // Single object: extract ContextOpt and table_suffix
498        let mut result = std::collections::HashMap::new();
499
500        let mut opt = match ContextOpt::from_pipeline_map_to_opt(&mut values) {
501            Ok(r) => r,
502            Err(e) => return if skip_error { Ok(result) } else { Err(e) },
503        };
504
505        let table_suffix = opt.resolve_table_suffix(tablesuffix_template, &values);
506        let row = match values_to_row(schema_info, values, pipeline_ctx, row, need_calc_ts) {
507            Ok(r) => r,
508            Err(e) => return if skip_error { Ok(result) } else { Err(e) },
509        };
510        result.insert(opt, vec![(row, table_suffix)]);
511        return Ok(result);
512    };
513
514    let mut rows_by_context: std::collections::HashMap<ContextOpt, Vec<RowWithTableSuffix>> =
515        std::collections::HashMap::new();
516    for (index, mut value) in arr.into_iter().enumerate() {
517        if !value.is_object() {
518            unwrap_or_continue_if_err!(
519                ArrayElementMustBeObjectSnafu {
520                    index,
521                    actual_type: value.kind_str().to_string(),
522                }
523                .fail(),
524                skip_error
525            );
526        }
527
528        // Extract ContextOpt and table_suffix for this element
529        let mut opt = unwrap_or_continue_if_err!(
530            ContextOpt::from_pipeline_map_to_opt(&mut value),
531            skip_error
532        );
533        let table_suffix = opt.resolve_table_suffix(tablesuffix_template, &value);
534        let transformed_row = unwrap_or_continue_if_err!(
535            values_to_row(schema_info, value, pipeline_ctx, row.clone(), need_calc_ts),
536            skip_error
537        );
538        rows_by_context
539            .entry(opt)
540            .or_default()
541            .push((transformed_row, table_suffix));
542    }
543    Ok(rows_by_context)
544}
545
546/// `need_calc_ts` happens in two cases:
547/// 1. full greptime_identity
548/// 2. auto-transform without transformer
549///
550/// if transform is present in custom pipeline in v2 mode
551/// we dont need to calc ts again, nor do we need to check ts column name
552pub(crate) fn values_to_row(
553    schema_info: &mut SchemaInfo,
554    values: VrlValue,
555    pipeline_ctx: &PipelineContext<'_>,
556    row: Option<Vec<GreptimeValue>>,
557    need_calc_ts: bool,
558) -> Result<Row> {
559    let mut row: Vec<GreptimeValue> =
560        row.unwrap_or_else(|| Vec::with_capacity(schema_info.schema.len()));
561    let custom_ts = pipeline_ctx.pipeline_definition.get_custom_ts();
562
563    if need_calc_ts {
564        // calculate timestamp value based on the channel
565        let ts = calc_ts(pipeline_ctx, &values)?;
566        row.push(GreptimeValue { value_data: ts });
567    }
568
569    row.resize(schema_info.schema.len(), GreptimeValue { value_data: None });
570
571    // skip ts column
572    let ts_column_name = custom_ts
573        .as_ref()
574        .map_or(greptime_timestamp(), |ts| ts.get_column_name());
575
576    let values = values.into_object().context(ValueMustBeMapSnafu)?;
577
578    for (column_name, value) in values {
579        if need_calc_ts && column_name.as_str() == ts_column_name {
580            continue;
581        }
582
583        resolve_value(
584            value,
585            column_name.into(),
586            &mut row,
587            schema_info,
588            pipeline_ctx,
589        )?;
590    }
591    Ok(Row { values: row })
592}
593
594fn decide_semantic(p_ctx: &PipelineContext, column_name: &str) -> SemanticType {
595    if p_ctx.channel == Channel::Prometheus && column_name != greptime_value() {
596        SemanticType::Tag
597    } else {
598        SemanticType::Field
599    }
600}
601
602fn resolve_value(
603    value: VrlValue,
604    column_name: String,
605    row: &mut Vec<GreptimeValue>,
606    schema_info: &mut SchemaInfo,
607    p_ctx: &PipelineContext,
608) -> Result<()> {
609    let index = schema_info.index.get(&column_name).copied();
610
611    let value_data = match value {
612        VrlValue::Null => None,
613
614        VrlValue::Integer(v) => {
615            // safe unwrap after type matched
616            resolve_schema(
617                index,
618                p_ctx,
619                &column_name,
620                &ConcreteDataType::int64_datatype(),
621                schema_info,
622            )?;
623            Some(ValueData::I64Value(v))
624        }
625
626        VrlValue::Float(v) => {
627            // safe unwrap after type matched
628            resolve_schema(
629                index,
630                p_ctx,
631                &column_name,
632                &ConcreteDataType::float64_datatype(),
633                schema_info,
634            )?;
635            Some(ValueData::F64Value(v.into()))
636        }
637
638        VrlValue::Boolean(v) => {
639            resolve_schema(
640                index,
641                p_ctx,
642                &column_name,
643                &ConcreteDataType::boolean_datatype(),
644                schema_info,
645            )?;
646            Some(ValueData::BoolValue(v))
647        }
648
649        VrlValue::Bytes(v) => {
650            resolve_schema(
651                index,
652                p_ctx,
653                &column_name,
654                &ConcreteDataType::string_datatype(),
655                schema_info,
656            )?;
657            Some(ValueData::StringValue(String::from_utf8_lossy_owned(
658                v.to_vec(),
659            )))
660        }
661
662        VrlValue::Regex(v) => {
663            warn!(
664                "Persisting regex value in the table, this should not happen, column_name: {}",
665                column_name
666            );
667            resolve_schema(
668                index,
669                p_ctx,
670                &column_name,
671                &ConcreteDataType::string_datatype(),
672                schema_info,
673            )?;
674            Some(ValueData::StringValue(v.to_string()))
675        }
676
677        VrlValue::Timestamp(ts) => {
678            let ns = ts.timestamp_nanos_opt().context(InvalidTimestampSnafu {
679                input: ts.to_rfc3339(),
680            })?;
681            resolve_schema(
682                index,
683                p_ctx,
684                &column_name,
685                &ConcreteDataType::timestamp_nanosecond_datatype(),
686                schema_info,
687            )?;
688            Some(ValueData::TimestampNanosecondValue(ns))
689        }
690
691        VrlValue::Array(_) | VrlValue::Object(_) => {
692            let is_json_native_type = schema_info
693                .find_column_schema_in_table(&column_name)
694                .is_some_and(|x| {
695                    if let ConcreteDataType::Json(column_type) = &x.column_schema.data_type {
696                        column_type.is_native_type()
697                    } else {
698                        false
699                    }
700                });
701
702            let value = if is_json_native_type {
703                let json_extension_type: Option<JsonExtensionType> =
704                    if let Some(x) = schema_info.find_column_schema_in_table(&column_name) {
705                        x.column_schema.extension_type()?
706                    } else {
707                        None
708                    };
709                let settings = json_extension_type
710                    .and_then(|x| x.metadata().json_structure_settings.clone())
711                    .unwrap_or_default();
712                let value: serde_json::Value = value.try_into().map_err(|e: StdError| {
713                    CoerceIncompatibleTypesSnafu { msg: e.to_string() }.build()
714                })?;
715                let value = settings.encode(value)?;
716
717                resolve_schema(index, p_ctx, &column_name, &value.data_type(), schema_info)?;
718
719                let Value::Json(value) = value else {
720                    unreachable!()
721                };
722                ValueData::JsonValue(encode_json_value(*value))
723            } else {
724                resolve_schema(
725                    index,
726                    p_ctx,
727                    &column_name,
728                    &ConcreteDataType::binary_datatype(),
729                    schema_info,
730                )?;
731
732                let value = vrl_value_to_jsonb_value(&value);
733                ValueData::BinaryValue(value.to_vec())
734            };
735            Some(value)
736        }
737    };
738
739    let value = GreptimeValue { value_data };
740    if let Some(index) = index {
741        row[index] = value;
742    } else {
743        row.push(value);
744    }
745    Ok(())
746}
747
748fn vrl_value_to_jsonb_value<'a>(value: &'a VrlValue) -> jsonb::Value<'a> {
749    match value {
750        VrlValue::Bytes(bytes) => jsonb::Value::String(String::from_utf8_lossy(bytes)),
751        VrlValue::Regex(value_regex) => jsonb::Value::String(Cow::Borrowed(value_regex.as_str())),
752        VrlValue::Integer(i) => jsonb::Value::Number(Number::Int64(*i)),
753        VrlValue::Float(not_nan) => jsonb::Value::Number(Number::Float64(not_nan.into_inner())),
754        VrlValue::Boolean(b) => jsonb::Value::Bool(*b),
755        VrlValue::Timestamp(date_time) => jsonb::Value::String(Cow::Owned(date_time.to_rfc3339())),
756        VrlValue::Object(btree_map) => jsonb::Value::Object(
757            btree_map
758                .iter()
759                .map(|(key, value)| (key.to_string(), vrl_value_to_jsonb_value(value)))
760                .collect(),
761        ),
762        VrlValue::Array(values) => jsonb::Value::Array(
763            values
764                .iter()
765                .map(|value| vrl_value_to_jsonb_value(value))
766                .collect(),
767        ),
768        VrlValue::Null => jsonb::Value::Null,
769    }
770}
771
772fn identity_pipeline_inner(
773    pipeline_maps: Vec<VrlValue>,
774    pipeline_ctx: &PipelineContext<'_>,
775) -> Result<(SchemaInfo, HashMap<ContextOpt, Vec<Row>>)> {
776    let skip_error = pipeline_ctx.pipeline_param.skip_error();
777    let mut schema_info = SchemaInfo::default();
778    let custom_ts = pipeline_ctx.pipeline_definition.get_custom_ts();
779
780    // set time index column schema first
781    let column_schema = datatypes::schema::ColumnSchema::new(
782        custom_ts
783            .map(|ts| ts.get_column_name().to_string())
784            .unwrap_or_else(|| greptime_timestamp().to_string()),
785        custom_ts
786            .map(|c| ConcreteDataType::from(ColumnDataTypeWrapper::new(c.get_datatype(), None)))
787            .unwrap_or_else(|| {
788                if pipeline_ctx.channel == Channel::Prometheus {
789                    ConcreteDataType::timestamp_millisecond_datatype()
790                } else {
791                    ConcreteDataType::timestamp_nanosecond_datatype()
792                }
793            }),
794        false,
795    );
796    schema_info.schema.push(ColumnMetadata {
797        column_schema,
798        semantic_type: SemanticType::Timestamp,
799    });
800
801    let mut opt_map = HashMap::new();
802    let len = pipeline_maps.len();
803
804    for mut pipeline_map in pipeline_maps {
805        let opt = unwrap_or_continue_if_err!(
806            ContextOpt::from_pipeline_map_to_opt(&mut pipeline_map),
807            skip_error
808        );
809        let row = unwrap_or_continue_if_err!(
810            values_to_row(&mut schema_info, pipeline_map, pipeline_ctx, None, true),
811            skip_error
812        );
813
814        opt_map
815            .entry(opt)
816            .or_insert_with(|| Vec::with_capacity(len))
817            .push(row);
818    }
819
820    let column_count = schema_info.schema.len();
821    for (_, row) in opt_map.iter_mut() {
822        for row in row.iter_mut() {
823            let diff = column_count - row.values.len();
824            for _ in 0..diff {
825                row.values.push(GreptimeValue { value_data: None });
826            }
827        }
828    }
829
830    Ok((schema_info, opt_map))
831}
832
833/// Identity pipeline for Greptime
834/// This pipeline will convert the input JSON array to Greptime Rows
835/// params table is used to set the semantic type of the row key column to Tag
836/// 1. The pipeline will add a default timestamp column to the schema
837/// 2. The pipeline not resolve NULL value
838/// 3. The pipeline assumes that the json format is fixed
839/// 4. The pipeline will return an error if the same column datatype is mismatched
840/// 5. The pipeline will analyze the schema of each json record and merge them to get the final schema.
841pub fn identity_pipeline(
842    array: Vec<VrlValue>,
843    table: Option<Arc<table::Table>>,
844    pipeline_ctx: &PipelineContext<'_>,
845) -> Result<HashMap<ContextOpt, Rows>> {
846    let skip_error = pipeline_ctx.pipeline_param.skip_error();
847    let max_nested_levels = pipeline_ctx.pipeline_param.max_nested_levels();
848    // Always flatten JSON objects and stringify arrays
849    let mut input = Vec::with_capacity(array.len());
850    for item in array.into_iter() {
851        let result =
852            unwrap_or_continue_if_err!(flatten_object(item, max_nested_levels), skip_error);
853        input.push(result);
854    }
855
856    identity_pipeline_inner(input, pipeline_ctx).and_then(|(mut schema, opt_map)| {
857        if let Some(table) = table {
858            let table_info = table.table_info();
859            for tag_name in table_info.meta.row_key_column_names() {
860                if let Some(index) = schema.index.get(tag_name) {
861                    schema.schema[*index].semantic_type = SemanticType::Tag;
862                }
863            }
864        }
865
866        let column_schemas = schema.column_schemas()?;
867        Ok(opt_map
868            .into_iter()
869            .map(|(opt, rows)| {
870                (
871                    opt,
872                    Rows {
873                        schema: column_schemas.clone(),
874                        rows,
875                    },
876                )
877            })
878            .collect::<HashMap<ContextOpt, Rows>>())
879    })
880}
881
882/// Consumes the JSON object and consumes it into a single-level object.
883///
884/// The `max_nested_levels` parameter is used to limit how deep to flatten nested JSON objects.
885/// When the maximum level is reached, the remaining nested structure is serialized to a JSON
886/// string and stored at the current flattened key.
887pub fn flatten_object(object: VrlValue, max_nested_levels: usize) -> Result<VrlValue> {
888    let mut flattened = BTreeMap::new();
889    let object = object.into_object().context(ValueMustBeMapSnafu)?;
890
891    if !object.is_empty() {
892        // it will use recursion to flatten the object.
893        do_flatten_object(&mut flattened, None, object, 1, max_nested_levels);
894    }
895
896    Ok(VrlValue::Object(flattened))
897}
898
899fn vrl_value_to_serde_json(value: &VrlValue) -> serde_json_crate::Value {
900    match value {
901        VrlValue::Null => serde_json_crate::Value::Null,
902        VrlValue::Boolean(b) => serde_json_crate::Value::Bool(*b),
903        VrlValue::Integer(i) => serde_json_crate::Value::Number((*i).into()),
904        VrlValue::Float(not_nan) => serde_json_crate::Number::from_f64(not_nan.into_inner())
905            .map(serde_json_crate::Value::Number)
906            .unwrap_or(serde_json_crate::Value::Null),
907        VrlValue::Bytes(bytes) => {
908            serde_json_crate::Value::String(String::from_utf8_lossy(bytes).into_owned())
909        }
910        VrlValue::Regex(re) => serde_json_crate::Value::String(re.as_str().to_string()),
911        VrlValue::Timestamp(ts) => serde_json_crate::Value::String(ts.to_rfc3339()),
912        VrlValue::Array(arr) => {
913            serde_json_crate::Value::Array(arr.iter().map(vrl_value_to_serde_json).collect())
914        }
915        VrlValue::Object(map) => serde_json_crate::Value::Object(
916            map.iter()
917                .map(|(k, v)| (k.to_string(), vrl_value_to_serde_json(v)))
918                .collect(),
919        ),
920    }
921}
922
923fn do_flatten_object(
924    dest: &mut BTreeMap<KeyString, VrlValue>,
925    base: Option<&str>,
926    object: BTreeMap<KeyString, VrlValue>,
927    current_level: usize,
928    max_nested_levels: usize,
929) {
930    for (key, value) in object {
931        let new_key = base.map_or_else(
932            || key.clone(),
933            |base_key| format!("{base_key}.{key}").into(),
934        );
935
936        match value {
937            VrlValue::Object(object) => {
938                if current_level >= max_nested_levels {
939                    // Reached the maximum level; stringify the remaining object.
940                    let json_string = serde_json_crate::to_string(&vrl_value_to_serde_json(
941                        &VrlValue::Object(object),
942                    ))
943                    .unwrap_or_else(|_| String::from("{}"));
944                    dest.insert(new_key, VrlValue::Bytes(Bytes::from(json_string)));
945                } else {
946                    do_flatten_object(
947                        dest,
948                        Some(&new_key),
949                        object,
950                        current_level + 1,
951                        max_nested_levels,
952                    );
953                }
954            }
955            // Arrays are stringified to ensure no JSON column types in the result.
956            VrlValue::Array(_) => {
957                let json_string = serde_json_crate::to_string(&vrl_value_to_serde_json(&value))
958                    .unwrap_or_else(|_| String::from("[]"));
959                dest.insert(new_key, VrlValue::Bytes(Bytes::from(json_string)));
960            }
961            // Other leaf types are inserted as-is.
962            _ => {
963                dest.insert(new_key, value);
964            }
965        }
966    }
967}
968
#[cfg(test)]
mod tests {
    use ahash::HashMap as AHashMap;
    use api::v1::SemanticType;

    use super::*;
    use crate::{PipelineDefinition, identity_pipeline};

    /// Sample record with an explicit `null` field and a float `score`.
    fn alice_record() -> serde_json::Value {
        serde_json::json!({
            "woshinull": null,
            "name": "Alice",
            "age": 20,
            "is_student": true,
            "score": 99.5,
            "hobbies": "reading",
            "address": "Beijing",
        })
    }

    /// Sample record with an extra `gaga` field; `score` is chosen by the caller so that a test
    /// can make it agree or disagree with the type used in [`alice_record`].
    fn bob_record(score: serde_json::Value) -> serde_json::Value {
        serde_json::json!({
            "name": "Bob",
            "age": 21,
            "is_student": false,
            "score": score,
            "hobbies": "swimming",
            "address": "Shanghai",
            "gaga": "gaga"
        })
    }

    /// Builds the two-record pipeline input used by the identity pipeline cases.
    fn sample_input(bob_score: serde_json::Value) -> Vec<VrlValue> {
        [alice_record(), bob_record(bob_score)]
            .iter()
            .map(|record| record.into())
            .collect()
    }

    #[test]
    fn test_identify_pipeline() {
        let params = GreptimePipelineParams::default();
        let pipeline_ctx = PipelineContext::new(
            &PipelineDefinition::GreptimeIdentityPipeline(None),
            &params,
            Channel::Unknown,
        );

        // A `score` whose type differs between records must be rejected.
        let mismatch_cases = [
            (
                serde_json::json!("88.5"),
                "Column datatype mismatch. For column: score, expected datatype: Float64, actual datatype: String",
            ),
            (
                serde_json::json!(88),
                "Column datatype mismatch. For column: score, expected datatype: Float64, actual datatype: Int64",
            ),
        ];
        for (bob_score, expected_message) in mismatch_cases {
            let outcome = identity_pipeline(sample_input(bob_score), None, &pipeline_ctx);
            assert!(outcome.is_err());
            assert_eq!(outcome.err().unwrap().to_string(), expected_message);
        }

        // Consistent types: both records are padded to the merged schema.
        {
            let outcome =
                identity_pipeline(sample_input(serde_json::json!(88.5)), None, &pipeline_ctx);
            assert!(outcome.is_ok());
            let mut grouped = outcome.unwrap();
            assert_eq!(grouped.len(), 1);
            let rows = grouped.remove(&ContextOpt::default()).unwrap();
            assert_eq!(rows.schema.len(), 8);
            assert_eq!(rows.rows.len(), 2);
            assert_eq!(8, rows.rows[0].values.len());
            assert_eq!(8, rows.rows[1].values.len());
        }

        // Marking columns as tags on the inner result is reflected in the column schemas.
        {
            let tag_column_names = ["name".to_string(), "address".to_string()];

            let inner =
                identity_pipeline_inner(sample_input(serde_json::json!(88.5)), &pipeline_ctx);
            assert!(inner.is_ok());
            let (mut schema, mut grouped) = inner.unwrap();

            for name in tag_column_names {
                if let Some(index) = schema.index.get(&name) {
                    schema.schema[*index].semantic_type = SemanticType::Tag;
                }
            }

            assert_eq!(grouped.len(), 1);
            let rows = Rows {
                schema: schema.column_schemas().unwrap(),
                rows: grouped.remove(&ContextOpt::default()).unwrap(),
            };

            assert_eq!(rows.schema.len(), 8);
            assert_eq!(rows.rows.len(), 2);
            assert_eq!(8, rows.rows[0].values.len());
            assert_eq!(8, rows.rows[1].values.len());

            let semantic_type_of = |column: &str| {
                rows.schema
                    .iter()
                    .find(|c| c.column_name == column)
                    .unwrap()
                    .semantic_type
            };
            assert_eq!(semantic_type_of("name"), SemanticType::Tag as i32);
            assert_eq!(semantic_type_of("address"), SemanticType::Tag as i32);

            let tag_count = rows
                .schema
                .iter()
                .filter(|c| c.semantic_type == SemanticType::Tag as i32)
                .count();
            assert_eq!(tag_count, 2);
        }
    }

    #[test]
    fn test_flatten() {
        let assert_flattened =
            |input: serde_json::Value, max_depth: usize, expected: serde_json::Value| {
                let actual = flatten_object(input.into(), max_depth).ok();
                let expected: Option<VrlValue> = Some(expected.into());
                assert_eq!(actual, expected);
            };

        // Basic case.
        assert_flattened(
            serde_json::json!(
                {
                    "a": {
                        "b": {
                            "c": [1, 2, 3]
                        }
                    },
                    "d": [
                        "foo",
                        "bar"
                    ],
                    "e": {
                        "f": [7, 8, 9],
                        "g": {
                            "h": 123,
                            "i": "hello",
                            "j": {
                                "k": true
                            }
                        }
                    }
                }
            ),
            10,
            serde_json::json!(
                {
                    "a.b.c": "[1,2,3]",
                    "d": "[\"foo\",\"bar\"]",
                    "e.f": "[7,8,9]",
                    "e.g.h": 123,
                    "e.g.i": "hello",
                    "e.g.j.k": true
                }
            ),
        );

        // Test the case where the object has more than 3 nested levels.
        assert_flattened(
            serde_json::json!(
                {
                    "a": {
                        "b": {
                            "c": {
                                "d": [1, 2, 3]
                            }
                        }
                    },
                    "e": [
                        "foo",
                        "bar"
                    ]
                }
            ),
            3,
            serde_json::json!(
                {
                    "a.b.c": "{\"d\":[1,2,3]}",
                    "e": "[\"foo\",\"bar\"]"
                }
            ),
        );
    }

    #[test]
    fn test_values_to_rows_skip_error_handling() {
        let table_suffix_template: Option<crate::tablesuffix::TableSuffixTemplate> = None;

        // Case 1: skip_error=true, mixed valid/invalid elements
        {
            let mut schema_info = SchemaInfo::default();
            let mixed_elements = vec![
                // Valid object
                serde_json::json!({"name": "Alice", "age": 25}).into(),
                // Invalid element (string)
                VrlValue::Bytes("invalid_string".into()),
                // Valid object
                serde_json::json!({"name": "Bob", "age": 30}).into(),
                // Invalid element (number)
                VrlValue::Integer(42),
                // Valid object
                serde_json::json!({"name": "Charlie", "age": 35}).into(),
            ];

            let skip_error_params = GreptimePipelineParams::from_map(AHashMap::from_iter([(
                "skip_error".to_string(),
                "true".to_string(),
            )]));
            let pipeline_ctx = PipelineContext::new(
                &PipelineDefinition::GreptimeIdentityPipeline(None),
                &skip_error_params,
                Channel::Unknown,
            );

            let outcome = values_to_rows(
                &mut schema_info,
                VrlValue::Array(mixed_elements),
                &pipeline_ctx,
                None,
                true,
                table_suffix_template.as_ref(),
            );

            // Should succeed and only process valid objects
            assert!(outcome.is_ok());
            let rows_by_context = outcome.unwrap();
            // Count total rows across all ContextOpt groups
            let total_rows: usize = rows_by_context.values().map(|group| group.len()).sum();
            assert_eq!(total_rows, 3); // Only 3 valid objects
        }

        // Case 2: skip_error=false, invalid elements present
        {
            let mut schema_info = SchemaInfo::default();
            let elements = vec![
                serde_json::json!({"name": "Alice", "age": 25}).into(),
                VrlValue::Bytes("invalid_string".into()), // This should cause error
            ];

            let default_params = GreptimePipelineParams::default(); // skip_error = false
            let pipeline_ctx = PipelineContext::new(
                &PipelineDefinition::GreptimeIdentityPipeline(None),
                &default_params,
                Channel::Unknown,
            );

            let outcome = values_to_rows(
                &mut schema_info,
                VrlValue::Array(elements),
                &pipeline_ctx,
                None,
                true,
                table_suffix_template.as_ref(),
            );

            // Should fail with ArrayElementMustBeObject error
            assert!(outcome.is_err());
            let error_msg = outcome.unwrap_err().to_string();
            assert!(error_msg.contains("Array element at index 1 must be an object for one-to-many transformation, got string"));
        }
    }

    /// Test that values_to_rows correctly groups rows by per-element ContextOpt
    #[test]
    fn test_values_to_rows_per_element_context_opt() {
        let table_suffix_template: Option<crate::tablesuffix::TableSuffixTemplate> = None;
        let mut schema_info = SchemaInfo::default();

        // Elements carry different TTL values, i.e. different ContextOpt.
        let elements = vec![
            serde_json::json!({"name": "Alice", "greptime_ttl": "1h"}).into(),
            serde_json::json!({"name": "Bob", "greptime_ttl": "1h"}).into(),
            serde_json::json!({"name": "Charlie", "greptime_ttl": "24h"}).into(),
        ];

        let params = GreptimePipelineParams::default();
        let pipeline_ctx = PipelineContext::new(
            &PipelineDefinition::GreptimeIdentityPipeline(None),
            &params,
            Channel::Unknown,
        );

        let outcome = values_to_rows(
            &mut schema_info,
            VrlValue::Array(elements),
            &pipeline_ctx,
            None,
            true,
            table_suffix_template.as_ref(),
        );

        assert!(outcome.is_ok());
        let rows_by_context = outcome.unwrap();

        // Should have 2 different ContextOpt groups (1h TTL and 24h TTL)
        assert_eq!(rows_by_context.len(), 2);

        // ContextOpt doesn't expose ttl directly, so groups are identified by their size.
        let group_sizes: Vec<usize> = rows_by_context.values().map(|group| group.len()).collect();
        assert_eq!(group_sizes.iter().sum::<usize>(), 3);
        assert!(group_sizes.contains(&2)); // Alice and Bob with 1h TTL
        assert!(group_sizes.contains(&1)); // Charlie with 24h TTL
    }
}