pipeline/etl/transform/transformer/greptime.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

pub mod coerce;

use std::borrow::Cow;
use std::collections::{BTreeMap, HashSet};
use std::sync::Arc;

use ahash::{HashMap, HashMapExt};
use api::helper::proto_value_type;
use api::v1::column_data_type_extension::TypeExt;
use api::v1::value::ValueData;
use api::v1::{ColumnDataType, ColumnDataTypeExtension, JsonTypeExtension, SemanticType};
use coerce::{coerce_columns, coerce_value};
use common_query::prelude::{GREPTIME_TIMESTAMP, GREPTIME_VALUE};
use common_telemetry::warn;
use greptime_proto::v1::{ColumnSchema, Row, Rows, Value as GreptimeValue};
use itertools::Itertools;
use jsonb::Number;
use once_cell::sync::OnceCell;
use serde_json as serde_json_crate;
use session::context::Channel;
use snafu::OptionExt;
use vrl::prelude::{Bytes, VrlValueConvert};
use vrl::value::{KeyString, Value as VrlValue};

use crate::error::{
    IdentifyPipelineColumnTypeMismatchSnafu, InvalidTimestampSnafu, Result,
    TimeIndexMustBeNonNullSnafu, TransformColumnNameMustBeUniqueSnafu,
    TransformMultipleTimestampIndexSnafu, TransformTimestampIndexCountSnafu, ValueMustBeMapSnafu,
};
use crate::etl::PipelineDocVersion;
use crate::etl::ctx_req::ContextOpt;
use crate::etl::field::{Field, Fields};
use crate::etl::transform::index::Index;
use crate::etl::transform::{Transform, Transforms};
use crate::{PipelineContext, truthy, unwrap_or_continue_if_err};

const DEFAULT_GREPTIME_TIMESTAMP_COLUMN: &str = "greptime_timestamp";
const DEFAULT_MAX_NESTED_LEVELS_FOR_JSON_FLATTENING: usize = 10;

/// Fields not present in the declared columns are discarded
/// to prevent automatic column creation in GreptimeDB.
#[derive(Debug, Clone)]
pub struct GreptimeTransformer {
    transforms: Transforms,
    schema: Vec<ColumnSchema>,
}

/// Parameters that can be used to configure the greptime pipelines.
#[derive(Debug, Default)]
pub struct GreptimePipelineParams {
    /// The original options for configuring the greptime pipelines.
    /// This should not be used directly; instead, use the parsed shortcut option values.
    options: HashMap<String, String>,

    /// Whether to skip errors when processing the pipeline.
    pub skip_error: OnceCell<bool>,
    /// Max nested levels when flattening JSON object. Defaults to
    /// `DEFAULT_MAX_NESTED_LEVELS_FOR_JSON_FLATTENING` when not provided.
    pub max_nested_levels: OnceCell<usize>,
}

impl GreptimePipelineParams {
    /// Create a `GreptimePipelineParams` from a params string taken from the HTTP
    /// header with key `x-greptime-pipeline-params`. The params are in the format
    /// `key1=value1&key2=value2`, for example:
    /// `x-greptime-pipeline-params: max_nested_levels=5`
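    ///
    /// A minimal usage sketch (hedged: it assumes the crate's `truthy` helper
    /// treats `"true"` as true):
    /// ```ignore
    /// let params =
    ///     GreptimePipelineParams::from_params(Some("skip_error=true&max_nested_levels=5"));
    /// assert!(params.skip_error());
    /// assert_eq!(params.max_nested_levels(), 5);
    /// ```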
    pub fn from_params(params: Option<&str>) -> Self {
        let options = Self::parse_header_str_to_map(params);

        Self {
            options,
            skip_error: OnceCell::new(),
            max_nested_levels: OnceCell::new(),
        }
    }

    pub fn from_map(options: HashMap<String, String>) -> Self {
        Self {
            options,
            skip_error: OnceCell::new(),
            max_nested_levels: OnceCell::new(),
        }
    }

    pub fn parse_header_str_to_map(params: Option<&str>) -> HashMap<String, String> {
        if let Some(params) = params {
            if params.is_empty() {
                HashMap::new()
            } else {
                params
                    .split('&')
                    .filter_map(|s| s.split_once('='))
                    .map(|(k, v)| (k.to_string(), v.to_string()))
                    .collect::<HashMap<String, String>>()
            }
        } else {
            HashMap::new()
        }
    }

    /// Whether to skip errors when processing the pipeline.
    pub fn skip_error(&self) -> bool {
        *self
            .skip_error
            .get_or_init(|| self.options.get("skip_error").map(truthy).unwrap_or(false))
    }

    /// Max nested levels for JSON flattening. If not provided or invalid,
    /// falls back to `DEFAULT_MAX_NESTED_LEVELS_FOR_JSON_FLATTENING`.
    pub fn max_nested_levels(&self) -> usize {
        *self.max_nested_levels.get_or_init(|| {
            self.options
                .get("max_nested_levels")
                .and_then(|s| s.parse::<usize>().ok())
                .filter(|v| *v > 0)
                .unwrap_or(DEFAULT_MAX_NESTED_LEVELS_FOR_JSON_FLATTENING)
        })
    }
}

impl GreptimeTransformer {
    /// Add a default timestamp column to the transforms
    fn add_greptime_timestamp_column(transforms: &mut Transforms) {
        let type_ = ColumnDataType::TimestampNanosecond;
        let default = None;

        let transform = Transform {
            fields: Fields::one(Field::new(
                DEFAULT_GREPTIME_TIMESTAMP_COLUMN.to_string(),
                None,
            )),
            type_,
            default,
            index: Some(Index::Time),
            on_failure: Some(crate::etl::transform::OnFailure::Default),
            tag: false,
        };
        transforms.push(transform);
    }

    /// Generate the schema for the GreptimeTransformer
    fn init_schemas(transforms: &Transforms) -> Result<Vec<ColumnSchema>> {
        let mut schema = vec![];
        for transform in transforms.iter() {
            schema.extend(coerce_columns(transform)?);
        }
        Ok(schema)
    }
}

impl GreptimeTransformer {
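    /// Validate the transforms and build the output schema: target column
    /// names must be unique, and a time-index transform must cover exactly
    /// one field. For v1 documents a default `greptime_timestamp` column is
    /// added when no time index is declared.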
    pub fn new(mut transforms: Transforms, doc_version: &PipelineDocVersion) -> Result<Self> {
        // empty check is done in the caller
        let mut column_names_set = HashSet::new();
        let mut timestamp_columns = vec![];

        for transform in transforms.iter() {
            let target_fields_set = transform
                .fields
                .iter()
                .map(|f| f.target_or_input_field())
                .collect::<HashSet<_>>();

            let intersections: Vec<_> = column_names_set.intersection(&target_fields_set).collect();
            if !intersections.is_empty() {
                let duplicates = intersections.iter().join(",");
                return TransformColumnNameMustBeUniqueSnafu { duplicates }.fail();
            }

            column_names_set.extend(target_fields_set);

            if let Some(idx) = transform.index
                && idx == Index::Time
            {
                match transform.fields.len() {
                    // Safety: unwrap is fine here because we have checked that fields has exactly one element
                    1 => timestamp_columns.push(transform.fields.first().unwrap().input_field()),
                    _ => {
                        return TransformMultipleTimestampIndexSnafu {
                            columns: transform.fields.iter().map(|x| x.input_field()).join(", "),
                        }
                        .fail();
                    }
                }
            }
        }

        let schema = match timestamp_columns.len() {
            0 if doc_version == &PipelineDocVersion::V1 => {
                // compatible with v1, add a default timestamp column
                GreptimeTransformer::add_greptime_timestamp_column(&mut transforms);
                GreptimeTransformer::init_schemas(&transforms)?
            }
            1 => GreptimeTransformer::init_schemas(&transforms)?,
            count => {
                let columns = timestamp_columns.iter().join(", ");
                return TransformTimestampIndexCountSnafu { count, columns }.fail();
            }
        };
        Ok(GreptimeTransformer { transforms, schema })
    }

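    /// Transform the pipeline map into one row of values following the
    /// declared transforms. In v2 mode, consumed fields are removed from the
    /// map so that a later auto-transform can pick up the remaining fields.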
    pub fn transform_mut(
        &self,
        pipeline_map: &mut VrlValue,
        is_v1: bool,
    ) -> Result<Vec<GreptimeValue>> {
        let mut values = vec![GreptimeValue { value_data: None }; self.schema.len()];
        let mut output_index = 0;
        for transform in self.transforms.iter() {
            for field in transform.fields.iter() {
                let column_name = field.input_field();

                let pipeline_map = pipeline_map.as_object_mut().context(ValueMustBeMapSnafu)?;
                // keep using `get` here to stay compatible with v1
                match pipeline_map.get(column_name) {
                    Some(v) => {
                        let value_data = coerce_value(v, transform)?;
                        // every transform field has exactly one output field
                        values[output_index] = GreptimeValue { value_data };
                    }
                    None => {
                        let value_data = match transform.on_failure {
                            Some(crate::etl::transform::OnFailure::Default) => {
                                match transform.get_default() {
                                    Some(default) => Some(default.clone()),
                                    None => transform.get_default_value_when_data_is_none(),
                                }
                            }
                            Some(crate::etl::transform::OnFailure::Ignore) => None,
                            None => None,
                        };
                        if transform.is_timeindex() && value_data.is_none() {
                            return TimeIndexMustBeNonNullSnafu.fail();
                        }
                        values[output_index] = GreptimeValue { value_data };
                    }
                }
                output_index += 1;
                if !is_v1 {
                    // remove the column from the pipeline_map
                    // so that the auto-transform can use the remaining fields
                    pipeline_map.remove(column_name);
                }
            }
        }
        Ok(values)
    }

    pub fn transforms(&self) -> &Transforms {
        &self.transforms
    }

    pub fn schemas(&self) -> &Vec<greptime_proto::v1::ColumnSchema> {
        &self.schema
    }

    pub fn transforms_mut(&mut self) -> &mut Transforms {
        &mut self.transforms
    }
}

/// Records the current schema information and a sequential cache of field
/// names. It changes as the user input JSON is traversed, accumulating a
/// superset of all user input schemas.
#[derive(Debug, Default)]
pub struct SchemaInfo {
    /// schema info
    pub schema: Vec<ColumnSchema>,
    /// index of the column name
    pub index: HashMap<String, usize>,
}

impl SchemaInfo {
    pub fn with_capacity(capacity: usize) -> Self {
        Self {
            schema: Vec::with_capacity(capacity),
            index: HashMap::with_capacity(capacity),
        }
    }

    pub fn from_schema_list(schema_list: Vec<ColumnSchema>) -> Self {
        let mut index = HashMap::new();
        for (i, schema) in schema_list.iter().enumerate() {
            index.insert(schema.column_name.clone(), i);
        }
        Self {
            schema: schema_list,
            index,
        }
    }
}

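/// Write `value_data` into `row`. If `index` is set, the column already exists
/// in the schema and the value's datatype must match it; otherwise the column
/// schema is appended and the value is pushed at the new position.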
fn resolve_schema(
    index: Option<usize>,
    value_data: ValueData,
    column_schema: ColumnSchema,
    row: &mut Vec<GreptimeValue>,
    schema_info: &mut SchemaInfo,
) -> Result<()> {
    if let Some(index) = index {
        let api_value = GreptimeValue {
            value_data: Some(value_data),
        };
        // Safety: unwrap is fine here because api_value is always valid
        let value_column_data_type = proto_value_type(&api_value).unwrap();
        // Safety: unwrap is fine here because index is always valid
        let schema_column_data_type = schema_info.schema.get(index).unwrap().datatype();
        if value_column_data_type != schema_column_data_type {
            IdentifyPipelineColumnTypeMismatchSnafu {
                column: column_schema.column_name,
                expected: schema_column_data_type.as_str_name(),
                actual: value_column_data_type.as_str_name(),
            }
            .fail()
        } else {
            row[index] = api_value;
            Ok(())
        }
    } else {
        let key = column_schema.column_name.clone();
        schema_info.schema.push(column_schema);
        schema_info.index.insert(key, schema_info.schema.len() - 1);
        let api_value = GreptimeValue {
            value_data: Some(value_data),
        };
        row.push(api_value);
        Ok(())
    }
}

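/// Decide the timestamp value for a row. For the Prometheus channel the
/// `GREPTIME_TIMESTAMP` field is read as milliseconds (defaulting to 0 when
/// absent); with a custom timestamp configured, its value is read from the
/// configured column; otherwise the current time in nanoseconds is used.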
fn calc_ts(p_ctx: &PipelineContext, values: &VrlValue) -> Result<Option<ValueData>> {
    match p_ctx.channel {
        Channel::Prometheus => {
            let ts = values
                .as_object()
                .and_then(|m| m.get(GREPTIME_TIMESTAMP))
                .and_then(|ts| ts.try_into_i64().ok())
                .unwrap_or_default();
            Ok(Some(ValueData::TimestampMillisecondValue(ts)))
        }
        _ => {
            let custom_ts = p_ctx.pipeline_definition.get_custom_ts();
            match custom_ts {
                Some(ts) => {
                    let ts_field = values.as_object().and_then(|m| m.get(ts.get_column_name()));
                    Some(ts.get_timestamp_value(ts_field)).transpose()
                }
                None => Ok(Some(ValueData::TimestampNanosecondValue(
                    chrono::Utc::now().timestamp_nanos_opt().unwrap_or_default(),
                ))),
            }
        }
    }
}

/// `need_calc_ts` is true in two cases:
/// 1. the full greptime_identity pipeline
/// 2. auto-transform without a transformer
///
/// If a transform is present in a custom pipeline in v2 mode,
/// we don't need to calculate the timestamp again, nor do we need to check
/// the timestamp column name.
pub(crate) fn values_to_row(
    schema_info: &mut SchemaInfo,
    values: VrlValue,
    pipeline_ctx: &PipelineContext<'_>,
    row: Option<Vec<GreptimeValue>>,
    need_calc_ts: bool,
) -> Result<Row> {
    let mut row: Vec<GreptimeValue> =
        row.unwrap_or_else(|| Vec::with_capacity(schema_info.schema.len()));
    let custom_ts = pipeline_ctx.pipeline_definition.get_custom_ts();

    if need_calc_ts {
        // calculate timestamp value based on the channel
        let ts = calc_ts(pipeline_ctx, &values)?;
        row.push(GreptimeValue { value_data: ts });
    }

    row.resize(schema_info.schema.len(), GreptimeValue { value_data: None });

    // skip ts column
    let ts_column_name = custom_ts
        .as_ref()
        .map_or(DEFAULT_GREPTIME_TIMESTAMP_COLUMN, |ts| ts.get_column_name());

    let values = values.into_object().context(ValueMustBeMapSnafu)?;

    for (column_name, value) in values {
        if need_calc_ts && column_name.as_str() == ts_column_name {
            continue;
        }

        resolve_value(
            value,
            column_name.into(),
            &mut row,
            schema_info,
            pipeline_ctx,
        )?;
    }
    Ok(Row { values: row })
}

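/// For the Prometheus channel every column except `GREPTIME_VALUE` becomes a
/// Tag; all other channels default to Field.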
fn decide_semantic(p_ctx: &PipelineContext, column_name: &str) -> i32 {
    if p_ctx.channel == Channel::Prometheus && column_name != GREPTIME_VALUE {
        SemanticType::Tag as i32
    } else {
        SemanticType::Field as i32
    }
}

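/// Map a single VRL value onto a column, creating or checking the column
/// schema via `resolve_schema`. Null values are skipped; scalars map to
/// native column types; arrays and objects are stored as JSONB binary values.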
fn resolve_value(
    value: VrlValue,
    column_name: String,
    row: &mut Vec<GreptimeValue>,
    schema_info: &mut SchemaInfo,
    p_ctx: &PipelineContext,
) -> Result<()> {
    let index = schema_info.index.get(&column_name).copied();
    let mut resolve_simple_type =
        |value_data: ValueData, column_name: String, data_type: ColumnDataType| {
            let semantic_type = decide_semantic(p_ctx, &column_name);
            resolve_schema(
                index,
                value_data,
                ColumnSchema {
                    column_name,
                    datatype: data_type as i32,
                    semantic_type,
                    datatype_extension: None,
                    options: None,
                },
                row,
                schema_info,
            )
        };

    match value {
        VrlValue::Null => {}

        VrlValue::Integer(v) => {
            resolve_simple_type(ValueData::I64Value(v), column_name, ColumnDataType::Int64)?;
        }

        VrlValue::Float(v) => {
            resolve_simple_type(
                ValueData::F64Value(v.into()),
                column_name,
                ColumnDataType::Float64,
            )?;
        }

        VrlValue::Boolean(v) => {
            resolve_simple_type(
                ValueData::BoolValue(v),
                column_name,
                ColumnDataType::Boolean,
            )?;
        }

        VrlValue::Bytes(v) => {
            resolve_simple_type(
                ValueData::StringValue(String::from_utf8_lossy_owned(v.to_vec())),
                column_name,
                ColumnDataType::String,
            )?;
        }

        VrlValue::Regex(v) => {
            warn!(
                "Persisting regex value in the table, this should not happen, column_name: {}",
                column_name
            );
            resolve_simple_type(
                ValueData::StringValue(v.to_string()),
                column_name,
                ColumnDataType::String,
            )?;
        }

        VrlValue::Timestamp(ts) => {
            let ns = ts.timestamp_nanos_opt().context(InvalidTimestampSnafu {
                input: ts.to_rfc3339(),
            })?;
            resolve_simple_type(
                ValueData::TimestampNanosecondValue(ns),
                column_name,
                ColumnDataType::TimestampNanosecond,
            )?;
        }

        VrlValue::Array(_) | VrlValue::Object(_) => {
            let data = vrl_value_to_jsonb_value(&value);
            resolve_schema(
                index,
                ValueData::BinaryValue(data.to_vec()),
                ColumnSchema {
                    column_name,
                    datatype: ColumnDataType::Binary as i32,
                    semantic_type: SemanticType::Field as i32,
                    datatype_extension: Some(ColumnDataTypeExtension {
                        type_ext: Some(TypeExt::JsonType(JsonTypeExtension::JsonBinary.into())),
                    }),
                    options: None,
                },
                row,
                schema_info,
            )?;
        }
    }
    Ok(())
}

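/// Convert a VRL value into a jsonb value, borrowing string data where
/// possible; regexes and timestamps are rendered as strings.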
fn vrl_value_to_jsonb_value<'a>(value: &'a VrlValue) -> jsonb::Value<'a> {
    match value {
        VrlValue::Bytes(bytes) => jsonb::Value::String(String::from_utf8_lossy(bytes)),
        VrlValue::Regex(value_regex) => jsonb::Value::String(Cow::Borrowed(value_regex.as_str())),
        VrlValue::Integer(i) => jsonb::Value::Number(Number::Int64(*i)),
        VrlValue::Float(not_nan) => jsonb::Value::Number(Number::Float64(not_nan.into_inner())),
        VrlValue::Boolean(b) => jsonb::Value::Bool(*b),
        VrlValue::Timestamp(date_time) => jsonb::Value::String(Cow::Owned(date_time.to_rfc3339())),
        VrlValue::Object(btree_map) => jsonb::Value::Object(
            btree_map
                .iter()
                .map(|(key, value)| (key.to_string(), vrl_value_to_jsonb_value(value)))
                .collect(),
        ),
        VrlValue::Array(values) => jsonb::Value::Array(
            values
                .iter()
                .map(|value| vrl_value_to_jsonb_value(value))
                .collect(),
        ),
        VrlValue::Null => jsonb::Value::Null,
    }
}

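/// Shared body of the identity pipeline: the time index column schema is set
/// first, then each input map is converted to a row and grouped by its
/// `ContextOpt`; finally all rows are padded to the merged schema width.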
fn identity_pipeline_inner(
    pipeline_maps: Vec<VrlValue>,
    pipeline_ctx: &PipelineContext<'_>,
) -> Result<(SchemaInfo, HashMap<ContextOpt, Vec<Row>>)> {
    let skip_error = pipeline_ctx.pipeline_param.skip_error();
    let mut schema_info = SchemaInfo::default();
    let custom_ts = pipeline_ctx.pipeline_definition.get_custom_ts();

    // set time index column schema first
    schema_info.schema.push(ColumnSchema {
        column_name: custom_ts
            .map(|ts| ts.get_column_name().to_string())
            .unwrap_or_else(|| DEFAULT_GREPTIME_TIMESTAMP_COLUMN.to_string()),
        datatype: custom_ts.map(|c| c.get_datatype()).unwrap_or_else(|| {
            if pipeline_ctx.channel == Channel::Prometheus {
                ColumnDataType::TimestampMillisecond
            } else {
                ColumnDataType::TimestampNanosecond
            }
        }) as i32,
        semantic_type: SemanticType::Timestamp as i32,
        datatype_extension: None,
        options: None,
    });

    let mut opt_map = HashMap::new();
    let len = pipeline_maps.len();

    for mut pipeline_map in pipeline_maps {
        let opt = unwrap_or_continue_if_err!(
            ContextOpt::from_pipeline_map_to_opt(&mut pipeline_map),
            skip_error
        );
        let row = unwrap_or_continue_if_err!(
            values_to_row(&mut schema_info, pipeline_map, pipeline_ctx, None, true),
            skip_error
        );

        opt_map
            .entry(opt)
            .or_insert_with(|| Vec::with_capacity(len))
            .push(row);
    }

    let column_count = schema_info.schema.len();
    for (_, row) in opt_map.iter_mut() {
        for row in row.iter_mut() {
            let diff = column_count - row.values.len();
            for _ in 0..diff {
                row.values.push(GreptimeValue { value_data: None });
            }
        }
    }

    Ok((schema_info, opt_map))
}

/// Identity pipeline for Greptime.
/// This pipeline converts the input JSON array to Greptime Rows.
/// The `table` param is used to set the semantic type of the row key columns to Tag.
/// 1. The pipeline adds a default timestamp column to the schema.
/// 2. The pipeline does not resolve NULL values.
/// 3. The pipeline assumes that the JSON format is fixed.
/// 4. The pipeline returns an error if the datatype of the same column is mismatched.
/// 5. The pipeline analyzes the schema of each JSON record and merges them to get the final schema.
pub fn identity_pipeline(
    array: Vec<VrlValue>,
    table: Option<Arc<table::Table>>,
    pipeline_ctx: &PipelineContext<'_>,
) -> Result<HashMap<ContextOpt, Rows>> {
    let skip_error = pipeline_ctx.pipeline_param.skip_error();
    let max_nested_levels = pipeline_ctx.pipeline_param.max_nested_levels();
    // Always flatten JSON objects and stringify arrays
    let mut input = Vec::with_capacity(array.len());
    for item in array.into_iter() {
        let result =
            unwrap_or_continue_if_err!(flatten_object(item, max_nested_levels), skip_error);
        input.push(result);
    }

    identity_pipeline_inner(input, pipeline_ctx).map(|(mut schema, opt_map)| {
        if let Some(table) = table {
            let table_info = table.table_info();
            for tag_name in table_info.meta.row_key_column_names() {
                if let Some(index) = schema.index.get(tag_name) {
                    schema.schema[*index].semantic_type = SemanticType::Tag as i32;
                }
            }
        }

        opt_map
            .into_iter()
            .map(|(opt, rows)| {
                (
                    opt,
                    Rows {
                        schema: schema.schema.clone(),
                        rows,
                    },
                )
            })
            .collect::<HashMap<ContextOpt, Rows>>()
    })
}

/// Consumes the JSON object and flattens it into a single-level object.
///
/// The `max_nested_levels` parameter limits how deep to flatten nested JSON objects.
/// When the maximum level is reached, the remaining nested structure is serialized to a JSON
/// string and stored at the current flattened key.
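///
/// A minimal sketch of the behavior on a hypothetical input:
/// ```ignore
/// // with max_nested_levels = 2:
/// //   {"a": {"b": {"c": 1}}}  =>  {"a.b": "{\"c\":1}"}
/// // the level limit is reached at "b", so the remaining object is
/// // serialized to a JSON string.
/// ```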
pub fn flatten_object(object: VrlValue, max_nested_levels: usize) -> Result<VrlValue> {
    let mut flattened = BTreeMap::new();
    let object = object.into_object().context(ValueMustBeMapSnafu)?;

    if !object.is_empty() {
        // it will use recursion to flatten the object.
        do_flatten_object(&mut flattened, None, object, 1, max_nested_levels);
    }

    Ok(VrlValue::Object(flattened))
}

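/// Convert a VRL value into a `serde_json` value for stringification.
/// Non-finite floats that `serde_json` cannot represent become null.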
fn vrl_value_to_serde_json(value: &VrlValue) -> serde_json_crate::Value {
    match value {
        VrlValue::Null => serde_json_crate::Value::Null,
        VrlValue::Boolean(b) => serde_json_crate::Value::Bool(*b),
        VrlValue::Integer(i) => serde_json_crate::Value::Number((*i).into()),
        VrlValue::Float(not_nan) => serde_json_crate::Number::from_f64(not_nan.into_inner())
            .map(serde_json_crate::Value::Number)
            .unwrap_or(serde_json_crate::Value::Null),
        VrlValue::Bytes(bytes) => {
            serde_json_crate::Value::String(String::from_utf8_lossy(bytes).into_owned())
        }
        VrlValue::Regex(re) => serde_json_crate::Value::String(re.as_str().to_string()),
        VrlValue::Timestamp(ts) => serde_json_crate::Value::String(ts.to_rfc3339()),
        VrlValue::Array(arr) => {
            serde_json_crate::Value::Array(arr.iter().map(vrl_value_to_serde_json).collect())
        }
        VrlValue::Object(map) => serde_json_crate::Value::Object(
            map.iter()
                .map(|(k, v)| (k.to_string(), vrl_value_to_serde_json(v)))
                .collect(),
        ),
    }
}

fn do_flatten_object(
    dest: &mut BTreeMap<KeyString, VrlValue>,
    base: Option<&str>,
    object: BTreeMap<KeyString, VrlValue>,
    current_level: usize,
    max_nested_levels: usize,
) {
    for (key, value) in object {
        let new_key = base.map_or_else(
            || key.clone(),
            |base_key| format!("{base_key}.{key}").into(),
        );

        match value {
            VrlValue::Object(object) => {
                if current_level >= max_nested_levels {
                    // Reached the maximum level; stringify the remaining object.
                    let json_string = serde_json_crate::to_string(&vrl_value_to_serde_json(
                        &VrlValue::Object(object),
                    ))
                    .unwrap_or_else(|_| String::from("{}"));
                    dest.insert(new_key, VrlValue::Bytes(Bytes::from(json_string)));
                } else {
                    do_flatten_object(
                        dest,
                        Some(&new_key),
                        object,
                        current_level + 1,
                        max_nested_levels,
                    );
                }
            }
            // Arrays are stringified to ensure no JSON column types in the result.
            VrlValue::Array(_) => {
                let json_string = serde_json_crate::to_string(&vrl_value_to_serde_json(&value))
                    .unwrap_or_else(|_| String::from("[]"));
                dest.insert(new_key, VrlValue::Bytes(Bytes::from(json_string)));
            }
            // Other leaf types are inserted as-is.
            _ => {
                dest.insert(new_key, value);
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use api::v1::SemanticType;

    use super::*;
    use crate::{PipelineDefinition, identity_pipeline};

    #[test]
    fn test_identify_pipeline() {
        let params = GreptimePipelineParams::default();
        let pipeline_ctx = PipelineContext::new(
            &PipelineDefinition::GreptimeIdentityPipeline(None),
            &params,
            Channel::Unknown,
        );
        {
            let array = [
                serde_json::json!({
                    "woshinull": null,
                    "name": "Alice",
                    "age": 20,
                    "is_student": true,
                    "score": 99.5,
                    "hobbies": "reading",
                    "address": "Beijing",
                }),
                serde_json::json!({
                    "name": "Bob",
                    "age": 21,
                    "is_student": false,
                    "score": "88.5",
                    "hobbies": "swimming",
                    "address": "Shanghai",
                    "gaga": "gaga"
                }),
            ];
            let array = array.iter().map(|v| v.into()).collect();
            let rows = identity_pipeline(array, None, &pipeline_ctx);
            assert!(rows.is_err());
            assert_eq!(
                rows.err().unwrap().to_string(),
                "Column datatype mismatch. For column: score, expected datatype: FLOAT64, actual datatype: STRING".to_string(),
            );
        }
        {
            let array = [
                serde_json::json!({
                    "woshinull": null,
                    "name": "Alice",
                    "age": 20,
                    "is_student": true,
                    "score": 99.5,
                    "hobbies": "reading",
                    "address": "Beijing",
                }),
                serde_json::json!({
                    "name": "Bob",
                    "age": 21,
                    "is_student": false,
                    "score": 88,
                    "hobbies": "swimming",
                    "address": "Shanghai",
                    "gaga": "gaga"
                }),
            ];
            let array = array.iter().map(|v| v.into()).collect();
            let rows = identity_pipeline(array, None, &pipeline_ctx);
            assert!(rows.is_err());
            assert_eq!(
                rows.err().unwrap().to_string(),
                "Column datatype mismatch. For column: score, expected datatype: FLOAT64, actual datatype: INT64".to_string(),
            );
        }
        {
            let array = [
                serde_json::json!({
                    "woshinull": null,
                    "name": "Alice",
                    "age": 20,
                    "is_student": true,
                    "score": 99.5,
                    "hobbies": "reading",
                    "address": "Beijing",
                }),
                serde_json::json!({
                    "name": "Bob",
                    "age": 21,
                    "is_student": false,
                    "score": 88.5,
                    "hobbies": "swimming",
                    "address": "Shanghai",
                    "gaga": "gaga"
                }),
            ];
            let array = array.iter().map(|v| v.into()).collect();
            let rows = identity_pipeline(array, None, &pipeline_ctx);
            assert!(rows.is_ok());
            let mut rows = rows.unwrap();
            assert!(rows.len() == 1);
            let rows = rows.remove(&ContextOpt::default()).unwrap();
            assert_eq!(rows.schema.len(), 8);
            assert_eq!(rows.rows.len(), 2);
            assert_eq!(8, rows.rows[0].values.len());
            assert_eq!(8, rows.rows[1].values.len());
        }
        {
            let array = [
                serde_json::json!({
                    "woshinull": null,
                    "name": "Alice",
                    "age": 20,
                    "is_student": true,
                    "score": 99.5,
                    "hobbies": "reading",
                    "address": "Beijing",
                }),
                serde_json::json!({
                    "name": "Bob",
                    "age": 21,
                    "is_student": false,
                    "score": 88.5,
                    "hobbies": "swimming",
                    "address": "Shanghai",
                    "gaga": "gaga"
                }),
            ];
            let tag_column_names = ["name".to_string(), "address".to_string()];

            let rows =
                identity_pipeline_inner(array.iter().map(|v| v.into()).collect(), &pipeline_ctx)
                    .map(|(mut schema, mut rows)| {
                        for name in tag_column_names {
                            if let Some(index) = schema.index.get(&name) {
                                schema.schema[*index].semantic_type = SemanticType::Tag as i32;
                            }
                        }

                        assert!(rows.len() == 1);
                        let rows = rows.remove(&ContextOpt::default()).unwrap();

                        Rows {
                            schema: schema.schema,
                            rows,
                        }
                    });

            assert!(rows.is_ok());
            let rows = rows.unwrap();
            assert_eq!(rows.schema.len(), 8);
            assert_eq!(rows.rows.len(), 2);
            assert_eq!(8, rows.rows[0].values.len());
            assert_eq!(8, rows.rows[1].values.len());
            assert_eq!(
                rows.schema
                    .iter()
                    .find(|x| x.column_name == "name")
                    .unwrap()
                    .semantic_type,
                SemanticType::Tag as i32
            );
            assert_eq!(
                rows.schema
                    .iter()
                    .find(|x| x.column_name == "address")
                    .unwrap()
                    .semantic_type,
                SemanticType::Tag as i32
            );
            assert_eq!(
                rows.schema
                    .iter()
                    .filter(|x| x.semantic_type == SemanticType::Tag as i32)
                    .count(),
                2
            );
        }
    }

    #[test]
    fn test_flatten() {
        let test_cases = vec![
            // Basic case.
            (
                serde_json::json!(
                    {
                        "a": {
                            "b": {
                                "c": [1, 2, 3]
                            }
                        },
                        "d": [
                            "foo",
                            "bar"
                        ],
                        "e": {
                            "f": [7, 8, 9],
                            "g": {
                                "h": 123,
                                "i": "hello",
                                "j": {
                                    "k": true
                                }
                            }
                        }
                    }
                ),
                10,
                Some(serde_json::json!(
                    {
                        "a.b.c": "[1,2,3]",
                        "d": "[\"foo\",\"bar\"]",
                        "e.f": "[7,8,9]",
                        "e.g.h": 123,
                        "e.g.i": "hello",
                        "e.g.j.k": true
                    }
                )),
            ),
            // Test the case where the object has more than 3 nested levels.
            (
                serde_json::json!(
                    {
                        "a": {
                            "b": {
                                "c": {
                                    "d": [1, 2, 3]
                                }
                            }
                        },
                        "e": [
                            "foo",
                            "bar"
                        ]
                    }
                ),
                3,
                Some(serde_json::json!(
                    {
                        "a.b.c": "{\"d\":[1,2,3]}",
                        "e": "[\"foo\",\"bar\"]"
                    }
                )),
            ),
        ];

        for (input, max_depth, expected) in test_cases {
            let input = input.into();
            let expected = expected.map(|e| e.into());

            let flattened_object = flatten_object(input, max_depth).ok();
            assert_eq!(flattened_object, expected);
        }
    }
}