pipeline/etl/transform/transformer/greptime.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

pub mod coerce;

use std::borrow::Cow;
use std::collections::{BTreeMap, HashSet};
use std::sync::Arc;

use ahash::{HashMap, HashMapExt};
use api::helper::proto_value_type;
use api::v1::column_data_type_extension::TypeExt;
use api::v1::value::ValueData;
use api::v1::{ColumnDataType, ColumnDataTypeExtension, JsonTypeExtension, SemanticType};
use coerce::{coerce_columns, coerce_value};
use common_query::prelude::{greptime_timestamp, greptime_value};
use common_telemetry::warn;
use greptime_proto::v1::{ColumnSchema, Row, Rows, Value as GreptimeValue};
use itertools::Itertools;
use jsonb::Number;
use once_cell::sync::OnceCell;
use serde_json as serde_json_crate;
use session::context::Channel;
use snafu::OptionExt;
use vrl::prelude::{Bytes, VrlValueConvert};
use vrl::value::{KeyString, Value as VrlValue};

use crate::error::{
    ArrayElementMustBeObjectSnafu, IdentifyPipelineColumnTypeMismatchSnafu, InvalidTimestampSnafu,
    Result, TimeIndexMustBeNonNullSnafu, TransformColumnNameMustBeUniqueSnafu,
    TransformMultipleTimestampIndexSnafu, TransformTimestampIndexCountSnafu, ValueMustBeMapSnafu,
};
use crate::etl::PipelineDocVersion;
use crate::etl::ctx_req::ContextOpt;
use crate::etl::field::{Field, Fields};
use crate::etl::transform::index::Index;
use crate::etl::transform::{Transform, Transforms};
use crate::{PipelineContext, truthy, unwrap_or_continue_if_err};

const DEFAULT_MAX_NESTED_LEVELS_FOR_JSON_FLATTENING: usize = 10;

/// Row with a potentially designated table suffix.
pub type RowWithTableSuffix = (Row, Option<String>);

/// Fields not present in the schema columns will be discarded,
/// to prevent automatic column creation in GreptimeDB.
#[derive(Debug, Clone)]
pub struct GreptimeTransformer {
    transforms: Transforms,
    schema: Vec<ColumnSchema>,
}

/// Parameters that can be used to configure the greptime pipelines.
#[derive(Debug, Default)]
pub struct GreptimePipelineParams {
    /// The original options for configuring the greptime pipelines.
    /// This should not be used directly; instead, use the parsed shortcut option values.
    options: HashMap<String, String>,

    /// Whether to skip error when processing the pipeline.
    pub skip_error: OnceCell<bool>,
    /// Max nested levels when flattening JSON object. Defaults to
    /// `DEFAULT_MAX_NESTED_LEVELS_FOR_JSON_FLATTENING` when not provided.
    pub max_nested_levels: OnceCell<usize>,
}

impl GreptimePipelineParams {
    /// Create a `GreptimePipelineParams` from a params string, which comes from the HTTP header with key `x-greptime-pipeline-params`.
    /// The params are in the format of `key1=value1&key2=value2`, for example:
    /// x-greptime-pipeline-params: max_nested_levels=5
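    ///
    /// A minimal sketch of the expected usage (the option names below come from
    /// the accessors defined in this impl):
    ///
    /// ```ignore
    /// let params =
    ///     GreptimePipelineParams::from_params(Some("skip_error=true&max_nested_levels=5"));
    /// assert!(params.skip_error());
    /// assert_eq!(params.max_nested_levels(), 5);
    /// ```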
    pub fn from_params(params: Option<&str>) -> Self {
        let options = Self::parse_header_str_to_map(params);

        Self {
            options,
            skip_error: OnceCell::new(),
            max_nested_levels: OnceCell::new(),
        }
    }

    pub fn from_map(options: HashMap<String, String>) -> Self {
        Self {
            options,
            skip_error: OnceCell::new(),
            max_nested_levels: OnceCell::new(),
        }
    }

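    /// Parse a `key1=value1&key2=value2` header string into an options map;
    /// entries without a `=` are dropped. A small sketch of the behavior:
    ///
    /// ```ignore
    /// let map = GreptimePipelineParams::parse_header_str_to_map(Some("a=1&b=2"));
    /// assert_eq!(map.get("a").map(String::as_str), Some("1"));
    /// assert_eq!(map.get("b").map(String::as_str), Some("2"));
    /// ```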
    pub fn parse_header_str_to_map(params: Option<&str>) -> HashMap<String, String> {
        if let Some(params) = params {
            if params.is_empty() {
                HashMap::new()
            } else {
                params
                    .split('&')
                    .filter_map(|s| s.split_once('='))
                    .map(|(k, v)| (k.to_string(), v.to_string()))
                    .collect::<HashMap<String, String>>()
            }
        } else {
            HashMap::new()
        }
    }

    /// Whether to skip error when processing the pipeline.
    pub fn skip_error(&self) -> bool {
        *self
            .skip_error
            .get_or_init(|| self.options.get("skip_error").map(truthy).unwrap_or(false))
    }

    /// Max nested levels for JSON flattening. If not provided or invalid,
    /// falls back to `DEFAULT_MAX_NESTED_LEVELS_FOR_JSON_FLATTENING`.
    pub fn max_nested_levels(&self) -> usize {
        *self.max_nested_levels.get_or_init(|| {
            self.options
                .get("max_nested_levels")
                .and_then(|s| s.parse::<usize>().ok())
                .filter(|v| *v > 0)
                .unwrap_or(DEFAULT_MAX_NESTED_LEVELS_FOR_JSON_FLATTENING)
        })
    }
}

impl GreptimeTransformer {
    /// Add a default timestamp column to the transforms
    fn add_greptime_timestamp_column(transforms: &mut Transforms) {
        let type_ = ColumnDataType::TimestampNanosecond;
        let default = None;

        let transform = Transform {
            fields: Fields::one(Field::new(greptime_timestamp().to_string(), None)),
            type_,
            default,
            index: Some(Index::Time),
            on_failure: Some(crate::etl::transform::OnFailure::Default),
            tag: false,
        };
        transforms.push(transform);
    }

    /// Generate the schema for the GreptimeTransformer
    fn init_schemas(transforms: &Transforms) -> Result<Vec<ColumnSchema>> {
        let mut schema = vec![];
        for transform in transforms.iter() {
            schema.extend(coerce_columns(transform)?);
        }
        Ok(schema)
    }
}

impl GreptimeTransformer {
    pub fn new(mut transforms: Transforms, doc_version: &PipelineDocVersion) -> Result<Self> {
        // empty check is done in the caller
        let mut column_names_set = HashSet::new();
        let mut timestamp_columns = vec![];

        for transform in transforms.iter() {
            let target_fields_set = transform
                .fields
                .iter()
                .map(|f| f.target_or_input_field())
                .collect::<HashSet<_>>();

            let intersections: Vec<_> = column_names_set.intersection(&target_fields_set).collect();
            if !intersections.is_empty() {
                let duplicates = intersections.iter().join(",");
                return TransformColumnNameMustBeUniqueSnafu { duplicates }.fail();
            }

            column_names_set.extend(target_fields_set);

            if let Some(idx) = transform.index
                && idx == Index::Time
            {
                match transform.fields.len() {
                    // Safety: unwrap is fine here because we have checked the length of `transform.fields`
                    1 => timestamp_columns.push(transform.fields.first().unwrap().input_field()),
                    _ => {
                        return TransformMultipleTimestampIndexSnafu {
                            columns: transform.fields.iter().map(|x| x.input_field()).join(", "),
                        }
                        .fail();
                    }
                }
            }
        }

        let schema = match timestamp_columns.len() {
            0 if doc_version == &PipelineDocVersion::V1 => {
                // compatible with v1, add a default timestamp column
                GreptimeTransformer::add_greptime_timestamp_column(&mut transforms);
                GreptimeTransformer::init_schemas(&transforms)?
            }
            1 => GreptimeTransformer::init_schemas(&transforms)?,
            count => {
                let columns = timestamp_columns.iter().join(", ");
                return TransformTimestampIndexCountSnafu { count, columns }.fail();
            }
        };
        Ok(GreptimeTransformer { transforms, schema })
    }

    pub fn transform_mut(
        &self,
        pipeline_map: &mut VrlValue,
        is_v1: bool,
    ) -> Result<Vec<GreptimeValue>> {
        let mut values = vec![GreptimeValue { value_data: None }; self.schema.len()];
        let mut output_index = 0;
        for transform in self.transforms.iter() {
            for field in transform.fields.iter() {
                let column_name = field.input_field();

                let pipeline_map = pipeline_map.as_object_mut().context(ValueMustBeMapSnafu)?;
                // Keep using `get` here to stay compatible with v1.
                match pipeline_map.get(column_name) {
                    Some(v) => {
                        let value_data = coerce_value(v, transform)?;
                        // every transform field has only one output field
                        values[output_index] = GreptimeValue { value_data };
                    }
                    None => {
                        let value_data = match transform.on_failure {
                            Some(crate::etl::transform::OnFailure::Default) => {
                                match transform.get_default() {
                                    Some(default) => Some(default.clone()),
                                    None => transform.get_default_value_when_data_is_none(),
                                }
                            }
                            Some(crate::etl::transform::OnFailure::Ignore) => None,
                            None => None,
                        };
                        if transform.is_timeindex() && value_data.is_none() {
                            return TimeIndexMustBeNonNullSnafu.fail();
                        }
                        values[output_index] = GreptimeValue { value_data };
                    }
                }
                output_index += 1;
                if !is_v1 {
                    // Remove the column from the pipeline_map
                    // so that the auto-transform can use the rest of the fields.
                    pipeline_map.remove(column_name);
                }
            }
        }
        Ok(values)
    }

    pub fn transforms(&self) -> &Transforms {
        &self.transforms
    }

    pub fn schemas(&self) -> &Vec<greptime_proto::v1::ColumnSchema> {
        &self.schema
    }

    pub fn transforms_mut(&mut self) -> &mut Transforms {
        &mut self.transforms
    }
}

/// Records the schema information of the current state, along with a sequential cache of field names.
/// It changes as the user input JSON is traversed,
/// and ends up recording a superset of all user input schemas.
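///
/// A minimal sketch of the bookkeeping (assuming `column` is some prebuilt
/// `ColumnSchema`):
///
/// ```ignore
/// let info = SchemaInfo::from_schema_list(vec![column]);
/// // Each column name now maps to its position in `info.schema`,
/// // so later rows can write values into the matching slot.
/// assert_eq!(info.index.len(), info.schema.len());
/// ```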
#[derive(Debug, Default)]
pub struct SchemaInfo {
    /// schema info
    pub schema: Vec<ColumnSchema>,
    /// index of the column name
    pub index: HashMap<String, usize>,
}

impl SchemaInfo {
    pub fn with_capacity(capacity: usize) -> Self {
        Self {
            schema: Vec::with_capacity(capacity),
            index: HashMap::with_capacity(capacity),
        }
    }

    pub fn from_schema_list(schema_list: Vec<ColumnSchema>) -> Self {
        let mut index = HashMap::new();
        for (i, schema) in schema_list.iter().enumerate() {
            index.insert(schema.column_name.clone(), i);
        }
        Self {
            schema: schema_list,
            index,
        }
    }
}

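/// Writes `value_data` into `row`: when `index` is `Some`, the value goes into
/// the existing column position after a datatype consistency check; otherwise a
/// new column is appended to both the schema and the row.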
fn resolve_schema(
    index: Option<usize>,
    value_data: ValueData,
    column_schema: ColumnSchema,
    row: &mut Vec<GreptimeValue>,
    schema_info: &mut SchemaInfo,
) -> Result<()> {
    if let Some(index) = index {
        let api_value = GreptimeValue {
            value_data: Some(value_data),
        };
        // Safety unwrap is fine here because api_value is always valid
        let value_column_data_type = proto_value_type(&api_value).unwrap();
        // Safety unwrap is fine here because index is always valid
        let schema_column_data_type = schema_info.schema.get(index).unwrap().datatype();
        if value_column_data_type != schema_column_data_type {
            IdentifyPipelineColumnTypeMismatchSnafu {
                column: column_schema.column_name,
                expected: schema_column_data_type.as_str_name(),
                actual: value_column_data_type.as_str_name(),
            }
            .fail()
        } else {
            row[index] = api_value;
            Ok(())
        }
    } else {
        let key = column_schema.column_name.clone();
        schema_info.schema.push(column_schema);
        schema_info.index.insert(key, schema_info.schema.len() - 1);
        let api_value = GreptimeValue {
            value_data: Some(value_data),
        };
        row.push(api_value);
        Ok(())
    }
}

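/// Computes the timestamp for a row: the Prometheus channel reads the
/// millisecond timestamp field, a custom timestamp is resolved from its
/// configured column, and otherwise the current time in nanoseconds is used.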
fn calc_ts(p_ctx: &PipelineContext, values: &VrlValue) -> Result<Option<ValueData>> {
    match p_ctx.channel {
        Channel::Prometheus => {
            let ts = values
                .as_object()
                .and_then(|m| m.get(greptime_timestamp()))
                .and_then(|ts| ts.try_into_i64().ok())
                .unwrap_or_default();
            Ok(Some(ValueData::TimestampMillisecondValue(ts)))
        }
        _ => {
            let custom_ts = p_ctx.pipeline_definition.get_custom_ts();
            match custom_ts {
                Some(ts) => {
                    let ts_field = values.as_object().and_then(|m| m.get(ts.get_column_name()));
                    Some(ts.get_timestamp_value(ts_field)).transpose()
                }
                None => Ok(Some(ValueData::TimestampNanosecondValue(
                    chrono::Utc::now().timestamp_nanos_opt().unwrap_or_default(),
                ))),
            }
        }
    }
}

/// Converts VRL values to Greptime rows grouped by their ContextOpt.
/// # Returns
/// A HashMap where keys are `ContextOpt` and values are vectors of (row, table_suffix) pairs.
/// Single object input produces one ContextOpt group with one row.
/// Array input groups rows by their per-element ContextOpt values.
///
/// # Errors
/// - `ArrayElementMustBeObject` if an array element is not an object
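///
/// A minimal usage sketch, adapted from the tests at the bottom of this file
/// (the `pipeline_ctx` is assumed to be built by the caller):
///
/// ```ignore
/// let mut schema_info = SchemaInfo::default();
/// let grouped = values_to_rows(
///     &mut schema_info,
///     VrlValue::Array(vec![serde_json::json!({"name": "Alice"}).into()]),
///     &pipeline_ctx,
///     None, // no pre-filled row
///     true, // calculate the timestamp here
///     None, // no table suffix template
/// )?;
/// ```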
pub(crate) fn values_to_rows(
    schema_info: &mut SchemaInfo,
    mut values: VrlValue,
    pipeline_ctx: &PipelineContext<'_>,
    row: Option<Vec<GreptimeValue>>,
    need_calc_ts: bool,
    tablesuffix_template: Option<&crate::tablesuffix::TableSuffixTemplate>,
) -> Result<std::collections::HashMap<ContextOpt, Vec<RowWithTableSuffix>>> {
    let skip_error = pipeline_ctx.pipeline_param.skip_error();
    let VrlValue::Array(arr) = values else {
        // Single object: extract ContextOpt and table_suffix
        let mut result = std::collections::HashMap::new();

        let mut opt = match ContextOpt::from_pipeline_map_to_opt(&mut values) {
            Ok(r) => r,
            Err(e) => return if skip_error { Ok(result) } else { Err(e) },
        };

        let table_suffix = opt.resolve_table_suffix(tablesuffix_template, &values);
        let row = match values_to_row(schema_info, values, pipeline_ctx, row, need_calc_ts) {
            Ok(r) => r,
            Err(e) => return if skip_error { Ok(result) } else { Err(e) },
        };
        result.insert(opt, vec![(row, table_suffix)]);
        return Ok(result);
    };

    let mut rows_by_context: std::collections::HashMap<ContextOpt, Vec<RowWithTableSuffix>> =
        std::collections::HashMap::new();
    for (index, mut value) in arr.into_iter().enumerate() {
        if !value.is_object() {
            unwrap_or_continue_if_err!(
                ArrayElementMustBeObjectSnafu {
                    index,
                    actual_type: value.kind_str().to_string(),
                }
                .fail(),
                skip_error
            );
        }

        // Extract ContextOpt and table_suffix for this element
        let mut opt = unwrap_or_continue_if_err!(
            ContextOpt::from_pipeline_map_to_opt(&mut value),
            skip_error
        );
        let table_suffix = opt.resolve_table_suffix(tablesuffix_template, &value);
        let transformed_row = unwrap_or_continue_if_err!(
            values_to_row(schema_info, value, pipeline_ctx, row.clone(), need_calc_ts),
            skip_error
        );
        rows_by_context
            .entry(opt)
            .or_default()
            .push((transformed_row, table_suffix));
    }
    Ok(rows_by_context)
}

/// `need_calc_ts` happens in two cases:
/// 1. full greptime_identity
/// 2. auto-transform without a transformer
///
/// If a transform is present in a custom pipeline in v2 mode,
/// we don't need to calc the ts again, nor do we need to check the ts column name.
pub(crate) fn values_to_row(
    schema_info: &mut SchemaInfo,
    values: VrlValue,
    pipeline_ctx: &PipelineContext<'_>,
    row: Option<Vec<GreptimeValue>>,
    need_calc_ts: bool,
) -> Result<Row> {
    let mut row: Vec<GreptimeValue> =
        row.unwrap_or_else(|| Vec::with_capacity(schema_info.schema.len()));
    let custom_ts = pipeline_ctx.pipeline_definition.get_custom_ts();

    if need_calc_ts {
        // calculate timestamp value based on the channel
        let ts = calc_ts(pipeline_ctx, &values)?;
        row.push(GreptimeValue { value_data: ts });
    }

    row.resize(schema_info.schema.len(), GreptimeValue { value_data: None });

    // skip ts column
    let ts_column_name = custom_ts
        .as_ref()
        .map_or(greptime_timestamp(), |ts| ts.get_column_name());

    let values = values.into_object().context(ValueMustBeMapSnafu)?;

    for (column_name, value) in values {
        if need_calc_ts && column_name.as_str() == ts_column_name {
            continue;
        }

        resolve_value(
            value,
            column_name.into(),
            &mut row,
            schema_info,
            pipeline_ctx,
        )?;
    }
    Ok(Row { values: row })
}

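/// On the Prometheus channel every column except the value column becomes a
/// `Tag`; in all other cases the column is a `Field`.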
fn decide_semantic(p_ctx: &PipelineContext, column_name: &str) -> i32 {
    if p_ctx.channel == Channel::Prometheus && column_name != greptime_value() {
        SemanticType::Tag as i32
    } else {
        SemanticType::Field as i32
    }
}

fn resolve_value(
    value: VrlValue,
    column_name: String,
    row: &mut Vec<GreptimeValue>,
    schema_info: &mut SchemaInfo,
    p_ctx: &PipelineContext,
) -> Result<()> {
    let index = schema_info.index.get(&column_name).copied();
    let mut resolve_simple_type =
        |value_data: ValueData, column_name: String, data_type: ColumnDataType| {
            let semantic_type = decide_semantic(p_ctx, &column_name);
            resolve_schema(
                index,
                value_data,
                ColumnSchema {
                    column_name,
                    datatype: data_type as i32,
                    semantic_type,
                    datatype_extension: None,
                    options: None,
                },
                row,
                schema_info,
            )
        };

    match value {
        VrlValue::Null => {}

        VrlValue::Integer(v) => {
            resolve_simple_type(ValueData::I64Value(v), column_name, ColumnDataType::Int64)?;
        }

        VrlValue::Float(v) => {
            resolve_simple_type(
                ValueData::F64Value(v.into()),
                column_name,
                ColumnDataType::Float64,
            )?;
        }

        VrlValue::Boolean(v) => {
            resolve_simple_type(
                ValueData::BoolValue(v),
                column_name,
                ColumnDataType::Boolean,
            )?;
        }

        VrlValue::Bytes(v) => {
            resolve_simple_type(
                ValueData::StringValue(String::from_utf8_lossy_owned(v.to_vec())),
                column_name,
                ColumnDataType::String,
            )?;
        }

        VrlValue::Regex(v) => {
            warn!(
                "Persisting regex value in the table, this should not happen, column_name: {}",
                column_name
            );
            resolve_simple_type(
                ValueData::StringValue(v.to_string()),
                column_name,
                ColumnDataType::String,
            )?;
        }

        VrlValue::Timestamp(ts) => {
            let ns = ts.timestamp_nanos_opt().context(InvalidTimestampSnafu {
                input: ts.to_rfc3339(),
            })?;
            resolve_simple_type(
                ValueData::TimestampNanosecondValue(ns),
                column_name,
                ColumnDataType::TimestampNanosecond,
            )?;
        }

        VrlValue::Array(_) | VrlValue::Object(_) => {
            let data = vrl_value_to_jsonb_value(&value);
            resolve_schema(
                index,
                ValueData::BinaryValue(data.to_vec()),
                ColumnSchema {
                    column_name,
                    datatype: ColumnDataType::Binary as i32,
                    semantic_type: SemanticType::Field as i32,
                    datatype_extension: Some(ColumnDataTypeExtension {
                        type_ext: Some(TypeExt::JsonType(JsonTypeExtension::JsonBinary.into())),
                    }),
                    options: None,
                },
                row,
                schema_info,
            )?;
        }
    }
    Ok(())
}

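/// Converts a VRL value into a `jsonb` value, recursing into arrays and objects.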
fn vrl_value_to_jsonb_value<'a>(value: &'a VrlValue) -> jsonb::Value<'a> {
    match value {
        VrlValue::Bytes(bytes) => jsonb::Value::String(String::from_utf8_lossy(bytes)),
        VrlValue::Regex(value_regex) => jsonb::Value::String(Cow::Borrowed(value_regex.as_str())),
        VrlValue::Integer(i) => jsonb::Value::Number(Number::Int64(*i)),
        VrlValue::Float(not_nan) => jsonb::Value::Number(Number::Float64(not_nan.into_inner())),
        VrlValue::Boolean(b) => jsonb::Value::Bool(*b),
        VrlValue::Timestamp(date_time) => jsonb::Value::String(Cow::Owned(date_time.to_rfc3339())),
        VrlValue::Object(btree_map) => jsonb::Value::Object(
            btree_map
                .iter()
                .map(|(key, value)| (key.to_string(), vrl_value_to_jsonb_value(value)))
                .collect(),
        ),
        VrlValue::Array(values) => {
            jsonb::Value::Array(values.iter().map(vrl_value_to_jsonb_value).collect())
        }
        VrlValue::Null => jsonb::Value::Null,
    }
}

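/// Shared core of the identity pipeline: sets up the time index column first,
/// converts each pipeline map into a row grouped by its `ContextOpt`, and
/// finally pads every row to the merged schema width.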
fn identity_pipeline_inner(
    pipeline_maps: Vec<VrlValue>,
    pipeline_ctx: &PipelineContext<'_>,
) -> Result<(SchemaInfo, HashMap<ContextOpt, Vec<Row>>)> {
    let skip_error = pipeline_ctx.pipeline_param.skip_error();
    let mut schema_info = SchemaInfo::default();
    let custom_ts = pipeline_ctx.pipeline_definition.get_custom_ts();

    // set time index column schema first
    schema_info.schema.push(ColumnSchema {
        column_name: custom_ts
            .map(|ts| ts.get_column_name().to_string())
            .unwrap_or_else(|| greptime_timestamp().to_string()),
        datatype: custom_ts.map(|c| c.get_datatype()).unwrap_or_else(|| {
            if pipeline_ctx.channel == Channel::Prometheus {
                ColumnDataType::TimestampMillisecond
            } else {
                ColumnDataType::TimestampNanosecond
            }
        }) as i32,
        semantic_type: SemanticType::Timestamp as i32,
        datatype_extension: None,
        options: None,
    });

    let mut opt_map = HashMap::new();
    let len = pipeline_maps.len();

    for mut pipeline_map in pipeline_maps {
        let opt = unwrap_or_continue_if_err!(
            ContextOpt::from_pipeline_map_to_opt(&mut pipeline_map),
            skip_error
        );
        let row = unwrap_or_continue_if_err!(
            values_to_row(&mut schema_info, pipeline_map, pipeline_ctx, None, true),
            skip_error
        );

        opt_map
            .entry(opt)
            .or_insert_with(|| Vec::with_capacity(len))
            .push(row);
    }

    let column_count = schema_info.schema.len();
    for rows in opt_map.values_mut() {
        for row in rows.iter_mut() {
            let diff = column_count - row.values.len();
            for _ in 0..diff {
                row.values.push(GreptimeValue { value_data: None });
            }
        }
    }

    Ok((schema_info, opt_map))
}

/// Identity pipeline for Greptime.
/// This pipeline converts the input JSON array to Greptime Rows.
/// The `table` param is used to set the semantic type of the row key columns to Tag.
/// 1. The pipeline adds a default timestamp column to the schema.
/// 2. The pipeline does not resolve NULL values.
/// 3. The pipeline assumes that the JSON format is fixed.
/// 4. The pipeline returns an error if the datatype of the same column is mismatched.
/// 5. The pipeline analyzes the schema of each JSON record and merges them to get the final schema.
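///
/// A minimal usage sketch, following the tests at the bottom of this file:
///
/// ```ignore
/// let params = GreptimePipelineParams::default();
/// let pipeline_ctx = PipelineContext::new(
///     &PipelineDefinition::GreptimeIdentityPipeline(None),
///     &params,
///     Channel::Unknown,
/// );
/// let rows = identity_pipeline(
///     vec![serde_json::json!({"name": "Alice", "age": 20}).into()],
///     None, // no table hint for tag columns
///     &pipeline_ctx,
/// )?;
/// ```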
pub fn identity_pipeline(
    array: Vec<VrlValue>,
    table: Option<Arc<table::Table>>,
    pipeline_ctx: &PipelineContext<'_>,
) -> Result<HashMap<ContextOpt, Rows>> {
    let skip_error = pipeline_ctx.pipeline_param.skip_error();
    let max_nested_levels = pipeline_ctx.pipeline_param.max_nested_levels();
    // Always flatten JSON objects and stringify arrays
    let mut input = Vec::with_capacity(array.len());
    for item in array.into_iter() {
        let result =
            unwrap_or_continue_if_err!(flatten_object(item, max_nested_levels), skip_error);
        input.push(result);
    }

    identity_pipeline_inner(input, pipeline_ctx).map(|(mut schema, opt_map)| {
        if let Some(table) = table {
            let table_info = table.table_info();
            for tag_name in table_info.meta.row_key_column_names() {
                if let Some(index) = schema.index.get(tag_name) {
                    schema.schema[*index].semantic_type = SemanticType::Tag as i32;
                }
            }
        }

        opt_map
            .into_iter()
            .map(|(opt, rows)| {
                (
                    opt,
                    Rows {
                        schema: schema.schema.clone(),
                        rows,
                    },
                )
            })
            .collect::<HashMap<ContextOpt, Rows>>()
    })
}

/// Consumes the JSON object and flattens it into a single-level object.
///
/// The `max_nested_levels` parameter is used to limit how deep to flatten nested JSON objects.
/// When the maximum level is reached, the remaining nested structure is serialized to a JSON
/// string and stored at the current flattened key.
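///
/// A small sketch of the behavior, based on the `test_flatten` cases below:
///
/// ```ignore
/// let input: VrlValue = serde_json::json!({"a": {"b": 1}, "c": [1, 2]}).into();
/// let flat = flatten_object(input, 10)?;
/// // => {"a.b": 1, "c": "[1,2]"}: nested keys are joined with dots,
/// //    and arrays are stringified.
/// ```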
pub fn flatten_object(object: VrlValue, max_nested_levels: usize) -> Result<VrlValue> {
    let mut flattened = BTreeMap::new();
    let object = object.into_object().context(ValueMustBeMapSnafu)?;

    if !object.is_empty() {
        // Recursively flatten the object.
        do_flatten_object(&mut flattened, None, object, 1, max_nested_levels);
    }

    Ok(VrlValue::Object(flattened))
}

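/// Converts a VRL value into a `serde_json` value, used to stringify nested
/// structures during flattening.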
fn vrl_value_to_serde_json(value: &VrlValue) -> serde_json_crate::Value {
    match value {
        VrlValue::Null => serde_json_crate::Value::Null,
        VrlValue::Boolean(b) => serde_json_crate::Value::Bool(*b),
        VrlValue::Integer(i) => serde_json_crate::Value::Number((*i).into()),
        VrlValue::Float(not_nan) => serde_json_crate::Number::from_f64(not_nan.into_inner())
            .map(serde_json_crate::Value::Number)
            .unwrap_or(serde_json_crate::Value::Null),
        VrlValue::Bytes(bytes) => {
            serde_json_crate::Value::String(String::from_utf8_lossy(bytes).into_owned())
        }
        VrlValue::Regex(re) => serde_json_crate::Value::String(re.as_str().to_string()),
        VrlValue::Timestamp(ts) => serde_json_crate::Value::String(ts.to_rfc3339()),
        VrlValue::Array(arr) => {
            serde_json_crate::Value::Array(arr.iter().map(vrl_value_to_serde_json).collect())
        }
        VrlValue::Object(map) => serde_json_crate::Value::Object(
            map.iter()
                .map(|(k, v)| (k.to_string(), vrl_value_to_serde_json(v)))
                .collect(),
        ),
    }
}

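/// Recursive worker for [`flatten_object`]: walks `object`, joining nested keys
/// with `.` until `max_nested_levels` is reached, then stringifies the rest.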
fn do_flatten_object(
    dest: &mut BTreeMap<KeyString, VrlValue>,
    base: Option<&str>,
    object: BTreeMap<KeyString, VrlValue>,
    current_level: usize,
    max_nested_levels: usize,
) {
    for (key, value) in object {
        let new_key = base.map_or_else(
            || key.clone(),
            |base_key| format!("{base_key}.{key}").into(),
        );

        match value {
            VrlValue::Object(object) => {
                if current_level >= max_nested_levels {
                    // Reached the maximum level; stringify the remaining object.
                    let json_string = serde_json_crate::to_string(&vrl_value_to_serde_json(
                        &VrlValue::Object(object),
                    ))
                    .unwrap_or_else(|_| String::from("{}"));
                    dest.insert(new_key, VrlValue::Bytes(Bytes::from(json_string)));
                } else {
                    do_flatten_object(
                        dest,
                        Some(&new_key),
                        object,
                        current_level + 1,
                        max_nested_levels,
                    );
                }
            }
            // Arrays are stringified to ensure no JSON column types in the result.
            VrlValue::Array(_) => {
                let json_string = serde_json_crate::to_string(&vrl_value_to_serde_json(&value))
                    .unwrap_or_else(|_| String::from("[]"));
                dest.insert(new_key, VrlValue::Bytes(Bytes::from(json_string)));
            }
            // Other leaf types are inserted as-is.
            _ => {
                dest.insert(new_key, value);
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use ahash::HashMap as AHashMap;
    use api::v1::SemanticType;

    use super::*;
    use crate::{PipelineDefinition, identity_pipeline};

    #[test]
    fn test_identify_pipeline() {
        let params = GreptimePipelineParams::default();
        let pipeline_ctx = PipelineContext::new(
            &PipelineDefinition::GreptimeIdentityPipeline(None),
            &params,
            Channel::Unknown,
        );
        {
            let array = [
                serde_json::json!({
                    "woshinull": null,
                    "name": "Alice",
                    "age": 20,
                    "is_student": true,
                    "score": 99.5,
                    "hobbies": "reading",
                    "address": "Beijing",
                }),
                serde_json::json!({
                    "name": "Bob",
                    "age": 21,
                    "is_student": false,
                    "score": "88.5",
                    "hobbies": "swimming",
                    "address": "Shanghai",
                    "gaga": "gaga"
                }),
            ];
            let array = array.iter().map(|v| v.into()).collect();
            let rows = identity_pipeline(array, None, &pipeline_ctx);
            assert!(rows.is_err());
            assert_eq!(
                rows.err().unwrap().to_string(),
                "Column datatype mismatch. For column: score, expected datatype: FLOAT64, actual datatype: STRING".to_string(),
            );
        }
        {
            let array = [
                serde_json::json!({
                    "woshinull": null,
                    "name": "Alice",
                    "age": 20,
                    "is_student": true,
                    "score": 99.5,
                    "hobbies": "reading",
                    "address": "Beijing",
                }),
                serde_json::json!({
                    "name": "Bob",
                    "age": 21,
                    "is_student": false,
                    "score": 88,
                    "hobbies": "swimming",
                    "address": "Shanghai",
                    "gaga": "gaga"
                }),
            ];
            let array = array.iter().map(|v| v.into()).collect();
            let rows = identity_pipeline(array, None, &pipeline_ctx);
            assert!(rows.is_err());
            assert_eq!(
                rows.err().unwrap().to_string(),
                "Column datatype mismatch. For column: score, expected datatype: FLOAT64, actual datatype: INT64".to_string(),
            );
        }
        {
            let array = [
                serde_json::json!({
                    "woshinull": null,
                    "name": "Alice",
                    "age": 20,
                    "is_student": true,
                    "score": 99.5,
                    "hobbies": "reading",
                    "address": "Beijing",
                }),
                serde_json::json!({
                    "name": "Bob",
                    "age": 21,
                    "is_student": false,
                    "score": 88.5,
                    "hobbies": "swimming",
                    "address": "Shanghai",
                    "gaga": "gaga"
                }),
            ];
            let array = array.iter().map(|v| v.into()).collect();
            let rows = identity_pipeline(array, None, &pipeline_ctx);
            assert!(rows.is_ok());
            let mut rows = rows.unwrap();
            assert_eq!(rows.len(), 1);
            let rows = rows.remove(&ContextOpt::default()).unwrap();
            assert_eq!(rows.schema.len(), 8);
            assert_eq!(rows.rows.len(), 2);
            assert_eq!(8, rows.rows[0].values.len());
            assert_eq!(8, rows.rows[1].values.len());
        }
        {
            let array = [
                serde_json::json!({
                    "woshinull": null,
                    "name": "Alice",
                    "age": 20,
                    "is_student": true,
                    "score": 99.5,
                    "hobbies": "reading",
                    "address": "Beijing",
                }),
                serde_json::json!({
                    "name": "Bob",
                    "age": 21,
                    "is_student": false,
                    "score": 88.5,
                    "hobbies": "swimming",
                    "address": "Shanghai",
                    "gaga": "gaga"
                }),
            ];
            let tag_column_names = ["name".to_string(), "address".to_string()];

            let rows =
                identity_pipeline_inner(array.iter().map(|v| v.into()).collect(), &pipeline_ctx)
                    .map(|(mut schema, mut rows)| {
                        for name in tag_column_names {
                            if let Some(index) = schema.index.get(&name) {
                                schema.schema[*index].semantic_type = SemanticType::Tag as i32;
                            }
                        }

                        assert_eq!(rows.len(), 1);
                        let rows = rows.remove(&ContextOpt::default()).unwrap();

                        Rows {
                            schema: schema.schema,
                            rows,
                        }
                    });

            assert!(rows.is_ok());
            let rows = rows.unwrap();
            assert_eq!(rows.schema.len(), 8);
            assert_eq!(rows.rows.len(), 2);
            assert_eq!(8, rows.rows[0].values.len());
            assert_eq!(8, rows.rows[1].values.len());
            assert_eq!(
                rows.schema
                    .iter()
                    .find(|x| x.column_name == "name")
                    .unwrap()
                    .semantic_type,
                SemanticType::Tag as i32
            );
            assert_eq!(
                rows.schema
                    .iter()
                    .find(|x| x.column_name == "address")
                    .unwrap()
                    .semantic_type,
                SemanticType::Tag as i32
            );
            assert_eq!(
                rows.schema
                    .iter()
                    .filter(|x| x.semantic_type == SemanticType::Tag as i32)
                    .count(),
                2
            );
        }
    }

    #[test]
    fn test_flatten() {
        let test_cases = vec![
            // Basic case.
            (
                serde_json::json!(
                    {
                        "a": {
                            "b": {
                                "c": [1, 2, 3]
                            }
                        },
                        "d": [
                            "foo",
                            "bar"
                        ],
                        "e": {
                            "f": [7, 8, 9],
                            "g": {
                                "h": 123,
                                "i": "hello",
                                "j": {
                                    "k": true
                                }
                            }
                        }
                    }
                ),
                10,
                Some(serde_json::json!(
                    {
                        "a.b.c": "[1,2,3]",
                        "d": "[\"foo\",\"bar\"]",
                        "e.f": "[7,8,9]",
                        "e.g.h": 123,
                        "e.g.i": "hello",
                        "e.g.j.k": true
                    }
                )),
            ),
            // Test the case where the object has more than 3 nested levels.
            (
                serde_json::json!(
                    {
                        "a": {
                            "b": {
                                "c": {
                                    "d": [1, 2, 3]
                                }
                            }
                        },
                        "e": [
                            "foo",
                            "bar"
                        ]
                    }
                ),
                3,
                Some(serde_json::json!(
                    {
                        "a.b.c": "{\"d\":[1,2,3]}",
                        "e": "[\"foo\",\"bar\"]"
                    }
                )),
            ),
        ];

        for (input, max_depth, expected) in test_cases {
            let input = input.into();
            let expected = expected.map(|e| e.into());

            let flattened_object = flatten_object(input, max_depth).ok();
            assert_eq!(flattened_object, expected);
        }
    }

    #[test]
    fn test_values_to_rows_skip_error_handling() {
        let table_suffix_template: Option<crate::tablesuffix::TableSuffixTemplate> = None;

        // Case 1: skip_error=true, mixed valid/invalid elements
        {
            let schema_info = &mut SchemaInfo::default();
            let input_array = vec![
                // Valid object
                serde_json::json!({"name": "Alice", "age": 25}).into(),
                // Invalid element (string)
                VrlValue::Bytes("invalid_string".into()),
                // Valid object
                serde_json::json!({"name": "Bob", "age": 30}).into(),
                // Invalid element (number)
                VrlValue::Integer(42),
                // Valid object
                serde_json::json!({"name": "Charlie", "age": 35}).into(),
            ];

            let params = GreptimePipelineParams::from_map(AHashMap::from_iter([(
                "skip_error".to_string(),
                "true".to_string(),
            )]));

            let pipeline_ctx = PipelineContext::new(
                &PipelineDefinition::GreptimeIdentityPipeline(None),
                &params,
                Channel::Unknown,
            );

            let result = values_to_rows(
                schema_info,
                VrlValue::Array(input_array),
                &pipeline_ctx,
                None,
                true,
                table_suffix_template.as_ref(),
            );

            // Should succeed and only process valid objects
            assert!(result.is_ok());
            let rows_by_context = result.unwrap();
            // Count total rows across all ContextOpt groups
            let total_rows: usize = rows_by_context.values().map(|v| v.len()).sum();
            assert_eq!(total_rows, 3); // Only 3 valid objects
        }

        // Case 2: skip_error=false, invalid elements present
        {
            let schema_info = &mut SchemaInfo::default();
            let input_array = vec![
                serde_json::json!({"name": "Alice", "age": 25}).into(),
                VrlValue::Bytes("invalid_string".into()), // This should cause an error
            ];

            let params = GreptimePipelineParams::default(); // skip_error = false

            let pipeline_ctx = PipelineContext::new(
                &PipelineDefinition::GreptimeIdentityPipeline(None),
                &params,
                Channel::Unknown,
            );

            let result = values_to_rows(
                schema_info,
                VrlValue::Array(input_array),
                &pipeline_ctx,
                None,
                true,
                table_suffix_template.as_ref(),
            );

            // Should fail with an ArrayElementMustBeObject error
            assert!(result.is_err());
            let error_msg = result.unwrap_err().to_string();
            assert!(error_msg.contains("Array element at index 1 must be an object for one-to-many transformation, got string"));
        }
    }

    /// Test that values_to_rows correctly groups rows by per-element ContextOpt
    #[test]
    fn test_values_to_rows_per_element_context_opt() {
        let table_suffix_template: Option<crate::tablesuffix::TableSuffixTemplate> = None;
        let schema_info = &mut SchemaInfo::default();

        // Create array with elements having different TTL values (ContextOpt)
        let input_array = vec![
            serde_json::json!({"name": "Alice", "greptime_ttl": "1h"}).into(),
            serde_json::json!({"name": "Bob", "greptime_ttl": "1h"}).into(),
            serde_json::json!({"name": "Charlie", "greptime_ttl": "24h"}).into(),
        ];

        let params = GreptimePipelineParams::default();
        let pipeline_ctx = PipelineContext::new(
            &PipelineDefinition::GreptimeIdentityPipeline(None),
            &params,
            Channel::Unknown,
        );

        let result = values_to_rows(
            schema_info,
            VrlValue::Array(input_array),
            &pipeline_ctx,
            None,
            true,
            table_suffix_template.as_ref(),
        );

        assert!(result.is_ok());
        let rows_by_context = result.unwrap();

        // Should have 2 different ContextOpt groups (1h TTL and 24h TTL)
        assert_eq!(rows_by_context.len(), 2);

        // Count rows per group
        let total_rows: usize = rows_by_context.values().map(|v| v.len()).sum();
        assert_eq!(total_rows, 3);

        // Verify that rows are correctly grouped by TTL
        let mut ttl_1h_count = 0;
        let mut ttl_24h_count = 0;
        for rows in rows_by_context.values() {
            // ContextOpt doesn't expose ttl directly, but we can count by group size
            if rows.len() == 2 {
                ttl_1h_count = rows.len();
            } else if rows.len() == 1 {
                ttl_24h_count = rows.len();
            }
        }
        assert_eq!(ttl_1h_count, 2); // Alice and Bob with 1h TTL
        assert_eq!(ttl_24h_count, 1); // Charlie with 24h TTL
    }
}