Skip to main content

pipeline/etl/transform/transformer/
greptime.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub mod coerce;
16
17use std::borrow::Cow;
18use std::collections::{BTreeMap, HashSet};
19use std::sync::Arc;
20
21use ahash::{HashMap, HashMapExt};
22use api::helper::{ColumnDataTypeWrapper, encode_json_value};
23use api::v1::column_def::{collect_column_options, options_from_column_schema};
24use api::v1::value::ValueData;
25use api::v1::{ColumnDataType, SemanticType};
26use arrow_schema::extension::ExtensionType;
27use coerce::{coerce_columns, coerce_value};
28use common_query::prelude::{greptime_timestamp, greptime_value};
29use common_telemetry::warn;
30use datatypes::data_type::ConcreteDataType;
31use datatypes::extension::json::JsonExtensionType;
32use datatypes::value::Value;
33use greptime_proto::v1::{ColumnSchema, Row, Rows, Value as GreptimeValue};
34use itertools::Itertools;
35use jsonb::Number;
36use once_cell::sync::OnceCell;
37use serde_json as serde_json_crate;
38use session::context::Channel;
39use snafu::OptionExt;
40use table::Table;
41use vrl::prelude::{Bytes, VrlValueConvert};
42use vrl::value::value::StdError;
43use vrl::value::{KeyString, Value as VrlValue};
44
45use crate::error::{
46    ArrayElementMustBeObjectSnafu, CoerceIncompatibleTypesSnafu,
47    IdentifyPipelineColumnTypeMismatchSnafu, InvalidTimestampSnafu, Result,
48    TimeIndexMustBeNonNullSnafu, TransformColumnNameMustBeUniqueSnafu,
49    TransformMultipleTimestampIndexSnafu, TransformTimestampIndexCountSnafu, ValueMustBeMapSnafu,
50};
51use crate::etl::PipelineDocVersion;
52use crate::etl::ctx_req::ContextOpt;
53use crate::etl::field::{Field, Fields};
54use crate::etl::transform::index::Index;
55use crate::etl::transform::{Transform, Transforms};
56use crate::{PipelineContext, truthy, unwrap_or_continue_if_err};
57
58const DEFAULT_MAX_NESTED_LEVELS_FOR_JSON_FLATTENING: usize = 10;
59
60/// Row with potentially designated table suffix.
61pub type RowWithTableSuffix = (Row, Option<String>);
62
63/// fields not in the columns will be discarded
64/// to prevent automatic column creation in GreptimeDB
65#[derive(Debug, Clone)]
66pub struct GreptimeTransformer {
67    transforms: Transforms,
68    schema: Vec<ColumnSchema>,
69}
70
71/// Parameters that can be used to configure the greptime pipelines.
72#[derive(Debug, Default)]
73pub struct GreptimePipelineParams {
74    /// The original options for configuring the greptime pipelines.
75    /// This should not be used directly, instead, use the parsed shortcut option values.
76    options: HashMap<String, String>,
77
78    /// Whether to skip error when processing the pipeline.
79    pub skip_error: OnceCell<bool>,
80    /// Max nested levels when flattening JSON object. Defaults to
81    /// `DEFAULT_MAX_NESTED_LEVELS_FOR_JSON_FLATTENING` when not provided.
82    pub max_nested_levels: OnceCell<usize>,
83}
84
85impl GreptimePipelineParams {
86    /// Create a `GreptimePipelineParams` from params string which is from the http header with key `x-greptime-pipeline-params`
87    /// The params is in the format of `key1=value1&key2=value2`,for example:
88    /// x-greptime-pipeline-params: max_nested_levels=5
89    pub fn from_params(params: Option<&str>) -> Self {
90        let options = Self::parse_header_str_to_map(params);
91
92        Self {
93            options,
94            skip_error: OnceCell::new(),
95            max_nested_levels: OnceCell::new(),
96        }
97    }
98
99    pub fn from_map(options: HashMap<String, String>) -> Self {
100        Self {
101            options,
102            skip_error: OnceCell::new(),
103            max_nested_levels: OnceCell::new(),
104        }
105    }
106
107    pub fn parse_header_str_to_map(params: Option<&str>) -> HashMap<String, String> {
108        if let Some(params) = params {
109            if params.is_empty() {
110                HashMap::new()
111            } else {
112                params
113                    .split('&')
114                    .filter_map(|s| s.split_once('='))
115                    .map(|(k, v)| (k.to_string(), v.to_string()))
116                    .collect::<HashMap<String, String>>()
117            }
118        } else {
119            HashMap::new()
120        }
121    }
122
123    /// Whether to skip error when processing the pipeline.
124    pub fn skip_error(&self) -> bool {
125        *self
126            .skip_error
127            .get_or_init(|| self.options.get("skip_error").map(truthy).unwrap_or(false))
128    }
129
130    /// Max nested levels for JSON flattening. If not provided or invalid,
131    /// falls back to `DEFAULT_MAX_NESTED_LEVELS_FOR_JSON_FLATTENING`.
132    pub fn max_nested_levels(&self) -> usize {
133        *self.max_nested_levels.get_or_init(|| {
134            self.options
135                .get("max_nested_levels")
136                .and_then(|s| s.parse::<usize>().ok())
137                .filter(|v| *v > 0)
138                .unwrap_or(DEFAULT_MAX_NESTED_LEVELS_FOR_JSON_FLATTENING)
139        })
140    }
141}
142
143impl GreptimeTransformer {
144    /// Add a default timestamp column to the transforms
145    fn add_greptime_timestamp_column(transforms: &mut Transforms) {
146        let type_ = ColumnDataType::TimestampNanosecond;
147        let default = None;
148
149        let transform = Transform {
150            fields: Fields::one(Field::new(greptime_timestamp().to_string(), None)),
151            type_,
152            default,
153            index: Some(Index::Time),
154            index_options: None,
155            on_failure: Some(crate::etl::transform::OnFailure::Default),
156            tag: false,
157        };
158        transforms.push(transform);
159    }
160
161    /// Generate the schema for the GreptimeTransformer
162    fn init_schemas(transforms: &Transforms) -> Result<Vec<ColumnSchema>> {
163        let mut schema = vec![];
164        for transform in transforms.iter() {
165            schema.extend(coerce_columns(transform)?);
166        }
167        Ok(schema)
168    }
169}
170
171impl GreptimeTransformer {
172    pub fn new(mut transforms: Transforms, doc_version: &PipelineDocVersion) -> Result<Self> {
173        // empty check is done in the caller
174        let mut column_names_set = HashSet::new();
175        let mut timestamp_columns = vec![];
176
177        for transform in transforms.iter() {
178            let target_fields_set = transform
179                .fields
180                .iter()
181                .map(|f| f.target_or_input_field())
182                .collect::<HashSet<_>>();
183
184            let intersections: Vec<_> = column_names_set.intersection(&target_fields_set).collect();
185            if !intersections.is_empty() {
186                let duplicates = intersections.iter().join(",");
187                return TransformColumnNameMustBeUniqueSnafu { duplicates }.fail();
188            }
189
190            column_names_set.extend(target_fields_set);
191
192            if let Some(idx) = transform.index
193                && idx == Index::Time
194            {
195                match transform.fields.len() {
196                    //Safety unwrap is fine here because we have checked the length of real_fields
197                    1 => timestamp_columns.push(transform.fields.first().unwrap().input_field()),
198                    _ => {
199                        return TransformMultipleTimestampIndexSnafu {
200                            columns: transform.fields.iter().map(|x| x.input_field()).join(", "),
201                        }
202                        .fail();
203                    }
204                }
205            }
206        }
207
208        let schema = match timestamp_columns.len() {
209            0 if doc_version == &PipelineDocVersion::V1 => {
210                // compatible with v1, add a default timestamp column
211                GreptimeTransformer::add_greptime_timestamp_column(&mut transforms);
212                GreptimeTransformer::init_schemas(&transforms)?
213            }
214            1 => GreptimeTransformer::init_schemas(&transforms)?,
215            count => {
216                let columns = timestamp_columns.iter().join(", ");
217                return TransformTimestampIndexCountSnafu { count, columns }.fail();
218            }
219        };
220        Ok(GreptimeTransformer { transforms, schema })
221    }
222
223    pub fn transform_mut(
224        &self,
225        pipeline_map: &mut VrlValue,
226        is_v1: bool,
227    ) -> Result<Vec<GreptimeValue>> {
228        let mut values = vec![GreptimeValue { value_data: None }; self.schema.len()];
229        let mut output_index = 0;
230        for transform in self.transforms.iter() {
231            for field in transform.fields.iter() {
232                let column_name = field.input_field();
233
234                let pipeline_map = pipeline_map.as_object_mut().context(ValueMustBeMapSnafu)?;
235                // let keep us `get` here to be compatible with v1
236                match pipeline_map.get(column_name) {
237                    Some(v) => {
238                        let value_data = coerce_value(v, transform)?;
239                        // every transform fields has only one output field
240                        values[output_index] = GreptimeValue { value_data };
241                    }
242                    None => {
243                        let value_data = match transform.on_failure {
244                            Some(crate::etl::transform::OnFailure::Default) => {
245                                match transform.get_default() {
246                                    Some(default) => Some(default.clone()),
247                                    None => transform.get_default_value_when_data_is_none(),
248                                }
249                            }
250                            Some(crate::etl::transform::OnFailure::Ignore) => None,
251                            None => None,
252                        };
253                        if transform.is_timeindex() && value_data.is_none() {
254                            return TimeIndexMustBeNonNullSnafu.fail();
255                        }
256                        values[output_index] = GreptimeValue { value_data };
257                    }
258                }
259                output_index += 1;
260                if !is_v1 {
261                    // remove the column from the pipeline_map
262                    // so that the auto-transform can use the rest fields
263                    pipeline_map.remove(column_name);
264                }
265            }
266        }
267        Ok(values)
268    }
269
270    pub fn transforms(&self) -> &Transforms {
271        &self.transforms
272    }
273
274    pub fn schemas(&self) -> &Vec<greptime_proto::v1::ColumnSchema> {
275        &self.schema
276    }
277
278    pub fn transforms_mut(&mut self) -> &mut Transforms {
279        &mut self.transforms
280    }
281}
282
283#[derive(Clone)]
284pub struct ColumnMetadata {
285    column_schema: datatypes::schema::ColumnSchema,
286    semantic_type: SemanticType,
287}
288
289impl From<ColumnSchema> for ColumnMetadata {
290    fn from(value: ColumnSchema) -> Self {
291        let datatype = value.datatype();
292        let semantic_type = value.semantic_type();
293        let ColumnSchema {
294            column_name,
295            datatype: _,
296            semantic_type: _,
297            datatype_extension,
298            options,
299        } = value;
300
301        let column_schema = datatypes::schema::ColumnSchema::new(
302            column_name,
303            ColumnDataTypeWrapper::new(datatype, datatype_extension).into(),
304            semantic_type != SemanticType::Timestamp,
305        );
306
307        let metadata = collect_column_options(options.as_ref());
308        let column_schema = column_schema.with_metadata(metadata);
309
310        Self {
311            column_schema,
312            semantic_type,
313        }
314    }
315}
316
317impl TryFrom<ColumnMetadata> for ColumnSchema {
318    type Error = api::error::Error;
319
320    fn try_from(value: ColumnMetadata) -> std::result::Result<Self, Self::Error> {
321        let ColumnMetadata {
322            column_schema,
323            semantic_type,
324        } = value;
325
326        let options = options_from_column_schema(&column_schema);
327
328        let (datatype, datatype_extension) =
329            ColumnDataTypeWrapper::try_from(column_schema.data_type).map(|x| x.into_parts())?;
330
331        Ok(ColumnSchema {
332            column_name: column_schema.name,
333            datatype: datatype as _,
334            semantic_type: semantic_type as _,
335            datatype_extension,
336            options,
337        })
338    }
339}
340
341/// This is used to record the current state schema information and a sequential cache of field names.
342/// As you traverse the user input JSON, this will change.
343/// It will record a superset of all user input schemas.
344#[derive(Default)]
345pub struct SchemaInfo {
346    /// schema info
347    pub schema: Vec<ColumnMetadata>,
348    /// index of the column name
349    pub index: HashMap<String, usize>,
350    /// The pipeline's corresponding table (if already created). Useful to retrieve column schemas.
351    table: Option<Arc<Table>>,
352}
353
354impl SchemaInfo {
355    pub fn with_capacity(capacity: usize) -> Self {
356        Self {
357            schema: Vec::with_capacity(capacity),
358            index: HashMap::with_capacity(capacity),
359            table: None,
360        }
361    }
362
363    pub fn from_schema_list(schema_list: Vec<ColumnSchema>) -> Self {
364        let mut index = HashMap::new();
365        for (i, schema) in schema_list.iter().enumerate() {
366            index.insert(schema.column_name.clone(), i);
367        }
368        Self {
369            schema: schema_list.into_iter().map(Into::into).collect(),
370            index,
371            table: None,
372        }
373    }
374
375    pub fn set_table(&mut self, table: Option<Arc<Table>>) {
376        self.table = table;
377    }
378
379    fn find_column_schema_in_table(&self, column_name: &str) -> Option<ColumnMetadata> {
380        if let Some(table) = &self.table
381            && let Some(i) = table.schema_ref().column_index_by_name(column_name)
382        {
383            let column_schema = table.schema_ref().column_schemas()[i].clone();
384
385            let semantic_type = if column_schema.is_time_index() {
386                SemanticType::Timestamp
387            } else if table.table_info().meta.primary_key_indices.contains(&i) {
388                SemanticType::Tag
389            } else {
390                SemanticType::Field
391            };
392
393            Some(ColumnMetadata {
394                column_schema,
395                semantic_type,
396            })
397        } else {
398            None
399        }
400    }
401
402    pub fn column_schemas(&self) -> api::error::Result<Vec<ColumnSchema>> {
403        self.schema
404            .iter()
405            .map(|x| x.clone().try_into())
406            .collect::<api::error::Result<Vec<_>>>()
407    }
408}
409
410fn resolve_schema(
411    index: Option<usize>,
412    pipeline_context: &PipelineContext,
413    column: &str,
414    value_type: &ConcreteDataType,
415    schema_info: &mut SchemaInfo,
416) -> Result<()> {
417    if let Some(index) = index {
418        let column_type = &mut schema_info.schema[index].column_schema.data_type;
419        match (column_type, value_type) {
420            (column_type, value_type) if column_type == value_type => Ok(()),
421            (ConcreteDataType::Json(column_type), ConcreteDataType::Json(value_type))
422                if column_type.is_json2() && value_type.is_json2() =>
423            {
424                Ok(())
425            }
426            (column_type, value_type) => IdentifyPipelineColumnTypeMismatchSnafu {
427                column,
428                expected: column_type.to_string(),
429                actual: value_type.to_string(),
430            }
431            .fail(),
432        }
433    } else {
434        let column_schema = schema_info
435            .find_column_schema_in_table(column)
436            .unwrap_or_else(|| {
437                let semantic_type = decide_semantic(pipeline_context, column);
438                let column_schema = datatypes::schema::ColumnSchema::new(
439                    column,
440                    value_type.clone(),
441                    semantic_type != SemanticType::Timestamp,
442                );
443                ColumnMetadata {
444                    column_schema,
445                    semantic_type,
446                }
447            });
448        let key = column.to_string();
449        schema_info.schema.push(column_schema);
450        schema_info.index.insert(key, schema_info.schema.len() - 1);
451        Ok(())
452    }
453}
454
455fn calc_ts(p_ctx: &PipelineContext, values: &VrlValue) -> Result<Option<ValueData>> {
456    match p_ctx.channel {
457        Channel::Prometheus => {
458            let ts = values
459                .as_object()
460                .and_then(|m| m.get(greptime_timestamp()))
461                .and_then(|ts| ts.try_into_i64().ok())
462                .unwrap_or_default();
463            Ok(Some(ValueData::TimestampMillisecondValue(ts)))
464        }
465        _ => {
466            let custom_ts = p_ctx.pipeline_definition.get_custom_ts();
467            match custom_ts {
468                Some(ts) => {
469                    let ts_field = values.as_object().and_then(|m| m.get(ts.get_column_name()));
470                    Some(ts.get_timestamp_value(ts_field)).transpose()
471                }
472                None => Ok(Some(ValueData::TimestampNanosecondValue(
473                    chrono::Utc::now().timestamp_nanos_opt().unwrap_or_default(),
474                ))),
475            }
476        }
477    }
478}
479
480/// Converts VRL values to Greptime rows grouped by their ContextOpt.
481/// # Returns
482/// A HashMap where keys are `ContextOpt` and values are vectors of (row, table_suffix) pairs.
483/// Single object input produces one ContextOpt group with one row.
484/// Array input groups rows by their per-element ContextOpt values.
485///
486/// # Errors
487/// - `ArrayElementMustBeObject` if an array element is not an object
488pub(crate) fn values_to_rows(
489    schema_info: &mut SchemaInfo,
490    mut values: VrlValue,
491    pipeline_ctx: &PipelineContext<'_>,
492    row: Option<Vec<GreptimeValue>>,
493    need_calc_ts: bool,
494    tablesuffix_template: Option<&crate::tablesuffix::TableSuffixTemplate>,
495) -> Result<std::collections::HashMap<ContextOpt, Vec<RowWithTableSuffix>>> {
496    let skip_error = pipeline_ctx.pipeline_param.skip_error();
497    let VrlValue::Array(arr) = values else {
498        // Single object: extract ContextOpt and table_suffix
499        let mut result = std::collections::HashMap::new();
500
501        let mut opt = match ContextOpt::from_pipeline_map_to_opt(&mut values) {
502            Ok(r) => r,
503            Err(e) => return if skip_error { Ok(result) } else { Err(e) },
504        };
505
506        let table_suffix = opt.resolve_table_suffix(tablesuffix_template, &values);
507        let row = match values_to_row(schema_info, values, pipeline_ctx, row, need_calc_ts) {
508            Ok(r) => r,
509            Err(e) => return if skip_error { Ok(result) } else { Err(e) },
510        };
511        result.insert(opt, vec![(row, table_suffix)]);
512        return Ok(result);
513    };
514
515    let mut rows_by_context: std::collections::HashMap<ContextOpt, Vec<RowWithTableSuffix>> =
516        std::collections::HashMap::new();
517    for (index, mut value) in arr.into_iter().enumerate() {
518        if !value.is_object() {
519            unwrap_or_continue_if_err!(
520                ArrayElementMustBeObjectSnafu {
521                    index,
522                    actual_type: value.kind_str().to_string(),
523                }
524                .fail(),
525                skip_error
526            );
527        }
528
529        // Extract ContextOpt and table_suffix for this element
530        let mut opt = unwrap_or_continue_if_err!(
531            ContextOpt::from_pipeline_map_to_opt(&mut value),
532            skip_error
533        );
534        let table_suffix = opt.resolve_table_suffix(tablesuffix_template, &value);
535        let transformed_row = unwrap_or_continue_if_err!(
536            values_to_row(schema_info, value, pipeline_ctx, row.clone(), need_calc_ts),
537            skip_error
538        );
539        rows_by_context
540            .entry(opt)
541            .or_default()
542            .push((transformed_row, table_suffix));
543    }
544    Ok(rows_by_context)
545}
546
547/// `need_calc_ts` happens in two cases:
548/// 1. full greptime_identity
549/// 2. auto-transform without transformer
550///
551/// if transform is present in custom pipeline in v2 mode
552/// we dont need to calc ts again, nor do we need to check ts column name
553pub(crate) fn values_to_row(
554    schema_info: &mut SchemaInfo,
555    values: VrlValue,
556    pipeline_ctx: &PipelineContext<'_>,
557    row: Option<Vec<GreptimeValue>>,
558    need_calc_ts: bool,
559) -> Result<Row> {
560    let mut row: Vec<GreptimeValue> =
561        row.unwrap_or_else(|| Vec::with_capacity(schema_info.schema.len()));
562    let custom_ts = pipeline_ctx.pipeline_definition.get_custom_ts();
563
564    if need_calc_ts {
565        // calculate timestamp value based on the channel
566        let ts = calc_ts(pipeline_ctx, &values)?;
567        row.push(GreptimeValue { value_data: ts });
568    }
569
570    row.resize(schema_info.schema.len(), GreptimeValue { value_data: None });
571
572    // skip ts column
573    let ts_column_name = custom_ts
574        .as_ref()
575        .map_or(greptime_timestamp(), |ts| ts.get_column_name());
576
577    let values = values.into_object().context(ValueMustBeMapSnafu)?;
578
579    for (column_name, value) in values {
580        if need_calc_ts && column_name.as_str() == ts_column_name {
581            continue;
582        }
583
584        resolve_value(
585            value,
586            column_name.into(),
587            &mut row,
588            schema_info,
589            pipeline_ctx,
590        )?;
591    }
592    Ok(Row { values: row })
593}
594
595fn decide_semantic(p_ctx: &PipelineContext, column_name: &str) -> SemanticType {
596    if p_ctx.channel == Channel::Prometheus && column_name != greptime_value() {
597        SemanticType::Tag
598    } else {
599        SemanticType::Field
600    }
601}
602
603fn resolve_value(
604    value: VrlValue,
605    column_name: String,
606    row: &mut Vec<GreptimeValue>,
607    schema_info: &mut SchemaInfo,
608    p_ctx: &PipelineContext,
609) -> Result<()> {
610    let index = schema_info.index.get(&column_name).copied();
611
612    let value_data = match value {
613        VrlValue::Null => return Ok(()),
614
615        VrlValue::Integer(v) => {
616            // safe unwrap after type matched
617            resolve_schema(
618                index,
619                p_ctx,
620                &column_name,
621                &ConcreteDataType::int64_datatype(),
622                schema_info,
623            )?;
624            Some(ValueData::I64Value(v))
625        }
626
627        VrlValue::Float(v) => {
628            // safe unwrap after type matched
629            resolve_schema(
630                index,
631                p_ctx,
632                &column_name,
633                &ConcreteDataType::float64_datatype(),
634                schema_info,
635            )?;
636            Some(ValueData::F64Value(v.into()))
637        }
638
639        VrlValue::Boolean(v) => {
640            resolve_schema(
641                index,
642                p_ctx,
643                &column_name,
644                &ConcreteDataType::boolean_datatype(),
645                schema_info,
646            )?;
647            Some(ValueData::BoolValue(v))
648        }
649
650        VrlValue::Bytes(v) => {
651            resolve_schema(
652                index,
653                p_ctx,
654                &column_name,
655                &ConcreteDataType::string_datatype(),
656                schema_info,
657            )?;
658            Some(ValueData::StringValue(String::from_utf8_lossy_owned(
659                v.to_vec(),
660            )))
661        }
662
663        VrlValue::Regex(v) => {
664            warn!(
665                "Persisting regex value in the table, this should not happen, column_name: {}",
666                column_name
667            );
668            resolve_schema(
669                index,
670                p_ctx,
671                &column_name,
672                &ConcreteDataType::string_datatype(),
673                schema_info,
674            )?;
675            Some(ValueData::StringValue(v.to_string()))
676        }
677
678        VrlValue::Timestamp(ts) => {
679            let ns = ts.timestamp_nanos_opt().context(InvalidTimestampSnafu {
680                input: ts.to_rfc3339(),
681            })?;
682            resolve_schema(
683                index,
684                p_ctx,
685                &column_name,
686                &ConcreteDataType::timestamp_nanosecond_datatype(),
687                schema_info,
688            )?;
689            Some(ValueData::TimestampNanosecondValue(ns))
690        }
691
692        VrlValue::Array(_) | VrlValue::Object(_) => {
693            let is_json2 = schema_info
694                .find_column_schema_in_table(&column_name)
695                .is_some_and(|x| {
696                    matches!(
697                        &x.column_schema.data_type,
698                        ConcreteDataType::Json(column_type) if column_type.is_json2()
699                    )
700                });
701
702            let value = if is_json2 {
703                let json_extension_type: Option<JsonExtensionType> =
704                    if let Some(x) = schema_info.find_column_schema_in_table(&column_name) {
705                        x.column_schema.extension_type()?
706                    } else {
707                        None
708                    };
709                let settings = json_extension_type
710                    .and_then(|x| x.metadata().json_settings.clone())
711                    .unwrap_or_default();
712                let value: serde_json::Value = value.try_into().map_err(|e: StdError| {
713                    CoerceIncompatibleTypesSnafu { msg: e.to_string() }.build()
714                })?;
715                let value = settings.encode(value)?;
716
717                resolve_schema(index, p_ctx, &column_name, &value.data_type(), schema_info)?;
718
719                let Value::Json(value) = value else {
720                    unreachable!()
721                };
722                ValueData::JsonValue(encode_json_value(*value))
723            } else {
724                resolve_schema(
725                    index,
726                    p_ctx,
727                    &column_name,
728                    &ConcreteDataType::binary_datatype(),
729                    schema_info,
730                )?;
731
732                let value = vrl_value_to_jsonb_value(&value);
733                ValueData::BinaryValue(value.to_vec())
734            };
735            Some(value)
736        }
737    };
738
739    let value = GreptimeValue { value_data };
740    if let Some(index) = index {
741        row[index] = value;
742    } else {
743        row.push(value);
744    }
745    Ok(())
746}
747
748fn vrl_value_to_jsonb_value<'a>(value: &'a VrlValue) -> jsonb::Value<'a> {
749    match value {
750        VrlValue::Bytes(bytes) => jsonb::Value::String(String::from_utf8_lossy(bytes)),
751        VrlValue::Regex(value_regex) => jsonb::Value::String(Cow::Borrowed(value_regex.as_str())),
752        VrlValue::Integer(i) => jsonb::Value::Number(Number::Int64(*i)),
753        VrlValue::Float(not_nan) => jsonb::Value::Number(Number::Float64(not_nan.into_inner())),
754        VrlValue::Boolean(b) => jsonb::Value::Bool(*b),
755        VrlValue::Timestamp(date_time) => jsonb::Value::String(Cow::Owned(date_time.to_rfc3339())),
756        VrlValue::Object(btree_map) => jsonb::Value::Object(
757            btree_map
758                .iter()
759                .map(|(key, value)| (key.to_string(), vrl_value_to_jsonb_value(value)))
760                .collect(),
761        ),
762        VrlValue::Array(values) => jsonb::Value::Array(
763            values
764                .iter()
765                .map(|value| vrl_value_to_jsonb_value(value))
766                .collect(),
767        ),
768        VrlValue::Null => jsonb::Value::Null,
769    }
770}
771
772fn identity_pipeline_inner(
773    pipeline_maps: Vec<VrlValue>,
774    pipeline_ctx: &PipelineContext<'_>,
775    max_nested_levels: usize,
776) -> Result<(SchemaInfo, HashMap<ContextOpt, Vec<Row>>)> {
777    let skip_error = pipeline_ctx.pipeline_param.skip_error();
778    let mut schema_info = SchemaInfo::default();
779    let custom_ts = pipeline_ctx.pipeline_definition.get_custom_ts();
780
781    // set time index column schema first
782    let column_schema = datatypes::schema::ColumnSchema::new(
783        custom_ts
784            .map(|ts| ts.get_column_name().to_string())
785            .unwrap_or_else(|| greptime_timestamp().to_string()),
786        custom_ts
787            .map(|c| ConcreteDataType::from(ColumnDataTypeWrapper::new(c.get_datatype(), None)))
788            .unwrap_or_else(|| {
789                if pipeline_ctx.channel == Channel::Prometheus {
790                    ConcreteDataType::timestamp_millisecond_datatype()
791                } else {
792                    ConcreteDataType::timestamp_nanosecond_datatype()
793                }
794            }),
795        false,
796    );
797    schema_info.schema.push(ColumnMetadata {
798        column_schema,
799        semantic_type: SemanticType::Timestamp,
800    });
801
802    let mut opt_map = HashMap::new();
803    let len = pipeline_maps.len();
804
805    for pipeline_map in pipeline_maps {
806        let mut pipeline_map =
807            unwrap_or_continue_if_err!(flatten_object(pipeline_map, max_nested_levels), skip_error);
808        let opt = unwrap_or_continue_if_err!(
809            ContextOpt::from_pipeline_map_to_opt(&mut pipeline_map),
810            skip_error
811        );
812        let row = unwrap_or_continue_if_err!(
813            values_to_row(&mut schema_info, pipeline_map, pipeline_ctx, None, true),
814            skip_error
815        );
816
817        opt_map
818            .entry(opt)
819            .or_insert_with(|| Vec::with_capacity(len))
820            .push(row);
821    }
822
823    let column_count = schema_info.schema.len();
824    for (_, row) in opt_map.iter_mut() {
825        for row in row.iter_mut() {
826            assert!(
827                column_count >= row.values.len(),
828                "column_count: {}, row.values.len(): {}",
829                column_count,
830                row.values.len()
831            );
832            row.values
833                .resize(column_count, GreptimeValue { value_data: None });
834        }
835    }
836
837    Ok((schema_info, opt_map))
838}
839
840/// Identity pipeline for Greptime
841/// This pipeline will convert the input JSON array to Greptime Rows
842/// params table is used to set the semantic type of the row key column to Tag
843/// 1. The pipeline will add a default timestamp column to the schema
844/// 2. The pipeline not resolve NULL value
845/// 3. The pipeline assumes that the json format is fixed
846/// 4. The pipeline will return an error if the same column datatype is mismatched
847/// 5. The pipeline will analyze the schema of each json record and merge them to get the final schema.
848pub fn identity_pipeline(
849    array: Vec<VrlValue>,
850    table: Option<Arc<table::Table>>,
851    pipeline_ctx: &PipelineContext<'_>,
852) -> Result<HashMap<ContextOpt, Rows>> {
853    let max_nested_levels = pipeline_ctx.pipeline_param.max_nested_levels();
854
855    let (mut schema, opt_map) = identity_pipeline_inner(array, pipeline_ctx, max_nested_levels)?;
856    if let Some(table) = table {
857        let table_info = table.table_info();
858        for tag_name in table_info.meta.row_key_column_names() {
859            if let Some(index) = schema.index.get(tag_name) {
860                schema.schema[*index].semantic_type = SemanticType::Tag;
861            }
862        }
863    }
864
865    let column_schemas = schema.column_schemas()?;
866    Ok(opt_map
867        .into_iter()
868        .map(|(opt, rows)| {
869            (
870                opt,
871                Rows {
872                    schema: column_schemas.clone(),
873                    rows,
874                },
875            )
876        })
877        .collect::<HashMap<ContextOpt, Rows>>())
878}
879
880/// Consumes the JSON object and consumes it into a single-level object.
881///
882/// The `max_nested_levels` parameter is used to limit how deep to flatten nested JSON objects.
883/// When the maximum level is reached, the remaining nested structure is serialized to a JSON
884/// string and stored at the current flattened key.
885pub fn flatten_object(object: VrlValue, max_nested_levels: usize) -> Result<VrlValue> {
886    let mut flattened = BTreeMap::new();
887    let object = object.into_object().context(ValueMustBeMapSnafu)?;
888
889    if !object.is_empty() {
890        // it will use recursion to flatten the object.
891        do_flatten_object(&mut flattened, None, object, 1, max_nested_levels);
892    }
893
894    Ok(VrlValue::Object(flattened))
895}
896
897fn vrl_value_to_serde_json(value: &VrlValue) -> serde_json_crate::Value {
898    match value {
899        VrlValue::Null => serde_json_crate::Value::Null,
900        VrlValue::Boolean(b) => serde_json_crate::Value::Bool(*b),
901        VrlValue::Integer(i) => serde_json_crate::Value::Number((*i).into()),
902        VrlValue::Float(not_nan) => serde_json_crate::Number::from_f64(not_nan.into_inner())
903            .map(serde_json_crate::Value::Number)
904            .unwrap_or(serde_json_crate::Value::Null),
905        VrlValue::Bytes(bytes) => {
906            serde_json_crate::Value::String(String::from_utf8_lossy(bytes).into_owned())
907        }
908        VrlValue::Regex(re) => serde_json_crate::Value::String(re.as_str().to_string()),
909        VrlValue::Timestamp(ts) => serde_json_crate::Value::String(ts.to_rfc3339()),
910        VrlValue::Array(arr) => {
911            serde_json_crate::Value::Array(arr.iter().map(vrl_value_to_serde_json).collect())
912        }
913        VrlValue::Object(map) => serde_json_crate::Value::Object(
914            map.iter()
915                .map(|(k, v)| (k.to_string(), vrl_value_to_serde_json(v)))
916                .collect(),
917        ),
918    }
919}
920
921fn do_flatten_object(
922    dest: &mut BTreeMap<KeyString, VrlValue>,
923    base: Option<&str>,
924    object: BTreeMap<KeyString, VrlValue>,
925    current_level: usize,
926    max_nested_levels: usize,
927) {
928    for (key, value) in object {
929        let new_key = base.map_or_else(
930            || key.clone(),
931            |base_key| format!("{base_key}.{key}").into(),
932        );
933
934        match value {
935            VrlValue::Object(object) => {
936                if current_level >= max_nested_levels {
937                    // Reached the maximum level; stringify the remaining object.
938                    let json_string = serde_json_crate::to_string(&vrl_value_to_serde_json(
939                        &VrlValue::Object(object),
940                    ))
941                    .unwrap_or_else(|_| String::from("{}"));
942                    dest.insert(new_key, VrlValue::Bytes(Bytes::from(json_string)));
943                } else {
944                    do_flatten_object(
945                        dest,
946                        Some(&new_key),
947                        object,
948                        current_level + 1,
949                        max_nested_levels,
950                    );
951                }
952            }
953            // Arrays are stringified to ensure no JSON column types in the result.
954            VrlValue::Array(_) => {
955                let json_string = serde_json_crate::to_string(&vrl_value_to_serde_json(&value))
956                    .unwrap_or_else(|_| String::from("[]"));
957                dest.insert(new_key, VrlValue::Bytes(Bytes::from(json_string)));
958            }
959            // Other leaf types are inserted as-is.
960            _ => {
961                dest.insert(new_key, value);
962            }
963        }
964    }
965}
966
#[cfg(test)]
mod tests {
    use ahash::HashMap as AHashMap;
    use api::v1::SemanticType;

    use super::*;
    use crate::{PipelineDefinition, identity_pipeline};

    /// Schema inference across records: datatype mismatches are rejected, compatible
    /// records are merged, and row-key columns can be promoted to tags.
    #[test]
    fn test_identify_pipeline() {
        let params = GreptimePipelineParams::default();
        let pipeline_ctx = PipelineContext::new(
            &PipelineDefinition::GreptimeIdentityPipeline(None),
            &params,
            Channel::Unknown,
        );
        // A float column followed by a string value is a datatype mismatch.
        {
            let array = [
                serde_json::json!({
                    "woshinull": null,
                    "name": "Alice",
                    "age": 20,
                    "is_student": true,
                    "score": 99.5,
                    "hobbies": "reading",
                    "address": "Beijing",
                }),
                serde_json::json!({
                    "name": "Bob",
                    "age": 21,
                    "is_student": false,
                    "score": "88.5",
                    "hobbies": "swimming",
                    "address": "Shanghai",
                    "gaga": "gaga"
                }),
            ];
            let array = array.iter().map(|v| v.into()).collect();
            let rows = identity_pipeline(array, None, &pipeline_ctx);
            assert!(rows.is_err());
            assert_eq!(
                rows.err().unwrap().to_string(),
                "Column datatype mismatch. For column: score, expected datatype: Float64, actual datatype: String".to_string(),
            );
        }
        // A float column followed by an integer value is also a mismatch.
        {
            let array = [
                serde_json::json!({
                    "woshinull": null,
                    "name": "Alice",
                    "age": 20,
                    "is_student": true,
                    "score": 99.5,
                    "hobbies": "reading",
                    "address": "Beijing",
                }),
                serde_json::json!({
                    "name": "Bob",
                    "age": 21,
                    "is_student": false,
                    "score": 88,
                    "hobbies": "swimming",
                    "address": "Shanghai",
                    "gaga": "gaga"
                }),
            ];
            let array = array.iter().map(|v| v.into()).collect();
            let rows = identity_pipeline(array, None, &pipeline_ctx);
            assert!(rows.is_err());
            assert_eq!(
                rows.err().unwrap().to_string(),
                "Column datatype mismatch. For column: score, expected datatype: Float64, actual datatype: Int64".to_string(),
            );
        }
        // Compatible records are merged. The all-null `woshinull` field yields no column,
        // and both rows are padded to the full column count.
        {
            let array = [
                serde_json::json!({
                    "woshinull": null,
                    "name": "Alice",
                    "age": 20,
                    "is_student": true,
                    "score": 99.5,
                    "hobbies": "reading",
                    "address": "Beijing",
                }),
                serde_json::json!({
                    "name": "Bob",
                    "age": 21,
                    "is_student": false,
                    "score": 88.5,
                    "hobbies": "swimming",
                    "address": "Shanghai",
                    "gaga": "gaga"
                }),
            ];
            let array = array.iter().map(|v| v.into()).collect();
            let rows = identity_pipeline(array, None, &pipeline_ctx);
            assert!(rows.is_ok());
            let mut rows = rows.unwrap();
            assert_eq!(rows.len(), 1);
            let rows = rows.remove(&ContextOpt::default()).unwrap();
            assert_eq!(rows.schema.len(), 8);
            assert_eq!(rows.rows.len(), 2);
            assert_eq!(8, rows.rows[0].values.len());
            assert_eq!(8, rows.rows[1].values.len());
        }
        // Columns named as tags are promoted to SemanticType::Tag.
        {
            let array = [
                serde_json::json!({
                    "woshinull": null,
                    "name": "Alice",
                    "age": 20,
                    "is_student": true,
                    "score": 99.5,
                    "hobbies": "reading",
                    "address": "Beijing",
                }),
                serde_json::json!({
                    "name": "Bob",
                    "age": 21,
                    "is_student": false,
                    "score": 88.5,
                    "hobbies": "swimming",
                    "address": "Shanghai",
                    "gaga": "gaga"
                }),
            ];
            let tag_column_names = ["name".to_string(), "address".to_string()];

            let rows = identity_pipeline_inner(
                array.iter().map(|v| v.into()).collect(),
                &pipeline_ctx,
                pipeline_ctx.pipeline_param.max_nested_levels(),
            )
            .map(|(mut schema, mut rows)| {
                for name in tag_column_names {
                    if let Some(index) = schema.index.get(&name) {
                        schema.schema[*index].semantic_type = SemanticType::Tag;
                    }
                }

                assert_eq!(rows.len(), 1);
                let rows = rows.remove(&ContextOpt::default()).unwrap();

                Rows {
                    schema: schema.column_schemas().unwrap(),
                    rows,
                }
            });

            assert!(rows.is_ok());
            let rows = rows.unwrap();
            assert_eq!(rows.schema.len(), 8);
            assert_eq!(rows.rows.len(), 2);
            assert_eq!(8, rows.rows[0].values.len());
            assert_eq!(8, rows.rows[1].values.len());
            assert_eq!(
                rows.schema
                    .iter()
                    .find(|x| x.column_name == "name")
                    .unwrap()
                    .semantic_type,
                SemanticType::Tag as i32
            );
            assert_eq!(
                rows.schema
                    .iter()
                    .find(|x| x.column_name == "address")
                    .unwrap()
                    .semantic_type,
                SemanticType::Tag as i32
            );
            assert_eq!(
                rows.schema
                    .iter()
                    .filter(|x| x.semantic_type == SemanticType::Tag as i32)
                    .count(),
                2
            );
        }
    }

    /// `flatten_object` joins nested keys with `.`, stringifies arrays, and stringifies
    /// objects that reach the nesting limit.
    #[test]
    fn test_flatten() {
        let test_cases = vec![
            // Basic case.
            (
                serde_json::json!(
                    {
                        "a": {
                            "b": {
                                "c": [1, 2, 3]
                            }
                        },
                        "d": [
                            "foo",
                            "bar"
                        ],
                        "e": {
                            "f": [7, 8, 9],
                            "g": {
                                "h": 123,
                                "i": "hello",
                                "j": {
                                    "k": true
                                }
                            }
                        }
                    }
                ),
                10,
                Some(serde_json::json!(
                    {
                        "a.b.c": "[1,2,3]",
                        "d": "[\"foo\",\"bar\"]",
                        "e.f": "[7,8,9]",
                        "e.g.h": 123,
                        "e.g.i": "hello",
                        "e.g.j.k": true
                    }
                )),
            ),
            // Test the case where the object has more than 3 nested levels.
            (
                serde_json::json!(
                    {
                        "a": {
                            "b": {
                                "c": {
                                    "d": [1, 2, 3]
                                }
                            }
                        },
                        "e": [
                            "foo",
                            "bar"
                        ]
                    }
                ),
                3,
                Some(serde_json::json!(
                    {
                        "a.b.c": "{\"d\":[1,2,3]}",
                        "e": "[\"foo\",\"bar\"]"
                    }
                )),
            ),
        ];

        for (input, max_depth, expected) in test_cases {
            let input = input.into();
            let expected = expected.map(|e| e.into());

            let flattened_object = flatten_object(input, max_depth).ok();
            assert_eq!(flattened_object, expected);
        }
    }

    /// With `skip_error`, invalid (non-object) elements are skipped while valid rows are
    /// still flattened.
    #[test]
    fn test_identity_pipeline_skip_error_flattens_valid_rows() {
        let params = GreptimePipelineParams::from_map(AHashMap::from_iter([(
            "skip_error".to_string(),
            "true".to_string(),
        )]));
        let pipeline_def = PipelineDefinition::GreptimeIdentityPipeline(None);
        let pipeline_ctx = PipelineContext::new(&pipeline_def, &params, Channel::Unknown);
        let array = vec![
            serde_json::json!({
                "service": "frontend",
                "nested": {
                    "status": 200,
                    "path": "/v1/ingest"
                },
                "labels": ["pipeline", "identity"]
            })
            .into(),
            VrlValue::Bytes("invalid_string".into()),
            serde_json::json!({
                "service": "frontend",
                "nested": {
                    "status": 201,
                    "path": "/v1/ingest"
                },
                "labels": ["pipeline", "identity"]
            })
            .into(),
        ];

        let mut rows_by_opt = identity_pipeline(array, None, &pipeline_ctx).unwrap();
        let rows = rows_by_opt.remove(&ContextOpt::default()).unwrap();

        assert_eq!(rows.rows.len(), 2);
        assert_eq!(rows.schema.len(), rows.rows[0].values.len());
        assert!(rows.schema.iter().any(|s| s.column_name == "nested.status"));
        assert!(rows.schema.iter().any(|s| s.column_name == "nested.path"));
        assert!(rows.schema.iter().any(|s| s.column_name == "labels"));
    }

    /// `values_to_rows` skips non-object elements when `skip_error` is set and fails on
    /// them otherwise.
    #[test]
    fn test_values_to_rows_skip_error_handling() {
        let table_suffix_template: Option<crate::tablesuffix::TableSuffixTemplate> = None;

        // Case 1: skip_error=true, mixed valid/invalid elements
        {
            let schema_info = &mut SchemaInfo::default();
            let input_array = vec![
                // Valid object
                serde_json::json!({"name": "Alice", "age": 25}).into(),
                // Invalid element (string)
                VrlValue::Bytes("invalid_string".into()),
                // Valid object
                serde_json::json!({"name": "Bob", "age": 30}).into(),
                // Invalid element (number)
                VrlValue::Integer(42),
                // Valid object
                serde_json::json!({"name": "Charlie", "age": 35}).into(),
            ];

            let params = GreptimePipelineParams::from_map(AHashMap::from_iter([(
                "skip_error".to_string(),
                "true".to_string(),
            )]));

            let pipeline_ctx = PipelineContext::new(
                &PipelineDefinition::GreptimeIdentityPipeline(None),
                &params,
                Channel::Unknown,
            );

            let result = values_to_rows(
                schema_info,
                VrlValue::Array(input_array),
                &pipeline_ctx,
                None,
                true,
                table_suffix_template.as_ref(),
            );

            // Should succeed and only process valid objects
            assert!(result.is_ok());
            let rows_by_context = result.unwrap();
            // Count total rows across all ContextOpt groups
            let total_rows: usize = rows_by_context.values().map(|v| v.len()).sum();
            assert_eq!(total_rows, 3); // Only 3 valid objects
        }

        // Case 2: skip_error=false, invalid elements present
        {
            let schema_info = &mut SchemaInfo::default();
            let input_array = vec![
                serde_json::json!({"name": "Alice", "age": 25}).into(),
                VrlValue::Bytes("invalid_string".into()), // This should cause error
            ];

            let params = GreptimePipelineParams::default(); // skip_error = false

            let pipeline_ctx = PipelineContext::new(
                &PipelineDefinition::GreptimeIdentityPipeline(None),
                &params,
                Channel::Unknown,
            );

            let result = values_to_rows(
                schema_info,
                VrlValue::Array(input_array),
                &pipeline_ctx,
                None,
                true,
                table_suffix_template.as_ref(),
            );

            // Should fail with ArrayElementMustBeObject error
            assert!(result.is_err());
            let error_msg = result.unwrap_err().to_string();
            assert!(error_msg.contains("Array element at index 1 must be an object for one-to-many transformation, got string"));
        }
    }

    /// Test that values_to_rows correctly groups rows by per-element ContextOpt
    #[test]
    fn test_values_to_rows_per_element_context_opt() {
        let table_suffix_template: Option<crate::tablesuffix::TableSuffixTemplate> = None;
        let schema_info = &mut SchemaInfo::default();

        // Create array with elements having different TTL values (ContextOpt)
        let input_array = vec![
            serde_json::json!({"name": "Alice", "greptime_ttl": "1h"}).into(),
            serde_json::json!({"name": "Bob", "greptime_ttl": "1h"}).into(),
            serde_json::json!({"name": "Charlie", "greptime_ttl": "24h"}).into(),
        ];

        let params = GreptimePipelineParams::default();
        let pipeline_ctx = PipelineContext::new(
            &PipelineDefinition::GreptimeIdentityPipeline(None),
            &params,
            Channel::Unknown,
        );

        let result = values_to_rows(
            schema_info,
            VrlValue::Array(input_array),
            &pipeline_ctx,
            None,
            true,
            table_suffix_template.as_ref(),
        );

        assert!(result.is_ok());
        let rows_by_context = result.unwrap();

        // Should have 2 different ContextOpt groups (1h TTL and 24h TTL)
        assert_eq!(rows_by_context.len(), 2);

        // Count rows per group
        let total_rows: usize = rows_by_context.values().map(|v| v.len()).sum();
        assert_eq!(total_rows, 3);

        // ContextOpt doesn't expose the TTL directly, so verify the grouping through the
        // group sizes: Alice and Bob share the 1h TTL, Charlie alone has the 24h TTL.
        let mut group_sizes: Vec<usize> = rows_by_context.values().map(|v| v.len()).collect();
        group_sizes.sort_unstable();
        assert_eq!(group_sizes, vec![1, 2]);
    }
}