pipeline/etl/transform/transformer/greptime.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

pub mod coerce;

use std::borrow::Cow;
use std::collections::{BTreeMap, HashSet};
use std::sync::Arc;

use ahash::{HashMap, HashMapExt};
use api::helper::proto_value_type;
use api::v1::column_data_type_extension::TypeExt;
use api::v1::value::ValueData;
use api::v1::{ColumnDataType, ColumnDataTypeExtension, JsonTypeExtension, SemanticType};
use coerce::{coerce_columns, coerce_value};
use common_query::prelude::{GREPTIME_TIMESTAMP, GREPTIME_VALUE};
use common_telemetry::warn;
use greptime_proto::v1::{ColumnSchema, Row, Rows, Value as GreptimeValue};
use itertools::Itertools;
use jsonb::Number;
use once_cell::sync::OnceCell;
use session::context::Channel;
use snafu::OptionExt;
use vrl::prelude::VrlValueConvert;
use vrl::value::{KeyString, Value as VrlValue};

use crate::error::{
    IdentifyPipelineColumnTypeMismatchSnafu, InvalidTimestampSnafu, ReachedMaxNestedLevelsSnafu,
    Result, TimeIndexMustBeNonNullSnafu, TransformColumnNameMustBeUniqueSnafu,
    TransformMultipleTimestampIndexSnafu, TransformTimestampIndexCountSnafu, ValueMustBeMapSnafu,
};
use crate::etl::ctx_req::ContextOpt;
use crate::etl::field::{Field, Fields};
use crate::etl::transform::index::Index;
use crate::etl::transform::{Transform, Transforms};
use crate::etl::PipelineDocVersion;
use crate::{truthy, unwrap_or_continue_if_err, PipelineContext};

const DEFAULT_GREPTIME_TIMESTAMP_COLUMN: &str = "greptime_timestamp";
const DEFAULT_MAX_NESTED_LEVELS_FOR_JSON_FLATTENING: usize = 10;

/// Fields not present in the transform columns are discarded
/// to prevent automatic column creation in GreptimeDB.
#[derive(Debug, Clone)]
pub struct GreptimeTransformer {
    transforms: Transforms,
    schema: Vec<ColumnSchema>,
}

/// Parameters that can be used to configure the greptime pipelines.
#[derive(Debug, Default)]
pub struct GreptimePipelineParams {
    /// The original options for configuring the greptime pipelines.
    /// This should not be used directly; use the parsed shortcut option values instead.
    options: HashMap<String, String>,

    /// Parsed shortcut option values.
    pub flatten_json_object: OnceCell<bool>,
    /// Whether to skip errors while processing the pipeline.
    pub skip_error: OnceCell<bool>,
}

impl GreptimePipelineParams {
    /// Create a `GreptimePipelineParams` from a params string taken from the
    /// HTTP header with key `x-greptime-pipeline-params`.
    /// The params are in the format `key1=value1&key2=value2`, for example:
    /// x-greptime-pipeline-params: flatten_json_object=true
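    ///
    /// A minimal usage sketch (illustrative; mirrors the params test at the
    /// bottom of this file):
    ///
    /// ```ignore
    /// let params = GreptimePipelineParams::from_params(Some("flatten_json_object=true"));
    /// assert!(params.flatten_json_object());
    /// ```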
    pub fn from_params(params: Option<&str>) -> Self {
        let options = Self::parse_header_str_to_map(params);

        Self {
            options,
            skip_error: OnceCell::new(),
            flatten_json_object: OnceCell::new(),
        }
    }

    pub fn from_map(options: HashMap<String, String>) -> Self {
        Self {
            options,
            skip_error: OnceCell::new(),
            flatten_json_object: OnceCell::new(),
        }
    }

    pub fn parse_header_str_to_map(params: Option<&str>) -> HashMap<String, String> {
        if let Some(params) = params {
            if params.is_empty() {
                HashMap::new()
            } else {
                params
                    .split('&')
                    .filter_map(|s| s.split_once('='))
                    .map(|(k, v)| (k.to_string(), v.to_string()))
                    .collect::<HashMap<String, String>>()
            }
        } else {
            HashMap::new()
        }
    }

    /// Whether to flatten the JSON object.
    pub fn flatten_json_object(&self) -> bool {
        *self.flatten_json_object.get_or_init(|| {
            self.options
                .get("flatten_json_object")
                .map(|v| v == "true")
                .unwrap_or(false)
        })
    }

    /// Whether to skip errors while processing the pipeline.
    pub fn skip_error(&self) -> bool {
        *self
            .skip_error
            .get_or_init(|| self.options.get("skip_error").map(truthy).unwrap_or(false))
    }
}

impl GreptimeTransformer {
    /// Add a default timestamp column to the transforms.
    fn add_greptime_timestamp_column(transforms: &mut Transforms) {
        let type_ = ColumnDataType::TimestampNanosecond;
        let default = None;

        let transform = Transform {
            fields: Fields::one(Field::new(
                DEFAULT_GREPTIME_TIMESTAMP_COLUMN.to_string(),
                None,
            )),
            type_,
            default,
            index: Some(Index::Time),
            on_failure: Some(crate::etl::transform::OnFailure::Default),
            tag: false,
        };
        transforms.push(transform);
    }

    /// Generate the schema for the GreptimeTransformer.
    fn init_schemas(transforms: &Transforms) -> Result<Vec<ColumnSchema>> {
        let mut schema = vec![];
        for transform in transforms.iter() {
            schema.extend(coerce_columns(transform)?);
        }
        Ok(schema)
    }
}

impl GreptimeTransformer {
    pub fn new(mut transforms: Transforms, doc_version: &PipelineDocVersion) -> Result<Self> {
        // The empty check is done in the caller.
        let mut column_names_set = HashSet::new();
        let mut timestamp_columns = vec![];

        for transform in transforms.iter() {
            let target_fields_set = transform
                .fields
                .iter()
                .map(|f| f.target_or_input_field())
                .collect::<HashSet<_>>();

            let intersections: Vec<_> = column_names_set.intersection(&target_fields_set).collect();
            if !intersections.is_empty() {
                let duplicates = intersections.iter().join(",");
                return TransformColumnNameMustBeUniqueSnafu { duplicates }.fail();
            }

            column_names_set.extend(target_fields_set);

            if let Some(idx) = transform.index {
                if idx == Index::Time {
                    match transform.fields.len() {
                        // Safety: unwrap is fine here because we have checked that the length is 1.
                        1 => {
                            timestamp_columns.push(transform.fields.first().unwrap().input_field())
                        }
                        _ => {
                            return TransformMultipleTimestampIndexSnafu {
                                columns: transform
                                    .fields
                                    .iter()
                                    .map(|x| x.input_field())
                                    .join(", "),
                            }
                            .fail();
                        }
                    }
                }
            }
        }

        let schema = match timestamp_columns.len() {
            0 if doc_version == &PipelineDocVersion::V1 => {
                // Compatible with v1: add a default timestamp column.
                GreptimeTransformer::add_greptime_timestamp_column(&mut transforms);
                GreptimeTransformer::init_schemas(&transforms)?
            }
            1 => GreptimeTransformer::init_schemas(&transforms)?,
            count => {
                let columns = timestamp_columns.iter().join(", ");
                return TransformTimestampIndexCountSnafu { count, columns }.fail();
            }
        };
        Ok(GreptimeTransformer { transforms, schema })
    }

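    /// Apply the configured transforms to `pipeline_map`, producing one
    /// `GreptimeValue` per schema column. When not in v1 mode, consumed
    /// fields are removed from the map so that the auto-transform can
    /// process the remaining fields.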
    pub fn transform_mut(
        &self,
        pipeline_map: &mut VrlValue,
        is_v1: bool,
    ) -> Result<Vec<GreptimeValue>> {
        let mut values = vec![GreptimeValue { value_data: None }; self.schema.len()];
        let mut output_index = 0;
        for transform in self.transforms.iter() {
            for field in transform.fields.iter() {
                let column_name = field.input_field();

                let pipeline_map = pipeline_map.as_object_mut().context(ValueMustBeMapSnafu)?;
                // Keep using `get` here to stay compatible with v1.
                match pipeline_map.get(column_name) {
                    Some(v) => {
                        let value_data = coerce_value(v, transform)?;
                        // Every transform field has exactly one output field.
                        values[output_index] = GreptimeValue { value_data };
                    }
                    None => {
                        let value_data = match transform.on_failure {
                            Some(crate::etl::transform::OnFailure::Default) => {
                                match transform.get_default() {
                                    Some(default) => Some(default.clone()),
                                    None => transform.get_default_value_when_data_is_none(),
                                }
                            }
                            Some(crate::etl::transform::OnFailure::Ignore) => None,
                            None => None,
                        };
                        if transform.is_timeindex() && value_data.is_none() {
                            return TimeIndexMustBeNonNullSnafu.fail();
                        }
                        values[output_index] = GreptimeValue { value_data };
                    }
                }
                output_index += 1;
                if !is_v1 {
                    // Remove the column from the pipeline_map so that the
                    // auto-transform can use the remaining fields.
                    pipeline_map.remove(column_name);
                }
            }
        }
        Ok(values)
    }

    pub fn transforms(&self) -> &Transforms {
        &self.transforms
    }

    pub fn schemas(&self) -> &Vec<greptime_proto::v1::ColumnSchema> {
        &self.schema
    }

    pub fn transforms_mut(&mut self) -> &mut Transforms {
        &mut self.transforms
    }
}

/// Records the current schema state along with an index of field names.
/// It evolves as the user input JSON is traversed, and ends up holding a
/// superset of all schemas seen in the user input.
#[derive(Debug, Default)]
pub struct SchemaInfo {
    /// Schema info.
    pub schema: Vec<ColumnSchema>,
    /// Index of the column name.
    pub index: HashMap<String, usize>,
}

impl SchemaInfo {
    pub fn with_capacity(capacity: usize) -> Self {
        Self {
            schema: Vec::with_capacity(capacity),
            index: HashMap::with_capacity(capacity),
        }
    }

    pub fn from_schema_list(schema_list: Vec<ColumnSchema>) -> Self {
        let mut index = HashMap::new();
        for (i, schema) in schema_list.iter().enumerate() {
            index.insert(schema.column_name.clone(), i);
        }
        Self {
            schema: schema_list,
            index,
        }
    }
}

fn resolve_schema(
    index: Option<usize>,
    value_data: ValueData,
    column_schema: ColumnSchema,
    row: &mut Vec<GreptimeValue>,
    schema_info: &mut SchemaInfo,
) -> Result<()> {
    if let Some(index) = index {
        let api_value = GreptimeValue {
            value_data: Some(value_data),
        };
        // Safety: unwrap is fine here because api_value is always valid.
        let value_column_data_type = proto_value_type(&api_value).unwrap();
        // Safety: unwrap is fine here because index is always valid.
        let schema_column_data_type = schema_info.schema.get(index).unwrap().datatype();
        if value_column_data_type != schema_column_data_type {
            IdentifyPipelineColumnTypeMismatchSnafu {
                column: column_schema.column_name,
                expected: schema_column_data_type.as_str_name(),
                actual: value_column_data_type.as_str_name(),
            }
            .fail()
        } else {
            row[index] = api_value;
            Ok(())
        }
    } else {
        let key = column_schema.column_name.clone();
        schema_info.schema.push(column_schema);
        schema_info.index.insert(key, schema_info.schema.len() - 1);
        let api_value = GreptimeValue {
            value_data: Some(value_data),
        };
        row.push(api_value);
        Ok(())
    }
}

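/// Derive the timestamp value for a row. Prometheus rows read
/// `greptime_timestamp` as milliseconds; pipelines with a custom timestamp
/// use the configured column; everything else falls back to the current
/// time in nanoseconds.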
fn calc_ts(p_ctx: &PipelineContext, values: &VrlValue) -> Result<Option<ValueData>> {
    match p_ctx.channel {
        Channel::Prometheus => {
            let ts = values
                .as_object()
                .and_then(|m| m.get(GREPTIME_TIMESTAMP))
                .and_then(|ts| ts.try_into_i64().ok())
                .unwrap_or_default();
            Ok(Some(ValueData::TimestampMillisecondValue(ts)))
        }
        _ => {
            let custom_ts = p_ctx.pipeline_definition.get_custom_ts();
            match custom_ts {
                Some(ts) => {
                    let ts_field = values.as_object().and_then(|m| m.get(ts.get_column_name()));
                    Some(ts.get_timestamp_value(ts_field)).transpose()
                }
                None => Ok(Some(ValueData::TimestampNanosecondValue(
                    chrono::Utc::now().timestamp_nanos_opt().unwrap_or_default(),
                ))),
            }
        }
    }
}

/// `need_calc_ts` applies in two cases:
/// 1. full greptime_identity
/// 2. auto-transform without a transformer
///
/// If a transform is present in a custom pipeline in v2 mode,
/// we don't need to calculate the timestamp again, nor do we need to check
/// the timestamp column name.
pub(crate) fn values_to_row(
    schema_info: &mut SchemaInfo,
    values: VrlValue,
    pipeline_ctx: &PipelineContext<'_>,
    row: Option<Vec<GreptimeValue>>,
    need_calc_ts: bool,
) -> Result<Row> {
    let mut row: Vec<GreptimeValue> =
        row.unwrap_or_else(|| Vec::with_capacity(schema_info.schema.len()));
    let custom_ts = pipeline_ctx.pipeline_definition.get_custom_ts();

    if need_calc_ts {
        // Calculate the timestamp value based on the channel.
        let ts = calc_ts(pipeline_ctx, &values)?;
        row.push(GreptimeValue { value_data: ts });
    }

    row.resize(schema_info.schema.len(), GreptimeValue { value_data: None });

    // Skip the timestamp column.
    let ts_column_name = custom_ts
        .as_ref()
        .map_or(DEFAULT_GREPTIME_TIMESTAMP_COLUMN, |ts| ts.get_column_name());

    let values = values.into_object().context(ValueMustBeMapSnafu)?;

    for (column_name, value) in values {
        if need_calc_ts && column_name.as_str() == ts_column_name {
            continue;
        }

        resolve_value(
            value,
            column_name.into(),
            &mut row,
            schema_info,
            pipeline_ctx,
        )?;
    }
    Ok(Row { values: row })
}

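/// For the Prometheus channel, every column except `greptime_value` is a Tag;
/// all other channels default to Field.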
fn decide_semantic(p_ctx: &PipelineContext, column_name: &str) -> i32 {
    if p_ctx.channel == Channel::Prometheus && column_name != GREPTIME_VALUE {
        SemanticType::Tag as i32
    } else {
        SemanticType::Field as i32
    }
}

fn resolve_value(
    value: VrlValue,
    column_name: String,
    row: &mut Vec<GreptimeValue>,
    schema_info: &mut SchemaInfo,
    p_ctx: &PipelineContext,
) -> Result<()> {
    let index = schema_info.index.get(&column_name).copied();
    let mut resolve_simple_type =
        |value_data: ValueData, column_name: String, data_type: ColumnDataType| {
            let semantic_type = decide_semantic(p_ctx, &column_name);
            resolve_schema(
                index,
                value_data,
                ColumnSchema {
                    column_name,
                    datatype: data_type as i32,
                    semantic_type,
                    datatype_extension: None,
                    options: None,
                },
                row,
                schema_info,
            )
        };

    match value {
        VrlValue::Null => {}

        VrlValue::Integer(v) => {
            resolve_simple_type(ValueData::I64Value(v), column_name, ColumnDataType::Int64)?;
        }

        VrlValue::Float(v) => {
            resolve_simple_type(
                ValueData::F64Value(v.into()),
                column_name,
                ColumnDataType::Float64,
            )?;
        }

        VrlValue::Boolean(v) => {
            resolve_simple_type(
                ValueData::BoolValue(v),
                column_name,
                ColumnDataType::Boolean,
            )?;
        }

        VrlValue::Bytes(v) => {
            resolve_simple_type(
                ValueData::StringValue(String::from_utf8_lossy_owned(v.to_vec())),
                column_name,
                ColumnDataType::String,
            )?;
        }

        VrlValue::Regex(v) => {
            warn!(
                "Persisting regex value in the table, this should not happen, column_name: {}",
                column_name
            );
            resolve_simple_type(
                ValueData::StringValue(v.to_string()),
                column_name,
                ColumnDataType::String,
            )?;
        }

        VrlValue::Timestamp(ts) => {
            let ns = ts.timestamp_nanos_opt().context(InvalidTimestampSnafu {
                input: ts.to_rfc3339(),
            })?;
            resolve_simple_type(
                ValueData::TimestampNanosecondValue(ns),
                column_name,
                ColumnDataType::TimestampNanosecond,
            )?;
        }

        VrlValue::Array(_) | VrlValue::Object(_) => {
            let data = vrl_value_to_jsonb_value(&value);
            resolve_schema(
                index,
                ValueData::BinaryValue(data.to_vec()),
                ColumnSchema {
                    column_name,
                    datatype: ColumnDataType::Binary as i32,
                    semantic_type: SemanticType::Field as i32,
                    datatype_extension: Some(ColumnDataTypeExtension {
                        type_ext: Some(TypeExt::JsonType(JsonTypeExtension::JsonBinary.into())),
                    }),
                    options: None,
                },
                row,
                schema_info,
            )?;
        }
    }
    Ok(())
}

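/// Convert a VRL value into a `jsonb` value, recursing through arrays and
/// objects; e.g. a VRL object `{"a": 1}` becomes a jsonb object holding an
/// `Int64` number.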
fn vrl_value_to_jsonb_value<'a>(value: &'a VrlValue) -> jsonb::Value<'a> {
    match value {
        VrlValue::Bytes(bytes) => jsonb::Value::String(String::from_utf8_lossy(bytes)),
        VrlValue::Regex(value_regex) => jsonb::Value::String(Cow::Borrowed(value_regex.as_str())),
        VrlValue::Integer(i) => jsonb::Value::Number(Number::Int64(*i)),
        VrlValue::Float(not_nan) => jsonb::Value::Number(Number::Float64(not_nan.into_inner())),
        VrlValue::Boolean(b) => jsonb::Value::Bool(*b),
        VrlValue::Timestamp(date_time) => jsonb::Value::String(Cow::Owned(date_time.to_rfc3339())),
        VrlValue::Object(btree_map) => jsonb::Value::Object(
            btree_map
                .iter()
                .map(|(key, value)| (key.to_string(), vrl_value_to_jsonb_value(value)))
                .collect(),
        ),
        VrlValue::Array(values) => jsonb::Value::Array(
            values.iter().map(vrl_value_to_jsonb_value).collect(),
        ),
        VrlValue::Null => jsonb::Value::Null,
    }
}

fn identity_pipeline_inner(
    pipeline_maps: Vec<VrlValue>,
    pipeline_ctx: &PipelineContext<'_>,
) -> Result<(SchemaInfo, HashMap<ContextOpt, Vec<Row>>)> {
    let skip_error = pipeline_ctx.pipeline_param.skip_error();
    let mut schema_info = SchemaInfo::default();
    let custom_ts = pipeline_ctx.pipeline_definition.get_custom_ts();

    // Set the time index column schema first.
    schema_info.schema.push(ColumnSchema {
        column_name: custom_ts
            .map(|ts| ts.get_column_name().to_string())
            .unwrap_or_else(|| DEFAULT_GREPTIME_TIMESTAMP_COLUMN.to_string()),
        datatype: custom_ts.map(|c| c.get_datatype()).unwrap_or_else(|| {
            if pipeline_ctx.channel == Channel::Prometheus {
                ColumnDataType::TimestampMillisecond
            } else {
                ColumnDataType::TimestampNanosecond
            }
        }) as i32,
        semantic_type: SemanticType::Timestamp as i32,
        datatype_extension: None,
        options: None,
    });

    let mut opt_map = HashMap::new();
    let len = pipeline_maps.len();

    for mut pipeline_map in pipeline_maps {
        let opt = unwrap_or_continue_if_err!(
            ContextOpt::from_pipeline_map_to_opt(&mut pipeline_map),
            skip_error
        );
        let row = unwrap_or_continue_if_err!(
            values_to_row(&mut schema_info, pipeline_map, pipeline_ctx, None, true),
            skip_error
        );

        opt_map
            .entry(opt)
            .or_insert_with(|| Vec::with_capacity(len))
            .push(row);
    }

    // Pad every row to the final column count, since earlier rows may have
    // been built before later columns were discovered.
    let column_count = schema_info.schema.len();
    for (_, rows) in opt_map.iter_mut() {
        for row in rows.iter_mut() {
            let diff = column_count - row.values.len();
            for _ in 0..diff {
                row.values.push(GreptimeValue { value_data: None });
            }
        }
    }

    Ok((schema_info, opt_map))
}

/// Identity pipeline for Greptime.
/// This pipeline converts the input JSON array to Greptime Rows.
/// The `table` parameter is used to set the semantic type of the row key columns to Tag.
/// 1. The pipeline adds a default timestamp column to the schema.
/// 2. The pipeline does not resolve NULL values.
/// 3. The pipeline assumes that the JSON format is fixed.
/// 4. The pipeline returns an error if the datatype of the same column is mismatched.
/// 5. The pipeline analyzes the schema of each JSON record and merges them to get the final schema.
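///
/// A minimal sketch (illustrative; see the tests below for a runnable version):
///
/// ```ignore
/// // Rows from records with differing keys are merged into one superset schema.
/// let rows_by_opt = identity_pipeline(array, None, &pipeline_ctx)?;
/// ```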
pub fn identity_pipeline(
    array: Vec<VrlValue>,
    table: Option<Arc<table::Table>>,
    pipeline_ctx: &PipelineContext<'_>,
) -> Result<HashMap<ContextOpt, Rows>> {
    let skip_error = pipeline_ctx.pipeline_param.skip_error();
    let input = if pipeline_ctx.pipeline_param.flatten_json_object() {
        let mut results = Vec::with_capacity(array.len());
        for item in array.into_iter() {
            let result = unwrap_or_continue_if_err!(
                flatten_object(item, DEFAULT_MAX_NESTED_LEVELS_FOR_JSON_FLATTENING),
                skip_error
            );
            results.push(result);
        }
        results
    } else {
        array
    };

    identity_pipeline_inner(input, pipeline_ctx).map(|(mut schema, opt_map)| {
        if let Some(table) = table {
            let table_info = table.table_info();
            for tag_name in table_info.meta.row_key_column_names() {
                if let Some(index) = schema.index.get(tag_name) {
                    schema.schema[*index].semantic_type = SemanticType::Tag as i32;
                }
            }
        }

        opt_map
            .into_iter()
            .map(|(opt, rows)| {
                (
                    opt,
                    Rows {
                        schema: schema.schema.clone(),
                        rows,
                    },
                )
            })
            .collect::<HashMap<ContextOpt, Rows>>()
    })
}

/// Consumes the JSON object and flattens it into a single-level object.
///
/// The `max_nested_levels` parameter limits the nesting depth of the JSON object.
/// An error is returned if the nesting depth exceeds `max_nested_levels`.
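///
/// A minimal sketch (illustrative; mirrors the `test_flatten` case below):
///
/// ```ignore
/// // {"a": {"b": 1}} flattens to {"a.b": 1}.
/// let flat = flatten_object(serde_json::json!({"a": {"b": 1}}).into(), 10)?;
/// ```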
pub fn flatten_object(object: VrlValue, max_nested_levels: usize) -> Result<VrlValue> {
    let mut flattened = BTreeMap::new();
    let object = object.into_object().context(ValueMustBeMapSnafu)?;

    if !object.is_empty() {
        // Flatten the object recursively.
        do_flatten_object(&mut flattened, None, object, 1, max_nested_levels)?;
    }

    Ok(VrlValue::Object(flattened))
}

fn do_flatten_object(
    dest: &mut BTreeMap<KeyString, VrlValue>,
    base: Option<&str>,
    object: BTreeMap<KeyString, VrlValue>,
    current_level: usize,
    max_nested_levels: usize,
) -> Result<()> {
    // For safety, we do not allow the depth to be greater than max_nested_levels.
    if current_level > max_nested_levels {
        return ReachedMaxNestedLevelsSnafu { max_nested_levels }.fail();
    }

    for (key, value) in object {
        let new_key = base.map_or_else(
            || key.clone(),
            |base_key| format!("{base_key}.{key}").into(),
        );

        match value {
            VrlValue::Object(object) => {
                do_flatten_object(
                    dest,
                    Some(&new_key),
                    object,
                    current_level + 1,
                    max_nested_levels,
                )?;
            }
            // For other types, insert the value directly under the flattened key.
            _ => {
                dest.insert(new_key, value);
            }
        }
    }

    Ok(())
}

#[cfg(test)]
mod tests {
    use api::v1::SemanticType;

    use super::*;
    use crate::{identity_pipeline, PipelineDefinition};

    #[test]
    fn test_identify_pipeline() {
        let params = GreptimePipelineParams::default();
        let pipeline_ctx = PipelineContext::new(
            &PipelineDefinition::GreptimeIdentityPipeline(None),
            &params,
            Channel::Unknown,
        );
        {
            let array = [
                serde_json::json!({
                    "woshinull": null,
                    "name": "Alice",
                    "age": 20,
                    "is_student": true,
                    "score": 99.5,
                    "hobbies": "reading",
                    "address": "Beijing",
                }),
                serde_json::json!({
                    "name": "Bob",
                    "age": 21,
                    "is_student": false,
                    "score": "88.5",
                    "hobbies": "swimming",
                    "address": "Shanghai",
                    "gaga": "gaga"
                }),
            ];
            let array = array.iter().map(|v| v.into()).collect();
            let rows = identity_pipeline(array, None, &pipeline_ctx);
            assert!(rows.is_err());
            assert_eq!(
                rows.err().unwrap().to_string(),
                "Column datatype mismatch. For column: score, expected datatype: FLOAT64, actual datatype: STRING".to_string(),
            );
        }
        {
            let array = [
                serde_json::json!({
                    "woshinull": null,
                    "name": "Alice",
                    "age": 20,
                    "is_student": true,
                    "score": 99.5,
                    "hobbies": "reading",
                    "address": "Beijing",
                }),
                serde_json::json!({
                    "name": "Bob",
                    "age": 21,
                    "is_student": false,
                    "score": 88,
                    "hobbies": "swimming",
                    "address": "Shanghai",
                    "gaga": "gaga"
                }),
            ];
            let array = array.iter().map(|v| v.into()).collect();
            let rows = identity_pipeline(array, None, &pipeline_ctx);
            assert!(rows.is_err());
            assert_eq!(
                rows.err().unwrap().to_string(),
                "Column datatype mismatch. For column: score, expected datatype: FLOAT64, actual datatype: INT64".to_string(),
            );
        }
        {
            let array = [
                serde_json::json!({
                    "woshinull": null,
                    "name": "Alice",
                    "age": 20,
                    "is_student": true,
                    "score": 99.5,
                    "hobbies": "reading",
                    "address": "Beijing",
                }),
                serde_json::json!({
                    "name": "Bob",
                    "age": 21,
                    "is_student": false,
                    "score": 88.5,
                    "hobbies": "swimming",
                    "address": "Shanghai",
                    "gaga": "gaga"
                }),
            ];
            let array = array.iter().map(|v| v.into()).collect();
            let rows = identity_pipeline(array, None, &pipeline_ctx);
            assert!(rows.is_ok());
            let mut rows = rows.unwrap();
            assert!(rows.len() == 1);
            let rows = rows.remove(&ContextOpt::default()).unwrap();
            assert_eq!(rows.schema.len(), 8);
            assert_eq!(rows.rows.len(), 2);
            assert_eq!(8, rows.rows[0].values.len());
            assert_eq!(8, rows.rows[1].values.len());
        }
        {
            let array = [
                serde_json::json!({
                    "woshinull": null,
                    "name": "Alice",
                    "age": 20,
                    "is_student": true,
                    "score": 99.5,
                    "hobbies": "reading",
                    "address": "Beijing",
                }),
                serde_json::json!({
                    "name": "Bob",
                    "age": 21,
                    "is_student": false,
                    "score": 88.5,
                    "hobbies": "swimming",
                    "address": "Shanghai",
                    "gaga": "gaga"
                }),
            ];
            let tag_column_names = ["name".to_string(), "address".to_string()];

            let rows =
                identity_pipeline_inner(array.iter().map(|v| v.into()).collect(), &pipeline_ctx)
                    .map(|(mut schema, mut rows)| {
                        for name in tag_column_names {
                            if let Some(index) = schema.index.get(&name) {
                                schema.schema[*index].semantic_type = SemanticType::Tag as i32;
                            }
                        }

                        assert!(rows.len() == 1);
                        let rows = rows.remove(&ContextOpt::default()).unwrap();

                        Rows {
                            schema: schema.schema,
                            rows,
                        }
                    });

            assert!(rows.is_ok());
            let rows = rows.unwrap();
            assert_eq!(rows.schema.len(), 8);
            assert_eq!(rows.rows.len(), 2);
            assert_eq!(8, rows.rows[0].values.len());
            assert_eq!(8, rows.rows[1].values.len());
            assert_eq!(
                rows.schema
                    .iter()
                    .find(|x| x.column_name == "name")
                    .unwrap()
                    .semantic_type,
                SemanticType::Tag as i32
            );
            assert_eq!(
                rows.schema
                    .iter()
                    .find(|x| x.column_name == "address")
                    .unwrap()
                    .semantic_type,
                SemanticType::Tag as i32
            );
            assert_eq!(
                rows.schema
                    .iter()
                    .filter(|x| x.semantic_type == SemanticType::Tag as i32)
                    .count(),
                2
            );
        }
    }

    #[test]
    fn test_flatten() {
        let test_cases = vec![
            // Basic case.
            (
                serde_json::json!(
                    {
                        "a": {
                            "b": {
                                "c": [1, 2, 3]
                            }
                        },
                        "d": [
                            "foo",
                            "bar"
                        ],
                        "e": {
                            "f": [7, 8, 9],
                            "g": {
                                "h": 123,
                                "i": "hello",
                                "j": {
                                    "k": true
                                }
                            }
                        }
                    }
                ),
                10,
                Some(serde_json::json!(
                    {
                        "a.b.c": [1,2,3],
                        "d": ["foo","bar"],
                        "e.f": [7,8,9],
                        "e.g.h": 123,
                        "e.g.i": "hello",
                        "e.g.j.k": true
                    }
                )),
            ),
            // Test the case where the object has more than 3 nested levels.
            (
                serde_json::json!(
                    {
                        "a": {
                            "b": {
                                "c": {
                                    "d": [1, 2, 3]
                                }
                            }
                        },
                        "e": [
                            "foo",
                            "bar"
                        ]
                    }
                ),
                3,
                None,
            ),
        ];

        for (input, max_depth, expected) in test_cases {
            let input = input.into();
            let expected = expected.map(|e| e.into());

            let flattened_object = flatten_object(input, max_depth).ok();
            assert_eq!(flattened_object, expected);
        }
    }

    #[test]
    fn test_greptime_pipeline_params() {
        let params = Some("flatten_json_object=true");
        let pipeline_params = GreptimePipelineParams::from_params(params);
        assert!(pipeline_params.flatten_json_object());

        let params = None;
        let pipeline_params = GreptimePipelineParams::from_params(params);
        assert!(!pipeline_params.flatten_json_object());
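
        // Illustrative addition: `skip_error` is parsed from the same header
        // format (assuming `truthy` treats "true" as true).
        let pipeline_params = GreptimePipelineParams::from_params(Some("skip_error=true"));
        assert!(pipeline_params.skip_error());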
    }
}