pipeline/etl/transform/transformer/greptime.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

pub mod coerce;

use std::borrow::Cow;
use std::collections::{BTreeMap, HashSet};
use std::sync::Arc;

use ahash::{HashMap, HashMapExt};
use api::helper::proto_value_type;
use api::v1::column_data_type_extension::TypeExt;
use api::v1::value::ValueData;
use api::v1::{ColumnDataType, ColumnDataTypeExtension, JsonTypeExtension, SemanticType};
use coerce::{coerce_columns, coerce_value};
use common_query::prelude::{greptime_timestamp, greptime_value};
use common_telemetry::warn;
use greptime_proto::v1::{ColumnSchema, Row, Rows, Value as GreptimeValue};
use itertools::Itertools;
use jsonb::Number;
use once_cell::sync::OnceCell;
use serde_json as serde_json_crate;
use session::context::Channel;
use snafu::OptionExt;
use vrl::prelude::{Bytes, VrlValueConvert};
use vrl::value::{KeyString, Value as VrlValue};

use crate::error::{
    IdentifyPipelineColumnTypeMismatchSnafu, InvalidTimestampSnafu, Result,
    TimeIndexMustBeNonNullSnafu, TransformColumnNameMustBeUniqueSnafu,
    TransformMultipleTimestampIndexSnafu, TransformTimestampIndexCountSnafu, ValueMustBeMapSnafu,
};
use crate::etl::PipelineDocVersion;
use crate::etl::ctx_req::ContextOpt;
use crate::etl::field::{Field, Fields};
use crate::etl::transform::index::Index;
use crate::etl::transform::{Transform, Transforms};
use crate::{PipelineContext, truthy, unwrap_or_continue_if_err};

const DEFAULT_MAX_NESTED_LEVELS_FOR_JSON_FLATTENING: usize = 10;

/// Fields not present in the configured columns are discarded
/// to prevent automatic column creation in GreptimeDB.
#[derive(Debug, Clone)]
pub struct GreptimeTransformer {
    transforms: Transforms,
    schema: Vec<ColumnSchema>,
}

/// Parameters that can be used to configure the greptime pipelines.
#[derive(Debug, Default)]
pub struct GreptimePipelineParams {
    /// The original options for configuring the greptime pipelines.
    /// This should not be used directly; instead, use the parsed shortcut option values.
    options: HashMap<String, String>,

    /// Whether to skip errors when processing the pipeline.
    pub skip_error: OnceCell<bool>,
    /// Max nested levels when flattening a JSON object. Defaults to
    /// `DEFAULT_MAX_NESTED_LEVELS_FOR_JSON_FLATTENING` when not provided.
    pub max_nested_levels: OnceCell<usize>,
}

impl GreptimePipelineParams {
    /// Create a `GreptimePipelineParams` from the params string carried in the
    /// HTTP header `x-greptime-pipeline-params`.
    /// The params are in the format `key1=value1&key2=value2`, for example:
    /// `x-greptime-pipeline-params: max_nested_levels=5`
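    ///
    /// A minimal usage sketch (illustrative, not a doctest; assumes `truthy`
    /// treats "true" as true):
    /// ```ignore
    /// let params =
    ///     GreptimePipelineParams::from_params(Some("skip_error=true&max_nested_levels=5"));
    /// assert!(params.skip_error());
    /// assert_eq!(params.max_nested_levels(), 5);
    /// ```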
    pub fn from_params(params: Option<&str>) -> Self {
        let options = Self::parse_header_str_to_map(params);

        Self {
            options,
            skip_error: OnceCell::new(),
            max_nested_levels: OnceCell::new(),
        }
    }

    pub fn from_map(options: HashMap<String, String>) -> Self {
        Self {
            options,
            skip_error: OnceCell::new(),
            max_nested_levels: OnceCell::new(),
        }
    }

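    /// Parse a `key1=value1&key2=value2` header string into a map.
    /// Pairs without an `=` are silently ignored.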
    pub fn parse_header_str_to_map(params: Option<&str>) -> HashMap<String, String> {
        if let Some(params) = params {
            if params.is_empty() {
                HashMap::new()
            } else {
                params
                    .split('&')
                    .filter_map(|s| s.split_once('='))
                    .map(|(k, v)| (k.to_string(), v.to_string()))
                    .collect::<HashMap<String, String>>()
            }
        } else {
            HashMap::new()
        }
    }

    /// Whether to skip errors when processing the pipeline.
    pub fn skip_error(&self) -> bool {
        *self
            .skip_error
            .get_or_init(|| self.options.get("skip_error").map(truthy).unwrap_or(false))
    }

    /// Max nested levels for JSON flattening. If not provided or invalid,
    /// falls back to `DEFAULT_MAX_NESTED_LEVELS_FOR_JSON_FLATTENING`.
    pub fn max_nested_levels(&self) -> usize {
        *self.max_nested_levels.get_or_init(|| {
            self.options
                .get("max_nested_levels")
                .and_then(|s| s.parse::<usize>().ok())
                .filter(|v| *v > 0)
                .unwrap_or(DEFAULT_MAX_NESTED_LEVELS_FOR_JSON_FLATTENING)
        })
    }
}

impl GreptimeTransformer {
    /// Add a default timestamp column to the transforms
    fn add_greptime_timestamp_column(transforms: &mut Transforms) {
        let type_ = ColumnDataType::TimestampNanosecond;
        let default = None;

        let transform = Transform {
            fields: Fields::one(Field::new(greptime_timestamp().to_string(), None)),
            type_,
            default,
            index: Some(Index::Time),
            on_failure: Some(crate::etl::transform::OnFailure::Default),
            tag: false,
        };
        transforms.push(transform);
    }

    /// Generate the schema for the GreptimeTransformer
    fn init_schemas(transforms: &Transforms) -> Result<Vec<ColumnSchema>> {
        let mut schema = vec![];
        for transform in transforms.iter() {
            schema.extend(coerce_columns(transform)?);
        }
        Ok(schema)
    }
}

impl GreptimeTransformer {
    pub fn new(mut transforms: Transforms, doc_version: &PipelineDocVersion) -> Result<Self> {
        // empty check is done in the caller
        let mut column_names_set = HashSet::new();
        let mut timestamp_columns = vec![];

        for transform in transforms.iter() {
            let target_fields_set = transform
                .fields
                .iter()
                .map(|f| f.target_or_input_field())
                .collect::<HashSet<_>>();

            let intersections: Vec<_> = column_names_set.intersection(&target_fields_set).collect();
            if !intersections.is_empty() {
                let duplicates = intersections.iter().join(",");
                return TransformColumnNameMustBeUniqueSnafu { duplicates }.fail();
            }

            column_names_set.extend(target_fields_set);

            if let Some(idx) = transform.index
                && idx == Index::Time
            {
                match transform.fields.len() {
                    // Safety: unwrap is fine here because we just checked that
                    // `transform.fields` contains exactly one field.
                    1 => timestamp_columns.push(transform.fields.first().unwrap().input_field()),
                    _ => {
                        return TransformMultipleTimestampIndexSnafu {
                            columns: transform.fields.iter().map(|x| x.input_field()).join(", "),
                        }
                        .fail();
                    }
                }
            }
        }

        let schema = match timestamp_columns.len() {
            0 if doc_version == &PipelineDocVersion::V1 => {
                // compatible with v1, add a default timestamp column
                GreptimeTransformer::add_greptime_timestamp_column(&mut transforms);
                GreptimeTransformer::init_schemas(&transforms)?
            }
            1 => GreptimeTransformer::init_schemas(&transforms)?,
            count => {
                let columns = timestamp_columns.iter().join(", ");
                return TransformTimestampIndexCountSnafu { count, columns }.fail();
            }
        };
        Ok(GreptimeTransformer { transforms, schema })
    }

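    /// Transform the pipeline value map into a row of values aligned with
    /// `self.schema`. In v2 mode, consumed fields are removed from the map so
    /// the remaining fields can be picked up by the auto-transform step.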
    pub fn transform_mut(
        &self,
        pipeline_map: &mut VrlValue,
        is_v1: bool,
    ) -> Result<Vec<GreptimeValue>> {
        let mut values = vec![GreptimeValue { value_data: None }; self.schema.len()];
        let mut output_index = 0;
        for transform in self.transforms.iter() {
            for field in transform.fields.iter() {
                let column_name = field.input_field();

                let pipeline_map = pipeline_map.as_object_mut().context(ValueMustBeMapSnafu)?;
                // keep using `get` here to stay compatible with v1
                match pipeline_map.get(column_name) {
                    Some(v) => {
                        let value_data = coerce_value(v, transform)?;
                        // every transform field has exactly one output field
                        values[output_index] = GreptimeValue { value_data };
                    }
                    None => {
                        let value_data = match transform.on_failure {
                            Some(crate::etl::transform::OnFailure::Default) => {
                                match transform.get_default() {
                                    Some(default) => Some(default.clone()),
                                    None => transform.get_default_value_when_data_is_none(),
                                }
                            }
                            Some(crate::etl::transform::OnFailure::Ignore) => None,
                            None => None,
                        };
                        if transform.is_timeindex() && value_data.is_none() {
                            return TimeIndexMustBeNonNullSnafu.fail();
                        }
                        values[output_index] = GreptimeValue { value_data };
                    }
                }
                output_index += 1;
                if !is_v1 {
                    // remove the column from the pipeline_map
                    // so that the auto-transform can use the rest of the fields
                    pipeline_map.remove(column_name);
                }
            }
        }
        Ok(values)
    }

    pub fn transforms(&self) -> &Transforms {
        &self.transforms
    }

    pub fn schemas(&self) -> &Vec<greptime_proto::v1::ColumnSchema> {
        &self.schema
    }

    pub fn transforms_mut(&mut self) -> &mut Transforms {
        &mut self.transforms
    }
}

/// Records the schema information gathered so far, plus an index from column
/// name to its position. It changes as the user input JSON is traversed,
/// ending up as a superset of all user input schemas.
#[derive(Debug, Default)]
pub struct SchemaInfo {
    /// schema info
    pub schema: Vec<ColumnSchema>,
    /// index of the column name
    pub index: HashMap<String, usize>,
}

impl SchemaInfo {
    pub fn with_capacity(capacity: usize) -> Self {
        Self {
            schema: Vec::with_capacity(capacity),
            index: HashMap::with_capacity(capacity),
        }
    }

    pub fn from_schema_list(schema_list: Vec<ColumnSchema>) -> Self {
        let mut index = HashMap::new();
        for (i, schema) in schema_list.iter().enumerate() {
            index.insert(schema.column_name.clone(), i);
        }
        Self {
            schema: schema_list,
            index,
        }
    }
}

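/// Write `value_data` into `row`. If the column already exists at `index`,
/// the value's type is checked against the recorded schema; otherwise a new
/// column is appended to both the schema and the row.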
fn resolve_schema(
    index: Option<usize>,
    value_data: ValueData,
    column_schema: ColumnSchema,
    row: &mut Vec<GreptimeValue>,
    schema_info: &mut SchemaInfo,
) -> Result<()> {
    if let Some(index) = index {
        let api_value = GreptimeValue {
            value_data: Some(value_data),
        };
        // Safety: unwrap is fine here because api_value is always valid
        let value_column_data_type = proto_value_type(&api_value).unwrap();
        // Safety: unwrap is fine here because index is always valid
        let schema_column_data_type = schema_info.schema.get(index).unwrap().datatype();
        if value_column_data_type != schema_column_data_type {
            IdentifyPipelineColumnTypeMismatchSnafu {
                column: column_schema.column_name,
                expected: schema_column_data_type.as_str_name(),
                actual: value_column_data_type.as_str_name(),
            }
            .fail()
        } else {
            row[index] = api_value;
            Ok(())
        }
    } else {
        let key = column_schema.column_name.clone();
        schema_info.schema.push(column_schema);
        schema_info.index.insert(key, schema_info.schema.len() - 1);
        let api_value = GreptimeValue {
            value_data: Some(value_data),
        };
        row.push(api_value);
        Ok(())
    }
}

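/// Derive the timestamp value for a row. Prometheus rows read a millisecond
/// timestamp from the value map; other channels use the pipeline's custom
/// timestamp if configured, or the current time in nanoseconds otherwise.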
fn calc_ts(p_ctx: &PipelineContext, values: &VrlValue) -> Result<Option<ValueData>> {
    match p_ctx.channel {
        Channel::Prometheus => {
            let ts = values
                .as_object()
                .and_then(|m| m.get(greptime_timestamp()))
                .and_then(|ts| ts.try_into_i64().ok())
                .unwrap_or_default();
            Ok(Some(ValueData::TimestampMillisecondValue(ts)))
        }
        _ => {
            let custom_ts = p_ctx.pipeline_definition.get_custom_ts();
            match custom_ts {
                Some(ts) => {
                    let ts_field = values.as_object().and_then(|m| m.get(ts.get_column_name()));
                    Some(ts.get_timestamp_value(ts_field)).transpose()
                }
                None => Ok(Some(ValueData::TimestampNanosecondValue(
                    chrono::Utc::now().timestamp_nanos_opt().unwrap_or_default(),
                ))),
            }
        }
    }
}

/// `need_calc_ts` is true in two cases:
/// 1. the full greptime_identity pipeline
/// 2. auto-transform without a transformer
///
/// If a transform is present in a custom pipeline in v2 mode,
/// we don't need to calculate the timestamp again, nor check the timestamp column name.
pub(crate) fn values_to_row(
    schema_info: &mut SchemaInfo,
    values: VrlValue,
    pipeline_ctx: &PipelineContext<'_>,
    row: Option<Vec<GreptimeValue>>,
    need_calc_ts: bool,
) -> Result<Row> {
    let mut row: Vec<GreptimeValue> =
        row.unwrap_or_else(|| Vec::with_capacity(schema_info.schema.len()));
    let custom_ts = pipeline_ctx.pipeline_definition.get_custom_ts();

    if need_calc_ts {
        // calculate timestamp value based on the channel
        let ts = calc_ts(pipeline_ctx, &values)?;
        row.push(GreptimeValue { value_data: ts });
    }

    row.resize(schema_info.schema.len(), GreptimeValue { value_data: None });

    // skip ts column
    let ts_column_name = custom_ts
        .as_ref()
        .map_or(greptime_timestamp(), |ts| ts.get_column_name());

    let values = values.into_object().context(ValueMustBeMapSnafu)?;

    for (column_name, value) in values {
        if need_calc_ts && column_name.as_str() == ts_column_name {
            continue;
        }

        resolve_value(
            value,
            column_name.into(),
            &mut row,
            schema_info,
            pipeline_ctx,
        )?;
    }
    Ok(Row { values: row })
}

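/// For Prometheus ingestion, every column except the value column is treated
/// as a tag; all other channels default to fields.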
fn decide_semantic(p_ctx: &PipelineContext, column_name: &str) -> i32 {
    if p_ctx.channel == Channel::Prometheus && column_name != greptime_value() {
        SemanticType::Tag as i32
    } else {
        SemanticType::Field as i32
    }
}

fn resolve_value(
    value: VrlValue,
    column_name: String,
    row: &mut Vec<GreptimeValue>,
    schema_info: &mut SchemaInfo,
    p_ctx: &PipelineContext,
) -> Result<()> {
    let index = schema_info.index.get(&column_name).copied();
    let mut resolve_simple_type =
        |value_data: ValueData, column_name: String, data_type: ColumnDataType| {
            let semantic_type = decide_semantic(p_ctx, &column_name);
            resolve_schema(
                index,
                value_data,
                ColumnSchema {
                    column_name,
                    datatype: data_type as i32,
                    semantic_type,
                    datatype_extension: None,
                    options: None,
                },
                row,
                schema_info,
            )
        };

    match value {
        VrlValue::Null => {}

        VrlValue::Integer(v) => {
            resolve_simple_type(ValueData::I64Value(v), column_name, ColumnDataType::Int64)?;
        }

        VrlValue::Float(v) => {
            resolve_simple_type(
                ValueData::F64Value(v.into()),
                column_name,
                ColumnDataType::Float64,
            )?;
        }

        VrlValue::Boolean(v) => {
            resolve_simple_type(
                ValueData::BoolValue(v),
                column_name,
                ColumnDataType::Boolean,
            )?;
        }

        VrlValue::Bytes(v) => {
            resolve_simple_type(
                ValueData::StringValue(String::from_utf8_lossy_owned(v.to_vec())),
                column_name,
                ColumnDataType::String,
            )?;
        }

        VrlValue::Regex(v) => {
            warn!(
                "Persisting regex value in the table, this should not happen, column_name: {}",
                column_name
            );
            resolve_simple_type(
                ValueData::StringValue(v.to_string()),
                column_name,
                ColumnDataType::String,
            )?;
        }

        VrlValue::Timestamp(ts) => {
            let ns = ts.timestamp_nanos_opt().context(InvalidTimestampSnafu {
                input: ts.to_rfc3339(),
            })?;
            resolve_simple_type(
                ValueData::TimestampNanosecondValue(ns),
                column_name,
                ColumnDataType::TimestampNanosecond,
            )?;
        }

        VrlValue::Array(_) | VrlValue::Object(_) => {
            let data = vrl_value_to_jsonb_value(&value);
            resolve_schema(
                index,
                ValueData::BinaryValue(data.to_vec()),
                ColumnSchema {
                    column_name,
                    datatype: ColumnDataType::Binary as i32,
                    semantic_type: SemanticType::Field as i32,
                    datatype_extension: Some(ColumnDataTypeExtension {
                        type_ext: Some(TypeExt::JsonType(JsonTypeExtension::JsonBinary.into())),
                    }),
                    options: None,
                },
                row,
                schema_info,
            )?;
        }
    }
    Ok(())
}

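/// Recursively convert a VRL value into a `jsonb` value, borrowing string
/// data where possible.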
fn vrl_value_to_jsonb_value<'a>(value: &'a VrlValue) -> jsonb::Value<'a> {
    match value {
        VrlValue::Bytes(bytes) => jsonb::Value::String(String::from_utf8_lossy(bytes)),
        VrlValue::Regex(value_regex) => jsonb::Value::String(Cow::Borrowed(value_regex.as_str())),
        VrlValue::Integer(i) => jsonb::Value::Number(Number::Int64(*i)),
        VrlValue::Float(not_nan) => jsonb::Value::Number(Number::Float64(not_nan.into_inner())),
        VrlValue::Boolean(b) => jsonb::Value::Bool(*b),
        VrlValue::Timestamp(date_time) => jsonb::Value::String(Cow::Owned(date_time.to_rfc3339())),
        VrlValue::Object(btree_map) => jsonb::Value::Object(
            btree_map
                .iter()
                .map(|(key, value)| (key.to_string(), vrl_value_to_jsonb_value(value)))
                .collect(),
        ),
        VrlValue::Array(values) => jsonb::Value::Array(
            values.iter().map(vrl_value_to_jsonb_value).collect(),
        ),
        VrlValue::Null => jsonb::Value::Null,
    }
}

fn identity_pipeline_inner(
    pipeline_maps: Vec<VrlValue>,
    pipeline_ctx: &PipelineContext<'_>,
) -> Result<(SchemaInfo, HashMap<ContextOpt, Vec<Row>>)> {
    let skip_error = pipeline_ctx.pipeline_param.skip_error();
    let mut schema_info = SchemaInfo::default();
    let custom_ts = pipeline_ctx.pipeline_definition.get_custom_ts();

    // set the time index column schema first
    schema_info.schema.push(ColumnSchema {
        column_name: custom_ts
            .map(|ts| ts.get_column_name().to_string())
            .unwrap_or_else(|| greptime_timestamp().to_string()),
        datatype: custom_ts.map(|c| c.get_datatype()).unwrap_or_else(|| {
            if pipeline_ctx.channel == Channel::Prometheus {
                ColumnDataType::TimestampMillisecond
            } else {
                ColumnDataType::TimestampNanosecond
            }
        }) as i32,
        semantic_type: SemanticType::Timestamp as i32,
        datatype_extension: None,
        options: None,
    });

    let mut opt_map = HashMap::new();
    let len = pipeline_maps.len();

    for mut pipeline_map in pipeline_maps {
        let opt = unwrap_or_continue_if_err!(
            ContextOpt::from_pipeline_map_to_opt(&mut pipeline_map),
            skip_error
        );
        let row = unwrap_or_continue_if_err!(
            values_to_row(&mut schema_info, pipeline_map, pipeline_ctx, None, true),
            skip_error
        );

        opt_map
            .entry(opt)
            .or_insert_with(|| Vec::with_capacity(len))
            .push(row);
    }

    // pad earlier rows so that every row matches the final (superset) schema
    let column_count = schema_info.schema.len();
    for rows in opt_map.values_mut() {
        for row in rows.iter_mut() {
            row.values
                .resize(column_count, GreptimeValue { value_data: None });
        }
    }

    Ok((schema_info, opt_map))
}

/// Identity pipeline for Greptime.
/// This pipeline converts the input JSON array into Greptime Rows.
/// The `table` parameter is used to set the semantic type of row-key columns to `Tag`.
/// 1. The pipeline adds a default timestamp column to the schema.
/// 2. The pipeline does not resolve NULL values.
/// 3. The pipeline assumes that the JSON format is fixed.
/// 4. The pipeline returns an error if a column's datatype is mismatched across records.
/// 5. The pipeline analyzes the schema of each JSON record and merges them to get the final schema.
pub fn identity_pipeline(
    array: Vec<VrlValue>,
    table: Option<Arc<table::Table>>,
    pipeline_ctx: &PipelineContext<'_>,
) -> Result<HashMap<ContextOpt, Rows>> {
    let skip_error = pipeline_ctx.pipeline_param.skip_error();
    let max_nested_levels = pipeline_ctx.pipeline_param.max_nested_levels();
    // Always flatten JSON objects and stringify arrays
    let mut input = Vec::with_capacity(array.len());
    for item in array.into_iter() {
        let result =
            unwrap_or_continue_if_err!(flatten_object(item, max_nested_levels), skip_error);
        input.push(result);
    }

    identity_pipeline_inner(input, pipeline_ctx).map(|(mut schema, opt_map)| {
        if let Some(table) = table {
            let table_info = table.table_info();
            for tag_name in table_info.meta.row_key_column_names() {
                if let Some(index) = schema.index.get(tag_name) {
                    schema.schema[*index].semantic_type = SemanticType::Tag as i32;
                }
            }
        }

        opt_map
            .into_iter()
            .map(|(opt, rows)| {
                (
                    opt,
                    Rows {
                        schema: schema.schema.clone(),
                        rows,
                    },
                )
            })
            .collect::<HashMap<ContextOpt, Rows>>()
    })
}

/// Consumes the JSON object and flattens it into a single-level object.
///
/// The `max_nested_levels` parameter is used to limit how deep to flatten nested JSON objects.
/// When the maximum level is reached, the remaining nested structure is serialized to a JSON
/// string and stored at the current flattened key.
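///
/// A flattening sketch (illustrative; mirrors the test cases below):
/// ```ignore
/// // {"a": {"b": {"c": [1, 2, 3]}}} with max_nested_levels = 10
/// // becomes {"a.b.c": "[1,2,3]"}; arrays are stringified to JSON strings.
/// ```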
pub fn flatten_object(object: VrlValue, max_nested_levels: usize) -> Result<VrlValue> {
    let mut flattened = BTreeMap::new();
    let object = object.into_object().context(ValueMustBeMapSnafu)?;

    if !object.is_empty() {
        // it will use recursion to flatten the object.
        do_flatten_object(&mut flattened, None, object, 1, max_nested_levels);
    }

    Ok(VrlValue::Object(flattened))
}

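/// Convert a VRL value into a `serde_json` value so nested objects and arrays
/// can be stringified during flattening.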
fn vrl_value_to_serde_json(value: &VrlValue) -> serde_json_crate::Value {
    match value {
        VrlValue::Null => serde_json_crate::Value::Null,
        VrlValue::Boolean(b) => serde_json_crate::Value::Bool(*b),
        VrlValue::Integer(i) => serde_json_crate::Value::Number((*i).into()),
        VrlValue::Float(not_nan) => serde_json_crate::Number::from_f64(not_nan.into_inner())
            .map(serde_json_crate::Value::Number)
            .unwrap_or(serde_json_crate::Value::Null),
        VrlValue::Bytes(bytes) => {
            serde_json_crate::Value::String(String::from_utf8_lossy(bytes).into_owned())
        }
        VrlValue::Regex(re) => serde_json_crate::Value::String(re.as_str().to_string()),
        VrlValue::Timestamp(ts) => serde_json_crate::Value::String(ts.to_rfc3339()),
        VrlValue::Array(arr) => {
            serde_json_crate::Value::Array(arr.iter().map(vrl_value_to_serde_json).collect())
        }
        VrlValue::Object(map) => serde_json_crate::Value::Object(
            map.iter()
                .map(|(k, v)| (k.to_string(), vrl_value_to_serde_json(v)))
                .collect(),
        ),
    }
}

fn do_flatten_object(
    dest: &mut BTreeMap<KeyString, VrlValue>,
    base: Option<&str>,
    object: BTreeMap<KeyString, VrlValue>,
    current_level: usize,
    max_nested_levels: usize,
) {
    for (key, value) in object {
        let new_key = base.map_or_else(
            || key.clone(),
            |base_key| format!("{base_key}.{key}").into(),
        );

        match value {
            VrlValue::Object(object) => {
                if current_level >= max_nested_levels {
                    // Reached the maximum level; stringify the remaining object.
                    let json_string = serde_json_crate::to_string(&vrl_value_to_serde_json(
                        &VrlValue::Object(object),
                    ))
                    .unwrap_or_else(|_| String::from("{}"));
                    dest.insert(new_key, VrlValue::Bytes(Bytes::from(json_string)));
                } else {
                    do_flatten_object(
                        dest,
                        Some(&new_key),
                        object,
                        current_level + 1,
                        max_nested_levels,
                    );
                }
            }
            // Arrays are stringified to ensure no JSON column types in the result.
            VrlValue::Array(_) => {
                let json_string = serde_json_crate::to_string(&vrl_value_to_serde_json(&value))
                    .unwrap_or_else(|_| String::from("[]"));
                dest.insert(new_key, VrlValue::Bytes(Bytes::from(json_string)));
            }
            // Other leaf types are inserted as-is.
            _ => {
                dest.insert(new_key, value);
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use api::v1::SemanticType;

    use super::*;
    use crate::{PipelineDefinition, identity_pipeline};

    #[test]
    fn test_identify_pipeline() {
        let params = GreptimePipelineParams::default();
        let pipeline_ctx = PipelineContext::new(
            &PipelineDefinition::GreptimeIdentityPipeline(None),
            &params,
            Channel::Unknown,
        );
        {
            let array = [
                serde_json::json!({
                    "woshinull": null,
                    "name": "Alice",
                    "age": 20,
                    "is_student": true,
                    "score": 99.5,
                    "hobbies": "reading",
                    "address": "Beijing",
                }),
                serde_json::json!({
                    "name": "Bob",
                    "age": 21,
                    "is_student": false,
                    "score": "88.5",
                    "hobbies": "swimming",
                    "address": "Shanghai",
                    "gaga": "gaga"
                }),
            ];
            let array = array.iter().map(|v| v.into()).collect();
            let rows = identity_pipeline(array, None, &pipeline_ctx);
            assert!(rows.is_err());
            assert_eq!(
                rows.err().unwrap().to_string(),
                "Column datatype mismatch. For column: score, expected datatype: FLOAT64, actual datatype: STRING".to_string(),
            );
        }
        {
            let array = [
                serde_json::json!({
                    "woshinull": null,
                    "name": "Alice",
                    "age": 20,
                    "is_student": true,
                    "score": 99.5,
                    "hobbies": "reading",
                    "address": "Beijing",
                }),
                serde_json::json!({
                    "name": "Bob",
                    "age": 21,
                    "is_student": false,
                    "score": 88,
                    "hobbies": "swimming",
                    "address": "Shanghai",
                    "gaga": "gaga"
                }),
            ];
            let array = array.iter().map(|v| v.into()).collect();
            let rows = identity_pipeline(array, None, &pipeline_ctx);
            assert!(rows.is_err());
            assert_eq!(
                rows.err().unwrap().to_string(),
                "Column datatype mismatch. For column: score, expected datatype: FLOAT64, actual datatype: INT64".to_string(),
            );
        }
        {
            let array = [
                serde_json::json!({
                    "woshinull": null,
                    "name": "Alice",
                    "age": 20,
                    "is_student": true,
                    "score": 99.5,
                    "hobbies": "reading",
                    "address": "Beijing",
                }),
                serde_json::json!({
                    "name": "Bob",
                    "age": 21,
                    "is_student": false,
                    "score": 88.5,
                    "hobbies": "swimming",
                    "address": "Shanghai",
                    "gaga": "gaga"
                }),
            ];
            let array = array.iter().map(|v| v.into()).collect();
            let rows = identity_pipeline(array, None, &pipeline_ctx);
            assert!(rows.is_ok());
            let mut rows = rows.unwrap();
            assert!(rows.len() == 1);
            let rows = rows.remove(&ContextOpt::default()).unwrap();
            assert_eq!(rows.schema.len(), 8);
            assert_eq!(rows.rows.len(), 2);
            assert_eq!(8, rows.rows[0].values.len());
            assert_eq!(8, rows.rows[1].values.len());
        }
        {
            let array = [
                serde_json::json!({
                    "woshinull": null,
                    "name": "Alice",
                    "age": 20,
                    "is_student": true,
                    "score": 99.5,
                    "hobbies": "reading",
                    "address": "Beijing",
                }),
                serde_json::json!({
                    "name": "Bob",
                    "age": 21,
                    "is_student": false,
                    "score": 88.5,
                    "hobbies": "swimming",
                    "address": "Shanghai",
                    "gaga": "gaga"
                }),
            ];
            let tag_column_names = ["name".to_string(), "address".to_string()];

            let rows =
                identity_pipeline_inner(array.iter().map(|v| v.into()).collect(), &pipeline_ctx)
                    .map(|(mut schema, mut rows)| {
                        for name in tag_column_names {
                            if let Some(index) = schema.index.get(&name) {
                                schema.schema[*index].semantic_type = SemanticType::Tag as i32;
                            }
                        }

                        assert!(rows.len() == 1);
                        let rows = rows.remove(&ContextOpt::default()).unwrap();

                        Rows {
                            schema: schema.schema,
                            rows,
                        }
                    });

            assert!(rows.is_ok());
            let rows = rows.unwrap();
            assert_eq!(rows.schema.len(), 8);
            assert_eq!(rows.rows.len(), 2);
            assert_eq!(8, rows.rows[0].values.len());
            assert_eq!(8, rows.rows[1].values.len());
            assert_eq!(
                rows.schema
                    .iter()
                    .find(|x| x.column_name == "name")
                    .unwrap()
                    .semantic_type,
                SemanticType::Tag as i32
            );
            assert_eq!(
                rows.schema
                    .iter()
                    .find(|x| x.column_name == "address")
                    .unwrap()
                    .semantic_type,
                SemanticType::Tag as i32
            );
            assert_eq!(
                rows.schema
                    .iter()
                    .filter(|x| x.semantic_type == SemanticType::Tag as i32)
                    .count(),
                2
            );
        }
    }

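    // An added sanity check, sketched from the visible implementations above;
    // it is not part of the original test suite.
    #[test]
    fn test_parse_header_params_and_limits() {
        // `parse_header_str_to_map` splits on '&' and '=' and ignores malformed pairs.
        let map = GreptimePipelineParams::parse_header_str_to_map(Some(
            "skip_error=true&max_nested_levels=5&malformed",
        ));
        assert_eq!(map.get("skip_error").map(String::as_str), Some("true"));
        assert_eq!(map.get("max_nested_levels").map(String::as_str), Some("5"));
        assert_eq!(map.len(), 2);
        assert!(GreptimePipelineParams::parse_header_str_to_map(None).is_empty());

        // A zero `max_nested_levels` is filtered out and falls back to the default.
        let params = GreptimePipelineParams::from_params(Some("max_nested_levels=0"));
        assert_eq!(
            params.max_nested_levels(),
            DEFAULT_MAX_NESTED_LEVELS_FOR_JSON_FLATTENING
        );
    }
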
    #[test]
    fn test_flatten() {
        let test_cases = vec![
            // Basic case.
            (
                serde_json::json!(
                    {
                        "a": {
                            "b": {
                                "c": [1, 2, 3]
                            }
                        },
                        "d": [
                            "foo",
                            "bar"
                        ],
                        "e": {
                            "f": [7, 8, 9],
                            "g": {
                                "h": 123,
                                "i": "hello",
                                "j": {
                                    "k": true
                                }
                            }
                        }
                    }
                ),
                10,
                Some(serde_json::json!(
                    {
                        "a.b.c": "[1,2,3]",
                        "d": "[\"foo\",\"bar\"]",
                        "e.f": "[7,8,9]",
                        "e.g.h": 123,
                        "e.g.i": "hello",
                        "e.g.j.k": true
                    }
                )),
            ),
            // Test the case where the object has more than 3 nested levels.
            (
                serde_json::json!(
                    {
                        "a": {
                            "b": {
                                "c": {
                                    "d": [1, 2, 3]
                                }
                            }
                        },
                        "e": [
                            "foo",
                            "bar"
                        ]
                    }
                ),
                3,
                Some(serde_json::json!(
                    {
                        "a.b.c": "{\"d\":[1,2,3]}",
                        "e": "[\"foo\",\"bar\"]"
                    }
                )),
            ),
        ];

        for (input, max_depth, expected) in test_cases {
            let input = input.into();
            let expected = expected.map(|e| e.into());

            let flattened_object = flatten_object(input, max_depth).ok();
            assert_eq!(flattened_object, expected);
        }
    }
}
995}