pipeline/etl/transform/transformer/greptime.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

pub mod coerce;

use std::borrow::Cow;
use std::collections::{BTreeMap, HashSet};
use std::sync::Arc;

use ahash::{HashMap, HashMapExt};
use api::helper::proto_value_type;
use api::v1::column_data_type_extension::TypeExt;
use api::v1::value::ValueData;
use api::v1::{ColumnDataType, ColumnDataTypeExtension, JsonTypeExtension, SemanticType};
use coerce::{coerce_columns, coerce_value};
use common_query::prelude::{GREPTIME_TIMESTAMP, GREPTIME_VALUE};
use common_telemetry::warn;
use greptime_proto::v1::{ColumnSchema, Row, Rows, Value as GreptimeValue};
use itertools::Itertools;
use jsonb::Number;
use once_cell::sync::OnceCell;
use session::context::Channel;
use snafu::OptionExt;
use vrl::prelude::VrlValueConvert;
use vrl::value::{KeyString, Value as VrlValue};

use crate::error::{
    IdentifyPipelineColumnTypeMismatchSnafu, InvalidTimestampSnafu, ReachedMaxNestedLevelsSnafu,
    Result, TimeIndexMustBeNonNullSnafu, TransformColumnNameMustBeUniqueSnafu,
    TransformMultipleTimestampIndexSnafu, TransformTimestampIndexCountSnafu, ValueMustBeMapSnafu,
};
use crate::etl::PipelineDocVersion;
use crate::etl::ctx_req::ContextOpt;
use crate::etl::field::{Field, Fields};
use crate::etl::transform::index::Index;
use crate::etl::transform::{Transform, Transforms};
use crate::{PipelineContext, truthy, unwrap_or_continue_if_err};

const DEFAULT_GREPTIME_TIMESTAMP_COLUMN: &str = "greptime_timestamp";
const DEFAULT_MAX_NESTED_LEVELS_FOR_JSON_FLATTENING: usize = 10;

/// Fields not present in the declared columns are discarded
/// to prevent automatic column creation in GreptimeDB.
#[derive(Debug, Clone)]
pub struct GreptimeTransformer {
    transforms: Transforms,
    schema: Vec<ColumnSchema>,
}

/// Parameters that can be used to configure the greptime pipelines.
#[derive(Debug, Default)]
pub struct GreptimePipelineParams {
    /// The original options for configuring the greptime pipelines.
    /// This should not be used directly; use the parsed shortcut option values instead.
    options: HashMap<String, String>,

    /// Parsed shortcut option values.
    pub flatten_json_object: OnceCell<bool>,
    /// Whether to skip errors when processing the pipeline.
    pub skip_error: OnceCell<bool>,
}

impl GreptimePipelineParams {
    /// Create a `GreptimePipelineParams` from the params string carried in the
    /// `x-greptime-pipeline-params` HTTP header.
    /// The params are in the format `key1=value1&key2=value2`, for example:
    /// `x-greptime-pipeline-params: flatten_json_object=true`
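    ///
    /// A minimal sketch of the expected parsing behavior (illustrative, not a doctest):
    ///
    /// ```ignore
    /// let params = GreptimePipelineParams::from_params(
    ///     Some("flatten_json_object=true&skip_error=true"),
    /// );
    /// assert!(params.flatten_json_object());
    /// assert!(params.skip_error());
    /// ```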
    pub fn from_params(params: Option<&str>) -> Self {
        let options = Self::parse_header_str_to_map(params);

        Self {
            options,
            skip_error: OnceCell::new(),
            flatten_json_object: OnceCell::new(),
        }
    }

    pub fn from_map(options: HashMap<String, String>) -> Self {
        Self {
            options,
            skip_error: OnceCell::new(),
            flatten_json_object: OnceCell::new(),
        }
    }

    pub fn parse_header_str_to_map(params: Option<&str>) -> HashMap<String, String> {
        if let Some(params) = params {
            if params.is_empty() {
                HashMap::new()
            } else {
                params
                    .split('&')
                    .filter_map(|s| s.split_once('='))
                    .map(|(k, v)| (k.to_string(), v.to_string()))
                    .collect::<HashMap<String, String>>()
            }
        } else {
            HashMap::new()
        }
    }

    /// Whether to flatten the JSON object.
    pub fn flatten_json_object(&self) -> bool {
        *self.flatten_json_object.get_or_init(|| {
            self.options
                .get("flatten_json_object")
                .map(|v| v == "true")
                .unwrap_or(false)
        })
    }

    /// Whether to skip errors when processing the pipeline.
    pub fn skip_error(&self) -> bool {
        *self
            .skip_error
            .get_or_init(|| self.options.get("skip_error").map(truthy).unwrap_or(false))
    }
}

impl GreptimeTransformer {
    /// Add a default timestamp column to the transforms
    fn add_greptime_timestamp_column(transforms: &mut Transforms) {
        let type_ = ColumnDataType::TimestampNanosecond;
        let default = None;

        let transform = Transform {
            fields: Fields::one(Field::new(
                DEFAULT_GREPTIME_TIMESTAMP_COLUMN.to_string(),
                None,
            )),
            type_,
            default,
            index: Some(Index::Time),
            on_failure: Some(crate::etl::transform::OnFailure::Default),
            tag: false,
        };
        transforms.push(transform);
    }

    /// Generate the schema for the GreptimeTransformer
    fn init_schemas(transforms: &Transforms) -> Result<Vec<ColumnSchema>> {
        let mut schema = vec![];
        for transform in transforms.iter() {
            schema.extend(coerce_columns(transform)?);
        }
        Ok(schema)
    }
}

impl GreptimeTransformer {
    pub fn new(mut transforms: Transforms, doc_version: &PipelineDocVersion) -> Result<Self> {
        // empty check is done in the caller
        let mut column_names_set = HashSet::new();
        let mut timestamp_columns = vec![];

        for transform in transforms.iter() {
            let target_fields_set = transform
                .fields
                .iter()
                .map(|f| f.target_or_input_field())
                .collect::<HashSet<_>>();

            let intersections: Vec<_> = column_names_set.intersection(&target_fields_set).collect();
            if !intersections.is_empty() {
                let duplicates = intersections.iter().join(",");
                return TransformColumnNameMustBeUniqueSnafu { duplicates }.fail();
            }

            column_names_set.extend(target_fields_set);

            if let Some(idx) = transform.index
                && idx == Index::Time
            {
                match transform.fields.len() {
                    // Safety: unwrap is fine here because we have checked that the length is exactly 1
                    1 => timestamp_columns.push(transform.fields.first().unwrap().input_field()),
                    _ => {
                        return TransformMultipleTimestampIndexSnafu {
                            columns: transform.fields.iter().map(|x| x.input_field()).join(", "),
                        }
                        .fail();
                    }
                }
            }
        }

        let schema = match timestamp_columns.len() {
            0 if doc_version == &PipelineDocVersion::V1 => {
                // compatible with v1, add a default timestamp column
                GreptimeTransformer::add_greptime_timestamp_column(&mut transforms);
                GreptimeTransformer::init_schemas(&transforms)?
            }
            1 => GreptimeTransformer::init_schemas(&transforms)?,
            count => {
                let columns = timestamp_columns.iter().join(", ");
                return TransformTimestampIndexCountSnafu { count, columns }.fail();
            }
        };
        Ok(GreptimeTransformer { transforms, schema })
    }

    pub fn transform_mut(
        &self,
        pipeline_map: &mut VrlValue,
        is_v1: bool,
    ) -> Result<Vec<GreptimeValue>> {
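        // The output vector mirrors `self.schema`: each transform field fills
        // exactly one slot, in schema order.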
        let mut values = vec![GreptimeValue { value_data: None }; self.schema.len()];
        let mut output_index = 0;
        for transform in self.transforms.iter() {
            for field in transform.fields.iter() {
                let column_name = field.input_field();

                let pipeline_map = pipeline_map.as_object_mut().context(ValueMustBeMapSnafu)?;
                // keep using `get` here to stay compatible with v1
                match pipeline_map.get(column_name) {
                    Some(v) => {
                        let value_data = coerce_value(v, transform)?;
                        // every transform field has exactly one output field
                        values[output_index] = GreptimeValue { value_data };
                    }
                    None => {
                        let value_data = match transform.on_failure {
                            Some(crate::etl::transform::OnFailure::Default) => {
                                match transform.get_default() {
                                    Some(default) => Some(default.clone()),
                                    None => transform.get_default_value_when_data_is_none(),
                                }
                            }
                            Some(crate::etl::transform::OnFailure::Ignore) => None,
                            None => None,
                        };
                        if transform.is_timeindex() && value_data.is_none() {
                            return TimeIndexMustBeNonNullSnafu.fail();
                        }
                        values[output_index] = GreptimeValue { value_data };
                    }
                }
                output_index += 1;
                if !is_v1 {
                    // remove the column from the pipeline_map
                    // so that the auto-transform can use the remaining fields
                    pipeline_map.remove(column_name);
                }
            }
        }
        Ok(values)
    }

    pub fn transforms(&self) -> &Transforms {
        &self.transforms
    }

    pub fn schemas(&self) -> &Vec<greptime_proto::v1::ColumnSchema> {
        &self.schema
    }

    pub fn transforms_mut(&mut self) -> &mut Transforms {
        &mut self.transforms
    }
}
271
272/// This is used to record the current state schema information and a sequential cache of field names.
273/// As you traverse the user input JSON, this will change.
274/// It will record a superset of all user input schemas.
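///
/// A hedged sketch of how `schema` and `index` stay in sync (illustrative, not a
/// doctest; relies on `ColumnSchema: Default` from the generated proto types):
///
/// ```ignore
/// let mut info = SchemaInfo::with_capacity(4);
/// info.schema.push(ColumnSchema { column_name: "host".to_string(), ..Default::default() });
/// info.index.insert("host".to_string(), info.schema.len() - 1);
/// assert_eq!(info.index["host"], 0);
/// ```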
#[derive(Debug, Default)]
pub struct SchemaInfo {
    /// schema info
    pub schema: Vec<ColumnSchema>,
    /// index of the column name
    pub index: HashMap<String, usize>,
}

impl SchemaInfo {
    pub fn with_capacity(capacity: usize) -> Self {
        Self {
            schema: Vec::with_capacity(capacity),
            index: HashMap::with_capacity(capacity),
        }
    }

    pub fn from_schema_list(schema_list: Vec<ColumnSchema>) -> Self {
        let mut index = HashMap::new();
        for (i, schema) in schema_list.iter().enumerate() {
            index.insert(schema.column_name.clone(), i);
        }
        Self {
            schema: schema_list,
            index,
        }
    }
}

fn resolve_schema(
    index: Option<usize>,
    value_data: ValueData,
    column_schema: ColumnSchema,
    row: &mut Vec<GreptimeValue>,
    schema_info: &mut SchemaInfo,
) -> Result<()> {
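    // Two cases: the column is already known, so type-check the value and fill the
    // existing slot; or it is new, so append it to both the schema list and the row.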
    if let Some(index) = index {
        let api_value = GreptimeValue {
            value_data: Some(value_data),
        };
        // Safety: unwrap is fine here because api_value is always valid
        let value_column_data_type = proto_value_type(&api_value).unwrap();
        // Safety: unwrap is fine here because index is always valid
        let schema_column_data_type = schema_info.schema.get(index).unwrap().datatype();
        if value_column_data_type != schema_column_data_type {
            IdentifyPipelineColumnTypeMismatchSnafu {
                column: column_schema.column_name,
                expected: schema_column_data_type.as_str_name(),
                actual: value_column_data_type.as_str_name(),
            }
            .fail()
        } else {
            row[index] = api_value;
            Ok(())
        }
    } else {
        let key = column_schema.column_name.clone();
        schema_info.schema.push(column_schema);
        schema_info.index.insert(key, schema_info.schema.len() - 1);
        let api_value = GreptimeValue {
            value_data: Some(value_data),
        };
        row.push(api_value);
        Ok(())
    }
}

fn calc_ts(p_ctx: &PipelineContext, values: &VrlValue) -> Result<Option<ValueData>> {
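    // Prometheus rows carry a millisecond timestamp in the GREPTIME_TIMESTAMP field;
    // a custom time index (if configured) parses its own column; otherwise fall back
    // to the current time in nanoseconds.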
    match p_ctx.channel {
        Channel::Prometheus => {
            let ts = values
                .as_object()
                .and_then(|m| m.get(GREPTIME_TIMESTAMP))
                .and_then(|ts| ts.try_into_i64().ok())
                .unwrap_or_default();
            Ok(Some(ValueData::TimestampMillisecondValue(ts)))
        }
        _ => {
            let custom_ts = p_ctx.pipeline_definition.get_custom_ts();
            match custom_ts {
                Some(ts) => {
                    let ts_field = values.as_object().and_then(|m| m.get(ts.get_column_name()));
                    Some(ts.get_timestamp_value(ts_field)).transpose()
                }
                None => Ok(Some(ValueData::TimestampNanosecondValue(
                    chrono::Utc::now().timestamp_nanos_opt().unwrap_or_default(),
                ))),
            }
        }
    }
}

/// `need_calc_ts` happens in two cases:
/// 1. full greptime_identity
/// 2. auto-transform without transformer
///
/// If a transform is present in a custom pipeline in v2 mode,
/// we don't need to calculate the timestamp again, nor check the timestamp column name.
pub(crate) fn values_to_row(
    schema_info: &mut SchemaInfo,
    values: VrlValue,
    pipeline_ctx: &PipelineContext<'_>,
    row: Option<Vec<GreptimeValue>>,
    need_calc_ts: bool,
) -> Result<Row> {
    let mut row: Vec<GreptimeValue> =
        row.unwrap_or_else(|| Vec::with_capacity(schema_info.schema.len()));
    let custom_ts = pipeline_ctx.pipeline_definition.get_custom_ts();

    if need_calc_ts {
        // calculate timestamp value based on the channel
        let ts = calc_ts(pipeline_ctx, &values)?;
        row.push(GreptimeValue { value_data: ts });
    }

    row.resize(schema_info.schema.len(), GreptimeValue { value_data: None });

    // skip ts column
    let ts_column_name = custom_ts
        .as_ref()
        .map_or(DEFAULT_GREPTIME_TIMESTAMP_COLUMN, |ts| ts.get_column_name());

    let values = values.into_object().context(ValueMustBeMapSnafu)?;

    for (column_name, value) in values {
        if need_calc_ts && column_name.as_str() == ts_column_name {
            continue;
        }

        resolve_value(
            value,
            column_name.into(),
            &mut row,
            schema_info,
            pipeline_ctx,
        )?;
    }
    Ok(Row { values: row })
}

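/// For Prometheus, every column except `GREPTIME_VALUE` is a label and therefore
/// becomes a Tag; all other channels default to Field.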
fn decide_semantic(p_ctx: &PipelineContext, column_name: &str) -> i32 {
    if p_ctx.channel == Channel::Prometheus && column_name != GREPTIME_VALUE {
        SemanticType::Tag as i32
    } else {
        SemanticType::Field as i32
    }
}

fn resolve_value(
    value: VrlValue,
    column_name: String,
    row: &mut Vec<GreptimeValue>,
    schema_info: &mut SchemaInfo,
    p_ctx: &PipelineContext,
) -> Result<()> {
    let index = schema_info.index.get(&column_name).copied();
    let mut resolve_simple_type =
        |value_data: ValueData, column_name: String, data_type: ColumnDataType| {
            let semantic_type = decide_semantic(p_ctx, &column_name);
            resolve_schema(
                index,
                value_data,
                ColumnSchema {
                    column_name,
                    datatype: data_type as i32,
                    semantic_type,
                    datatype_extension: None,
                    options: None,
                },
                row,
                schema_info,
            )
        };
    match value {
        VrlValue::Null => {}

        VrlValue::Integer(v) => {
            resolve_simple_type(ValueData::I64Value(v), column_name, ColumnDataType::Int64)?;
        }

        VrlValue::Float(v) => {
            resolve_simple_type(
                ValueData::F64Value(v.into()),
                column_name,
                ColumnDataType::Float64,
            )?;
        }

        VrlValue::Boolean(v) => {
            resolve_simple_type(
                ValueData::BoolValue(v),
                column_name,
                ColumnDataType::Boolean,
            )?;
        }

        VrlValue::Bytes(v) => {
            resolve_simple_type(
                ValueData::StringValue(String::from_utf8_lossy_owned(v.to_vec())),
                column_name,
                ColumnDataType::String,
            )?;
        }

        VrlValue::Regex(v) => {
            warn!(
                "Persisting regex value in the table, this should not happen, column_name: {}",
                column_name
            );
            resolve_simple_type(
                ValueData::StringValue(v.to_string()),
                column_name,
                ColumnDataType::String,
            )?;
        }

        VrlValue::Timestamp(ts) => {
            let ns = ts.timestamp_nanos_opt().context(InvalidTimestampSnafu {
                input: ts.to_rfc3339(),
            })?;
            resolve_simple_type(
                ValueData::TimestampNanosecondValue(ns),
                column_name,
                ColumnDataType::TimestampNanosecond,
            )?;
        }

        VrlValue::Array(_) | VrlValue::Object(_) => {
            let data = vrl_value_to_jsonb_value(&value);
            resolve_schema(
                index,
                ValueData::BinaryValue(data.to_vec()),
                ColumnSchema {
                    column_name,
                    datatype: ColumnDataType::Binary as i32,
                    semantic_type: SemanticType::Field as i32,
                    datatype_extension: Some(ColumnDataTypeExtension {
                        type_ext: Some(TypeExt::JsonType(JsonTypeExtension::JsonBinary.into())),
                    }),
                    options: None,
                },
                row,
                schema_info,
            )?;
        }
    }
    Ok(())
}

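/// Recursively converts a VRL value into a jsonb value. Byte strings and regexes
/// become JSON strings; timestamps are rendered as RFC 3339 strings.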
fn vrl_value_to_jsonb_value<'a>(value: &'a VrlValue) -> jsonb::Value<'a> {
    match value {
        VrlValue::Bytes(bytes) => jsonb::Value::String(String::from_utf8_lossy(bytes)),
        VrlValue::Regex(value_regex) => jsonb::Value::String(Cow::Borrowed(value_regex.as_str())),
        VrlValue::Integer(i) => jsonb::Value::Number(Number::Int64(*i)),
        VrlValue::Float(not_nan) => jsonb::Value::Number(Number::Float64(not_nan.into_inner())),
        VrlValue::Boolean(b) => jsonb::Value::Bool(*b),
        VrlValue::Timestamp(date_time) => jsonb::Value::String(Cow::Owned(date_time.to_rfc3339())),
        VrlValue::Object(btree_map) => jsonb::Value::Object(
            btree_map
                .iter()
                .map(|(key, value)| (key.to_string(), vrl_value_to_jsonb_value(value)))
                .collect(),
        ),
        VrlValue::Array(values) => jsonb::Value::Array(
            values.iter().map(vrl_value_to_jsonb_value).collect(),
        ),
        VrlValue::Null => jsonb::Value::Null,
    }
}

fn identity_pipeline_inner(
    pipeline_maps: Vec<VrlValue>,
    pipeline_ctx: &PipelineContext<'_>,
) -> Result<(SchemaInfo, HashMap<ContextOpt, Vec<Row>>)> {
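    // Rows are grouped by their per-request ContextOpt, so each distinct option set
    // is flushed as its own batch of rows sharing the merged schema.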
    let skip_error = pipeline_ctx.pipeline_param.skip_error();
    let mut schema_info = SchemaInfo::default();
    let custom_ts = pipeline_ctx.pipeline_definition.get_custom_ts();

    // set time index column schema first
    schema_info.schema.push(ColumnSchema {
        column_name: custom_ts
            .map(|ts| ts.get_column_name().to_string())
            .unwrap_or_else(|| DEFAULT_GREPTIME_TIMESTAMP_COLUMN.to_string()),
        datatype: custom_ts.map(|c| c.get_datatype()).unwrap_or_else(|| {
            if pipeline_ctx.channel == Channel::Prometheus {
                ColumnDataType::TimestampMillisecond
            } else {
                ColumnDataType::TimestampNanosecond
            }
        }) as i32,
        semantic_type: SemanticType::Timestamp as i32,
        datatype_extension: None,
        options: None,
    });

    let mut opt_map = HashMap::new();
    let len = pipeline_maps.len();

    for mut pipeline_map in pipeline_maps {
        let opt = unwrap_or_continue_if_err!(
            ContextOpt::from_pipeline_map_to_opt(&mut pipeline_map),
            skip_error
        );
        let row = unwrap_or_continue_if_err!(
            values_to_row(&mut schema_info, pipeline_map, pipeline_ctx, None, true),
            skip_error
        );

        opt_map
            .entry(opt)
            .or_insert_with(|| Vec::with_capacity(len))
            .push(row);
    }

    let column_count = schema_info.schema.len();
    for rows in opt_map.values_mut() {
        for row in rows.iter_mut() {
            let diff = column_count - row.values.len();
            for _ in 0..diff {
                row.values.push(GreptimeValue { value_data: None });
            }
        }
    }

    Ok((schema_info, opt_map))
}

/// Identity pipeline for Greptime.
/// This pipeline converts the input JSON array into Greptime Rows.
/// The `table` parameter is used to set the semantic type of the row key columns to Tag.
/// 1. The pipeline adds a default timestamp column to the schema.
/// 2. The pipeline does not resolve NULL values.
/// 3. The pipeline assumes that the JSON format is fixed.
/// 4. The pipeline returns an error if the datatype of a column is mismatched across records.
/// 5. The pipeline analyzes the schema of each JSON record and merges them to get the final schema.
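///
/// A minimal usage sketch (illustrative, not a doctest; assumes a pipeline context
/// like the one built in the tests below):
///
/// ```ignore
/// let array: Vec<VrlValue> = vec![serde_json::json!({"host": "h1", "cpu": 0.5}).into()];
/// let rows_by_opt = identity_pipeline(array, None, &pipeline_ctx)?;
/// // one Rows batch per distinct ContextOpt, all sharing the merged schema
/// ```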
pub fn identity_pipeline(
    array: Vec<VrlValue>,
    table: Option<Arc<table::Table>>,
    pipeline_ctx: &PipelineContext<'_>,
) -> Result<HashMap<ContextOpt, Rows>> {
    let skip_error = pipeline_ctx.pipeline_param.skip_error();
    let input = if pipeline_ctx.pipeline_param.flatten_json_object() {
        let mut results = Vec::with_capacity(array.len());
        for item in array.into_iter() {
            let result = unwrap_or_continue_if_err!(
                flatten_object(item, DEFAULT_MAX_NESTED_LEVELS_FOR_JSON_FLATTENING),
                skip_error
            );
            results.push(result);
        }
        results
    } else {
        array
    };

    identity_pipeline_inner(input, pipeline_ctx).map(|(mut schema, opt_map)| {
        if let Some(table) = table {
            let table_info = table.table_info();
            for tag_name in table_info.meta.row_key_column_names() {
                if let Some(index) = schema.index.get(tag_name) {
                    schema.schema[*index].semantic_type = SemanticType::Tag as i32;
                }
            }
        }

        opt_map
            .into_iter()
            .map(|(opt, rows)| {
                (
                    opt,
                    Rows {
                        schema: schema.schema.clone(),
                        rows,
                    },
                )
            })
            .collect::<HashMap<ContextOpt, Rows>>()
    })
}

/// Consumes the JSON object and flattens it into a single-level object.
///
/// The `max_nested_levels` parameter limits the nesting depth of the JSON object.
/// An error is returned if the nesting depth exceeds `max_nested_levels`.
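///
/// A minimal sketch of the expected behavior (illustrative, not a doctest):
///
/// ```ignore
/// let input: VrlValue = serde_json::json!({"a": {"b": 1}}).into();
/// let flat = flatten_object(input, 10)?;
/// // nested keys are joined with dots: {"a.b": 1}
/// ```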
pub fn flatten_object(object: VrlValue, max_nested_levels: usize) -> Result<VrlValue> {
    let mut flattened = BTreeMap::new();
    let object = object.into_object().context(ValueMustBeMapSnafu)?;

    if !object.is_empty() {
        // recursion is used to flatten the object
        do_flatten_object(&mut flattened, None, object, 1, max_nested_levels)?;
    }

    Ok(VrlValue::Object(flattened))
}

fn do_flatten_object(
    dest: &mut BTreeMap<KeyString, VrlValue>,
    base: Option<&str>,
    object: BTreeMap<KeyString, VrlValue>,
    current_level: usize,
    max_nested_levels: usize,
) -> Result<()> {
    // For safety, we do not allow the depth to exceed max_nested_levels.
    if current_level > max_nested_levels {
        return ReachedMaxNestedLevelsSnafu { max_nested_levels }.fail();
    }

    for (key, value) in object {
        let new_key = base.map_or_else(
            || key.clone(),
            |base_key| format!("{base_key}.{key}").into(),
        );

        match value {
            VrlValue::Object(object) => {
                do_flatten_object(
                    dest,
                    Some(&new_key),
                    object,
                    current_level + 1,
                    max_nested_levels,
                )?;
            }
            // Other types are inserted directly under the flattened key.
            _ => {
                dest.insert(new_key, value);
            }
        }
    }

    Ok(())
}

#[cfg(test)]
mod tests {
    use api::v1::SemanticType;

    use super::*;
    use crate::{PipelineDefinition, identity_pipeline};

    #[test]
    fn test_identify_pipeline() {
        let params = GreptimePipelineParams::default();
        let pipeline_ctx = PipelineContext::new(
            &PipelineDefinition::GreptimeIdentityPipeline(None),
            &params,
            Channel::Unknown,
        );
        {
            let array = [
                serde_json::json!({
                    "woshinull": null,
                    "name": "Alice",
                    "age": 20,
                    "is_student": true,
                    "score": 99.5,
                    "hobbies": "reading",
                    "address": "Beijing",
                }),
                serde_json::json!({
                    "name": "Bob",
                    "age": 21,
                    "is_student": false,
                    "score": "88.5",
                    "hobbies": "swimming",
                    "address": "Shanghai",
                    "gaga": "gaga"
                }),
            ];
            let array = array.iter().map(|v| v.into()).collect();
            let rows = identity_pipeline(array, None, &pipeline_ctx);
            assert!(rows.is_err());
            assert_eq!(
                rows.err().unwrap().to_string(),
                "Column datatype mismatch. For column: score, expected datatype: FLOAT64, actual datatype: STRING".to_string(),
            );
        }
        {
            let array = [
                serde_json::json!({
                    "woshinull": null,
                    "name": "Alice",
                    "age": 20,
                    "is_student": true,
                    "score": 99.5,
                    "hobbies": "reading",
                    "address": "Beijing",
                }),
                serde_json::json!({
                    "name": "Bob",
                    "age": 21,
                    "is_student": false,
                    "score": 88,
                    "hobbies": "swimming",
                    "address": "Shanghai",
                    "gaga": "gaga"
                }),
            ];
            let array = array.iter().map(|v| v.into()).collect();
            let rows = identity_pipeline(array, None, &pipeline_ctx);
            assert!(rows.is_err());
            assert_eq!(
                rows.err().unwrap().to_string(),
                "Column datatype mismatch. For column: score, expected datatype: FLOAT64, actual datatype: INT64".to_string(),
            );
        }
        {
            let array = [
                serde_json::json!({
                    "woshinull": null,
                    "name": "Alice",
                    "age": 20,
                    "is_student": true,
                    "score": 99.5,
                    "hobbies": "reading",
                    "address": "Beijing",
                }),
                serde_json::json!({
                    "name": "Bob",
                    "age": 21,
                    "is_student": false,
                    "score": 88.5,
                    "hobbies": "swimming",
                    "address": "Shanghai",
                    "gaga": "gaga"
                }),
            ];
            let array = array.iter().map(|v| v.into()).collect();
            let rows = identity_pipeline(array, None, &pipeline_ctx);
            assert!(rows.is_ok());
            let mut rows = rows.unwrap();
            assert!(rows.len() == 1);
            let rows = rows.remove(&ContextOpt::default()).unwrap();
            assert_eq!(rows.schema.len(), 8);
            assert_eq!(rows.rows.len(), 2);
            assert_eq!(8, rows.rows[0].values.len());
            assert_eq!(8, rows.rows[1].values.len());
        }
        {
            let array = [
                serde_json::json!({
                    "woshinull": null,
                    "name": "Alice",
                    "age": 20,
                    "is_student": true,
                    "score": 99.5,
                    "hobbies": "reading",
                    "address": "Beijing",
                }),
                serde_json::json!({
                    "name": "Bob",
                    "age": 21,
                    "is_student": false,
                    "score": 88.5,
                    "hobbies": "swimming",
                    "address": "Shanghai",
                    "gaga": "gaga"
                }),
            ];
            let tag_column_names = ["name".to_string(), "address".to_string()];

            let rows =
                identity_pipeline_inner(array.iter().map(|v| v.into()).collect(), &pipeline_ctx)
                    .map(|(mut schema, mut rows)| {
                        for name in tag_column_names {
                            if let Some(index) = schema.index.get(&name) {
                                schema.schema[*index].semantic_type = SemanticType::Tag as i32;
                            }
                        }

                        assert!(rows.len() == 1);
                        let rows = rows.remove(&ContextOpt::default()).unwrap();

                        Rows {
                            schema: schema.schema,
                            rows,
                        }
                    });

            assert!(rows.is_ok());
            let rows = rows.unwrap();
            assert_eq!(rows.schema.len(), 8);
            assert_eq!(rows.rows.len(), 2);
            assert_eq!(8, rows.rows[0].values.len());
            assert_eq!(8, rows.rows[1].values.len());
            assert_eq!(
                rows.schema
                    .iter()
                    .find(|x| x.column_name == "name")
                    .unwrap()
                    .semantic_type,
                SemanticType::Tag as i32
            );
            assert_eq!(
                rows.schema
                    .iter()
                    .find(|x| x.column_name == "address")
                    .unwrap()
                    .semantic_type,
                SemanticType::Tag as i32
            );
            assert_eq!(
                rows.schema
                    .iter()
                    .filter(|x| x.semantic_type == SemanticType::Tag as i32)
                    .count(),
                2
            );
        }
    }

    #[test]
    fn test_flatten() {
        let test_cases = vec![
            // Basic case.
            (
                serde_json::json!(
                    {
                        "a": {
                            "b": {
                                "c": [1, 2, 3]
                            }
                        },
                        "d": [
                            "foo",
                            "bar"
                        ],
                        "e": {
                            "f": [7, 8, 9],
                            "g": {
                                "h": 123,
                                "i": "hello",
                                "j": {
                                    "k": true
                                }
                            }
                        }
                    }
                ),
                10,
                Some(serde_json::json!(
                    {
                        "a.b.c": [1,2,3],
                        "d": ["foo","bar"],
                        "e.f": [7,8,9],
                        "e.g.h": 123,
                        "e.g.i": "hello",
                        "e.g.j.k": true
                    }
                )),
            ),
            // Test the case where the object has more than 3 nested levels.
            (
                serde_json::json!(
                    {
                        "a": {
                            "b": {
                                "c": {
                                    "d": [1, 2, 3]
                                }
                            }
                        },
                        "e": [
                            "foo",
                            "bar"
                        ]
                    }
                ),
                3,
                None,
            ),
        ];

        for (input, max_depth, expected) in test_cases {
            let input = input.into();
            let expected = expected.map(|e| e.into());

            let flattened_object = flatten_object(input, max_depth).ok();
            assert_eq!(flattened_object, expected);
        }
    }

    #[test]
    fn test_greptime_pipeline_params() {
        let params = Some("flatten_json_object=true");
        let pipeline_params = GreptimePipelineParams::from_params(params);
        assert!(pipeline_params.flatten_json_object());

        let params = None;
        let pipeline_params = GreptimePipelineParams::from_params(params);
        assert!(!pipeline_params.flatten_json_object());
    }
}