pipeline/etl/transform/transformer/greptime.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

pub mod coerce;

use std::borrow::Cow;
use std::collections::{BTreeMap, HashSet};
use std::sync::Arc;

use ahash::{HashMap, HashMapExt};
use api::helper::proto_value_type;
use api::v1::column_data_type_extension::TypeExt;
use api::v1::value::ValueData;
use api::v1::{ColumnDataType, ColumnDataTypeExtension, JsonTypeExtension, SemanticType};
use coerce::{coerce_columns, coerce_value};
use common_query::prelude::{GREPTIME_TIMESTAMP, GREPTIME_VALUE};
use common_telemetry::warn;
use greptime_proto::v1::{ColumnSchema, Row, Rows, Value as GreptimeValue};
use itertools::Itertools;
use jsonb::Number;
use once_cell::sync::OnceCell;
use session::context::Channel;
use snafu::OptionExt;
use vrl::prelude::VrlValueConvert;
use vrl::value::{KeyString, Value as VrlValue};

use crate::error::{
    IdentifyPipelineColumnTypeMismatchSnafu, InvalidTimestampSnafu, ReachedMaxNestedLevelsSnafu,
    Result, TimeIndexMustBeNonNullSnafu, TransformColumnNameMustBeUniqueSnafu,
    TransformMultipleTimestampIndexSnafu, TransformTimestampIndexCountSnafu, ValueMustBeMapSnafu,
};
use crate::etl::ctx_req::ContextOpt;
use crate::etl::field::{Field, Fields};
use crate::etl::transform::index::Index;
use crate::etl::transform::{Transform, Transforms};
use crate::etl::PipelineDocVersion;
use crate::{unwrap_or_continue_if_err, PipelineContext};

const DEFAULT_GREPTIME_TIMESTAMP_COLUMN: &str = "greptime_timestamp";
const DEFAULT_MAX_NESTED_LEVELS_FOR_JSON_FLATTENING: usize = 10;

/// Fields not present in the transform columns are discarded
/// to prevent automatic column creation in GreptimeDB.
#[derive(Debug, Clone)]
pub struct GreptimeTransformer {
    transforms: Transforms,
    schema: Vec<ColumnSchema>,
}

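/// Interprets common truthy strings ("true", "1", "yes", "on", "t"),
/// case-insensitively.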
fn truthy<V: AsRef<str>>(v: V) -> bool {
    let v = v.as_ref().to_lowercase();
    v == "true" || v == "1" || v == "yes" || v == "on" || v == "t"
}

/// Parameters that can be used to configure the greptime pipelines.
#[derive(Debug, Default)]
pub struct GreptimePipelineParams {
    /// The original options for configuring the greptime pipelines.
    /// This should not be used directly; use the parsed shortcut option values instead.
    options: HashMap<String, String>,

    /// Parsed shortcut option values.
    pub flatten_json_object: OnceCell<bool>,
    /// Whether to skip errors when processing the pipeline.
    pub skip_error: OnceCell<bool>,
}

impl GreptimePipelineParams {
    /// Create a `GreptimePipelineParams` from a params string, which comes from
    /// the HTTP header with key `x-greptime-pipeline-params`.
    /// The params are in the format `key1=value1&key2=value2`, for example:
    /// x-greptime-pipeline-params: flatten_json_object=true
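    ///
    /// # Example
    ///
    /// A minimal usage sketch; the header value is illustrative:
    ///
    /// ```ignore
    /// // x-greptime-pipeline-params: flatten_json_object=true&skip_error=yes
    /// let params =
    ///     GreptimePipelineParams::from_params(Some("flatten_json_object=true&skip_error=yes"));
    /// assert!(params.flatten_json_object());
    /// assert!(params.skip_error()); // "yes" is accepted by `truthy`
    /// ```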
    pub fn from_params(params: Option<&str>) -> Self {
        let options = Self::parse_header_str_to_map(params);

        Self {
            options,
            skip_error: OnceCell::new(),
            flatten_json_object: OnceCell::new(),
        }
    }

    pub fn from_map(options: HashMap<String, String>) -> Self {
        Self {
            options,
            skip_error: OnceCell::new(),
            flatten_json_object: OnceCell::new(),
        }
    }

    pub fn parse_header_str_to_map(params: Option<&str>) -> HashMap<String, String> {
        if let Some(params) = params {
            if params.is_empty() {
                HashMap::new()
            } else {
                params
                    .split('&')
                    .filter_map(|s| s.split_once('='))
                    .map(|(k, v)| (k.to_string(), v.to_string()))
                    .collect::<HashMap<String, String>>()
            }
        } else {
            HashMap::new()
        }
    }

    /// Whether to flatten the JSON object.
    pub fn flatten_json_object(&self) -> bool {
        *self.flatten_json_object.get_or_init(|| {
            self.options
                .get("flatten_json_object")
                .map(|v| v == "true")
                .unwrap_or(false)
        })
    }

    /// Whether to skip errors when processing the pipeline.
    pub fn skip_error(&self) -> bool {
        *self
            .skip_error
            .get_or_init(|| self.options.get("skip_error").map(truthy).unwrap_or(false))
    }
}

impl GreptimeTransformer {
    /// Add a default timestamp column to the transforms
    fn add_greptime_timestamp_column(transforms: &mut Transforms) {
        let type_ = ColumnDataType::TimestampNanosecond;
        let default = None;

        let transform = Transform {
            fields: Fields::one(Field::new(
                DEFAULT_GREPTIME_TIMESTAMP_COLUMN.to_string(),
                None,
            )),
            type_,
            default,
            index: Some(Index::Time),
            on_failure: Some(crate::etl::transform::OnFailure::Default),
            tag: false,
        };
        transforms.push(transform);
    }

    /// Generate the schema for the GreptimeTransformer
    fn init_schemas(transforms: &Transforms) -> Result<Vec<ColumnSchema>> {
        let mut schema = vec![];
        for transform in transforms.iter() {
            schema.extend(coerce_columns(transform)?);
        }
        Ok(schema)
    }
}

impl GreptimeTransformer {
    pub fn new(mut transforms: Transforms, doc_version: &PipelineDocVersion) -> Result<Self> {
        // empty check is done in the caller
        let mut column_names_set = HashSet::new();
        let mut timestamp_columns = vec![];

        for transform in transforms.iter() {
            let target_fields_set = transform
                .fields
                .iter()
                .map(|f| f.target_or_input_field())
                .collect::<HashSet<_>>();

            let intersections: Vec<_> = column_names_set.intersection(&target_fields_set).collect();
            if !intersections.is_empty() {
                let duplicates = intersections.iter().join(",");
                return TransformColumnNameMustBeUniqueSnafu { duplicates }.fail();
            }

            column_names_set.extend(target_fields_set);

            if let Some(idx) = transform.index {
                if idx == Index::Time {
                    match transform.fields.len() {
                        // Safety: unwrap is fine here because we have checked the length of fields
                        1 => {
                            timestamp_columns.push(transform.fields.first().unwrap().input_field())
                        }
                        _ => {
                            return TransformMultipleTimestampIndexSnafu {
                                columns: transform
                                    .fields
                                    .iter()
                                    .map(|x| x.input_field())
                                    .join(", "),
                            }
                            .fail();
                        }
                    }
                }
            }
        }

        let schema = match timestamp_columns.len() {
            0 if doc_version == &PipelineDocVersion::V1 => {
                // compatible with v1, add a default timestamp column
                GreptimeTransformer::add_greptime_timestamp_column(&mut transforms);
                GreptimeTransformer::init_schemas(&transforms)?
            }
            1 => GreptimeTransformer::init_schemas(&transforms)?,
            count => {
                let columns = timestamp_columns.iter().join(", ");
                return TransformTimestampIndexCountSnafu { count, columns }.fail();
            }
        };
        Ok(GreptimeTransformer { transforms, schema })
    }

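    /// Transforms `pipeline_map` into one row of values laid out according to
    /// `self.schema`. When `is_v1` is false, consumed columns are removed from
    /// the map so the auto-transform can pick up the remaining fields.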
    pub fn transform_mut(
        &self,
        pipeline_map: &mut VrlValue,
        is_v1: bool,
    ) -> Result<Vec<GreptimeValue>> {
        let mut values = vec![GreptimeValue { value_data: None }; self.schema.len()];
        let mut output_index = 0;
        for transform in self.transforms.iter() {
            for field in transform.fields.iter() {
                let column_name = field.input_field();

                let pipeline_map = pipeline_map.as_object_mut().context(ValueMustBeMapSnafu)?;
                // keep using `get` here to stay compatible with v1
                match pipeline_map.get(column_name) {
                    Some(v) => {
                        let value_data = coerce_value(v, transform)?;
                        // every transform field has only one output field
                        values[output_index] = GreptimeValue { value_data };
                    }
                    None => {
                        let value_data = match transform.on_failure {
                            Some(crate::etl::transform::OnFailure::Default) => {
                                match transform.get_default() {
                                    Some(default) => Some(default.clone()),
                                    None => transform.get_default_value_when_data_is_none(),
                                }
                            }
                            Some(crate::etl::transform::OnFailure::Ignore) => None,
                            None => None,
                        };
                        if transform.is_timeindex() && value_data.is_none() {
                            return TimeIndexMustBeNonNullSnafu.fail();
                        }
                        values[output_index] = GreptimeValue { value_data };
                    }
                }
                output_index += 1;
                if !is_v1 {
                    // remove the column from the pipeline_map
                    // so that the auto-transform can use the remaining fields
                    pipeline_map.remove(column_name);
                }
            }
        }
        Ok(values)
    }

    pub fn transforms(&self) -> &Transforms {
        &self.transforms
    }

    pub fn schemas(&self) -> &Vec<greptime_proto::v1::ColumnSchema> {
        &self.schema
    }

    pub fn transforms_mut(&mut self) -> &mut Transforms {
        &mut self.transforms
    }
}

/// Records the current schema information plus a sequential cache of field names.
/// It changes as the user input JSON is traversed, ending up as a superset of
/// all schemas seen in the input.
#[derive(Debug, Default)]
pub struct SchemaInfo {
    /// Column schemas collected so far.
    pub schema: Vec<ColumnSchema>,
    /// Maps a column name to its position in `schema`.
    pub index: HashMap<String, usize>,
}

impl SchemaInfo {
    pub fn with_capacity(capacity: usize) -> Self {
        Self {
            schema: Vec::with_capacity(capacity),
            index: HashMap::with_capacity(capacity),
        }
    }

    pub fn from_schema_list(schema_list: Vec<ColumnSchema>) -> Self {
        let mut index = HashMap::new();
        for (i, schema) in schema_list.iter().enumerate() {
            index.insert(schema.column_name.clone(), i);
        }
        Self {
            schema: schema_list,
            index,
        }
    }
}

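/// Writes `value_data` into `row`. When `index` is `Some`, the value type is
/// checked against the recorded schema for that column; otherwise a new column
/// is appended to both `schema_info` and the row.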
fn resolve_schema(
    index: Option<usize>,
    value_data: ValueData,
    column_schema: ColumnSchema,
    row: &mut Vec<GreptimeValue>,
    schema_info: &mut SchemaInfo,
) -> Result<()> {
    if let Some(index) = index {
        let api_value = GreptimeValue {
            value_data: Some(value_data),
        };
        // Safety: unwrap is fine here because api_value is always valid
        let value_column_data_type = proto_value_type(&api_value).unwrap();
        // Safety: unwrap is fine here because index is always valid
        let schema_column_data_type = schema_info.schema.get(index).unwrap().datatype();
        if value_column_data_type != schema_column_data_type {
            IdentifyPipelineColumnTypeMismatchSnafu {
                column: column_schema.column_name,
                expected: schema_column_data_type.as_str_name(),
                actual: value_column_data_type.as_str_name(),
            }
            .fail()
        } else {
            row[index] = api_value;
            Ok(())
        }
    } else {
        let key = column_schema.column_name.clone();
        schema_info.schema.push(column_schema);
        schema_info.index.insert(key, schema_info.schema.len() - 1);
        let api_value = GreptimeValue {
            value_data: Some(value_data),
        };
        row.push(api_value);
        Ok(())
    }
}

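/// Picks the timestamp value for a row: the Prometheus channel reads the
/// [`GREPTIME_TIMESTAMP`] field as milliseconds; other channels use the
/// configured custom timestamp column, falling back to the current time in
/// nanoseconds.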
fn calc_ts(p_ctx: &PipelineContext, values: &VrlValue) -> Result<Option<ValueData>> {
    match p_ctx.channel {
        Channel::Prometheus => {
            let ts = values
                .as_object()
                .and_then(|m| m.get(GREPTIME_TIMESTAMP))
                .and_then(|ts| ts.try_into_i64().ok())
                .unwrap_or_default();
            Ok(Some(ValueData::TimestampMillisecondValue(ts)))
        }
        _ => {
            let custom_ts = p_ctx.pipeline_definition.get_custom_ts();
            match custom_ts {
                Some(ts) => {
                    let ts_field = values.as_object().and_then(|m| m.get(ts.get_column_name()));
                    Some(ts.get_timestamp_value(ts_field)).transpose()
                }
                None => Ok(Some(ValueData::TimestampNanosecondValue(
                    chrono::Utc::now().timestamp_nanos_opt().unwrap_or_default(),
                ))),
            }
        }
    }
}

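/// Converts a single value map into a [`Row`] aligned with `schema_info`,
/// optionally prepending a freshly calculated timestamp. New columns discovered
/// in `values` are appended to the schema as they are encountered.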
pub(crate) fn values_to_row(
    schema_info: &mut SchemaInfo,
    values: VrlValue,
    pipeline_ctx: &PipelineContext<'_>,
    row: Option<Vec<GreptimeValue>>,
    need_calc_ts: bool,
) -> Result<Row> {
    let mut row: Vec<GreptimeValue> =
        row.unwrap_or_else(|| Vec::with_capacity(schema_info.schema.len()));
    let custom_ts = pipeline_ctx.pipeline_definition.get_custom_ts();

    if need_calc_ts {
        // calculate timestamp value based on the channel
        let ts = calc_ts(pipeline_ctx, &values)?;
        row.push(GreptimeValue { value_data: ts });
    }

    row.resize(schema_info.schema.len(), GreptimeValue { value_data: None });

    // skip ts column
    let ts_column_name = custom_ts
        .as_ref()
        .map_or(DEFAULT_GREPTIME_TIMESTAMP_COLUMN, |ts| ts.get_column_name());

    let values = values.into_object().context(ValueMustBeMapSnafu)?;

    for (column_name, value) in values {
        if column_name.as_str() == ts_column_name {
            continue;
        }

        resolve_value(
            value,
            column_name.into(),
            &mut row,
            schema_info,
            pipeline_ctx,
        )?;
    }
    Ok(Row { values: row })
}

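/// In the Prometheus channel every column except [`GREPTIME_VALUE`] becomes a
/// tag; in all other channels columns default to fields.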
fn decide_semantic(p_ctx: &PipelineContext, column_name: &str) -> i32 {
    if p_ctx.channel == Channel::Prometheus && column_name != GREPTIME_VALUE {
        SemanticType::Tag as i32
    } else {
        SemanticType::Field as i32
    }
}

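/// Maps a [`VrlValue`] to the matching [`ColumnDataType`] and writes it through
/// [`resolve_schema`]; arrays and objects are serialized as JSONB binary.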
fn resolve_value(
    value: VrlValue,
    column_name: String,
    row: &mut Vec<GreptimeValue>,
    schema_info: &mut SchemaInfo,
    p_ctx: &PipelineContext,
) -> Result<()> {
    let index = schema_info.index.get(&column_name).copied();
    let mut resolve_simple_type =
        |value_data: ValueData, column_name: String, data_type: ColumnDataType| {
            let semantic_type = decide_semantic(p_ctx, &column_name);
            resolve_schema(
                index,
                value_data,
                ColumnSchema {
                    column_name,
                    datatype: data_type as i32,
                    semantic_type,
                    datatype_extension: None,
                    options: None,
                },
                row,
                schema_info,
            )
        };

    match value {
        VrlValue::Null => {}

        VrlValue::Integer(v) => {
            resolve_simple_type(ValueData::I64Value(v), column_name, ColumnDataType::Int64)?;
        }

        VrlValue::Float(v) => {
            resolve_simple_type(
                ValueData::F64Value(v.into()),
                column_name,
                ColumnDataType::Float64,
            )?;
        }

        VrlValue::Boolean(v) => {
            resolve_simple_type(
                ValueData::BoolValue(v),
                column_name,
                ColumnDataType::Boolean,
            )?;
        }

        VrlValue::Bytes(v) => {
            resolve_simple_type(
                ValueData::StringValue(String::from_utf8_lossy_owned(v.to_vec())),
                column_name,
                ColumnDataType::String,
            )?;
        }

        VrlValue::Regex(v) => {
            warn!(
                "Persisting regex value in the table, this should not happen, column_name: {}",
                column_name
            );
            resolve_simple_type(
                ValueData::StringValue(v.to_string()),
                column_name,
                ColumnDataType::String,
            )?;
        }

        VrlValue::Timestamp(ts) => {
            let ns = ts.timestamp_nanos_opt().context(InvalidTimestampSnafu {
                input: ts.to_rfc3339(),
            })?;
            resolve_simple_type(
                ValueData::TimestampNanosecondValue(ns),
                column_name,
                ColumnDataType::TimestampNanosecond,
            )?;
        }

        VrlValue::Array(_) | VrlValue::Object(_) => {
            let data = vrl_value_to_jsonb_value(&value);
            resolve_schema(
                index,
                ValueData::BinaryValue(data.to_vec()),
                ColumnSchema {
                    column_name,
                    datatype: ColumnDataType::Binary as i32,
                    semantic_type: SemanticType::Field as i32,
                    datatype_extension: Some(ColumnDataTypeExtension {
                        type_ext: Some(TypeExt::JsonType(JsonTypeExtension::JsonBinary.into())),
                    }),
                    options: None,
                },
                row,
                schema_info,
            )?;
        }
    }
    Ok(())
}

fn vrl_value_to_jsonb_value<'a>(value: &'a VrlValue) -> jsonb::Value<'a> {
    match value {
        VrlValue::Bytes(bytes) => jsonb::Value::String(String::from_utf8_lossy(bytes)),
        VrlValue::Regex(value_regex) => jsonb::Value::String(Cow::Borrowed(value_regex.as_str())),
        VrlValue::Integer(i) => jsonb::Value::Number(Number::Int64(*i)),
        VrlValue::Float(not_nan) => jsonb::Value::Number(Number::Float64(not_nan.into_inner())),
        VrlValue::Boolean(b) => jsonb::Value::Bool(*b),
        VrlValue::Timestamp(date_time) => jsonb::Value::String(Cow::Owned(date_time.to_rfc3339())),
        VrlValue::Object(btree_map) => jsonb::Value::Object(
            btree_map
                .iter()
                .map(|(key, value)| (key.to_string(), vrl_value_to_jsonb_value(value)))
                .collect(),
        ),
        VrlValue::Array(values) => jsonb::Value::Array(
            values
                .iter()
                .map(|value| vrl_value_to_jsonb_value(value))
                .collect(),
        ),
        VrlValue::Null => jsonb::Value::Null,
    }
}

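/// Converts pipeline maps into rows grouped by [`ContextOpt`], building the
/// schema incrementally as new columns appear and padding earlier rows to the
/// final column count afterwards.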
fn identity_pipeline_inner(
    pipeline_maps: Vec<VrlValue>,
    pipeline_ctx: &PipelineContext<'_>,
) -> Result<(SchemaInfo, HashMap<ContextOpt, Vec<Row>>)> {
    let skip_error = pipeline_ctx.pipeline_param.skip_error();
    let mut schema_info = SchemaInfo::default();
    let custom_ts = pipeline_ctx.pipeline_definition.get_custom_ts();

    // set time index column schema first
    schema_info.schema.push(ColumnSchema {
        column_name: custom_ts
            .map(|ts| ts.get_column_name().to_string())
            .unwrap_or_else(|| DEFAULT_GREPTIME_TIMESTAMP_COLUMN.to_string()),
        datatype: custom_ts.map(|c| c.get_datatype()).unwrap_or_else(|| {
            if pipeline_ctx.channel == Channel::Prometheus {
                ColumnDataType::TimestampMillisecond
            } else {
                ColumnDataType::TimestampNanosecond
            }
        }) as i32,
        semantic_type: SemanticType::Timestamp as i32,
        datatype_extension: None,
        options: None,
    });

    let mut opt_map = HashMap::new();
    let len = pipeline_maps.len();

    for mut pipeline_map in pipeline_maps {
        let opt = unwrap_or_continue_if_err!(
            ContextOpt::from_pipeline_map_to_opt(&mut pipeline_map),
            skip_error
        );
        let row = unwrap_or_continue_if_err!(
            values_to_row(&mut schema_info, pipeline_map, pipeline_ctx, None, true),
            skip_error
        );

        opt_map
            .entry(opt)
            .or_insert_with(|| Vec::with_capacity(len))
            .push(row);
    }

    let column_count = schema_info.schema.len();
    for (_, rows) in opt_map.iter_mut() {
        for row in rows.iter_mut() {
            let diff = column_count - row.values.len();
            for _ in 0..diff {
                row.values.push(GreptimeValue { value_data: None });
            }
        }
    }

    Ok((schema_info, opt_map))
}

/// Identity pipeline for Greptime.
/// This pipeline converts the input JSON array to Greptime Rows.
/// The `table` parameter is used to set the semantic type of the row key columns to Tag.
/// 1. The pipeline adds a default timestamp column to the schema.
/// 2. The pipeline does not resolve NULL values.
/// 3. The pipeline assumes that the JSON format is fixed.
/// 4. The pipeline returns an error if the same column's datatype is mismatched across records.
/// 5. The pipeline analyzes the schema of each JSON record and merges them to get the final schema.
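///
/// # Example
///
/// A minimal sketch mirroring the tests below; the input values are illustrative:
///
/// ```ignore
/// let params = GreptimePipelineParams::default();
/// let ctx = PipelineContext::new(
///     &PipelineDefinition::GreptimeIdentityPipeline(None),
///     &params,
///     Channel::Unknown,
/// );
/// let array = [serde_json::json!({"host": "h1", "value": 0.5})];
/// let input: Vec<VrlValue> = array.iter().map(|v| v.into()).collect();
/// // one Rows per ContextOpt group, all sharing the merged schema
/// let rows_by_opt = identity_pipeline(input, None, &ctx)?;
/// ```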
pub fn identity_pipeline(
    array: Vec<VrlValue>,
    table: Option<Arc<table::Table>>,
    pipeline_ctx: &PipelineContext<'_>,
) -> Result<HashMap<ContextOpt, Rows>> {
    let skip_error = pipeline_ctx.pipeline_param.skip_error();
    let input = if pipeline_ctx.pipeline_param.flatten_json_object() {
        let mut results = Vec::with_capacity(array.len());
        for item in array.into_iter() {
            let result = unwrap_or_continue_if_err!(
                flatten_object(item, DEFAULT_MAX_NESTED_LEVELS_FOR_JSON_FLATTENING),
                skip_error
            );
            results.push(result);
        }
        results
    } else {
        array
    };

    identity_pipeline_inner(input, pipeline_ctx).map(|(mut schema, opt_map)| {
        if let Some(table) = table {
            let table_info = table.table_info();
            for tag_name in table_info.meta.row_key_column_names() {
                if let Some(index) = schema.index.get(tag_name) {
                    schema.schema[*index].semantic_type = SemanticType::Tag as i32;
                }
            }
        }

        opt_map
            .into_iter()
            .map(|(opt, rows)| {
                (
                    opt,
                    Rows {
                        schema: schema.schema.clone(),
                        rows,
                    },
                )
            })
            .collect::<HashMap<ContextOpt, Rows>>()
    })
}

/// Consumes the JSON object and flattens it into a single-level object.
///
/// The `max_nested_levels` parameter limits the nesting depth of the JSON object.
/// An error is returned if the nesting depth exceeds `max_nested_levels`.
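///
/// # Example
///
/// A sketch based on the tests below; the input object is illustrative:
///
/// ```ignore
/// let input: VrlValue = serde_json::json!({"a": {"b": {"c": 1}}}).into();
/// let flat = flatten_object(input, 10)?;
/// // flat is now {"a.b.c": 1}
/// ```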
pub fn flatten_object(object: VrlValue, max_nested_levels: usize) -> Result<VrlValue> {
    let mut flattened = BTreeMap::new();
    let object = object.into_object().context(ValueMustBeMapSnafu)?;

    if !object.is_empty() {
        // flatten the object recursively
        do_flatten_object(&mut flattened, None, object, 1, max_nested_levels)?;
    }

    Ok(VrlValue::Object(flattened))
}

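/// Recursive worker for [`flatten_object`]: nested keys are joined with `.` and
/// written into `dest`.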
fn do_flatten_object(
    dest: &mut BTreeMap<KeyString, VrlValue>,
    base: Option<&str>,
    object: BTreeMap<KeyString, VrlValue>,
    current_level: usize,
    max_nested_levels: usize,
) -> Result<()> {
    // For safety, we do not allow the depth to be greater than `max_nested_levels`.
    if current_level > max_nested_levels {
        return ReachedMaxNestedLevelsSnafu { max_nested_levels }.fail();
    }

    for (key, value) in object {
        let new_key = base.map_or_else(
            || key.clone(),
            |base_key| format!("{base_key}.{key}").into(),
        );

        match value {
            VrlValue::Object(object) => {
                do_flatten_object(
                    dest,
                    Some(&new_key),
                    object,
                    current_level + 1,
                    max_nested_levels,
                )?;
            }
            // Other types are inserted directly under the flattened key.
            _ => {
                dest.insert(new_key, value);
            }
        }
    }

    Ok(())
}

#[cfg(test)]
mod tests {
    use api::v1::SemanticType;

    use super::*;
    use crate::{identity_pipeline, PipelineDefinition};

    #[test]
    fn test_identify_pipeline() {
        let params = GreptimePipelineParams::default();
        let pipeline_ctx = PipelineContext::new(
            &PipelineDefinition::GreptimeIdentityPipeline(None),
            &params,
            Channel::Unknown,
        );
        {
            let array = [
                serde_json::json!({
                    "woshinull": null,
                    "name": "Alice",
                    "age": 20,
                    "is_student": true,
                    "score": 99.5,
                    "hobbies": "reading",
                    "address": "Beijing",
                }),
                serde_json::json!({
                    "name": "Bob",
                    "age": 21,
                    "is_student": false,
                    "score": "88.5",
                    "hobbies": "swimming",
                    "address": "Shanghai",
                    "gaga": "gaga"
                }),
            ];
            let array = array.iter().map(|v| v.into()).collect();
            let rows = identity_pipeline(array, None, &pipeline_ctx);
            assert!(rows.is_err());
            assert_eq!(
                rows.err().unwrap().to_string(),
                "Column datatype mismatch. For column: score, expected datatype: FLOAT64, actual datatype: STRING".to_string(),
            );
        }
        {
            let array = [
                serde_json::json!({
                    "woshinull": null,
                    "name": "Alice",
                    "age": 20,
                    "is_student": true,
                    "score": 99.5,
                    "hobbies": "reading",
                    "address": "Beijing",
                }),
                serde_json::json!({
                    "name": "Bob",
                    "age": 21,
                    "is_student": false,
                    "score": 88,
                    "hobbies": "swimming",
                    "address": "Shanghai",
                    "gaga": "gaga"
                }),
            ];
            let array = array.iter().map(|v| v.into()).collect();
            let rows = identity_pipeline(array, None, &pipeline_ctx);
            assert!(rows.is_err());
            assert_eq!(
                rows.err().unwrap().to_string(),
                "Column datatype mismatch. For column: score, expected datatype: FLOAT64, actual datatype: INT64".to_string(),
            );
        }
        {
            let array = [
                serde_json::json!({
                    "woshinull": null,
                    "name": "Alice",
                    "age": 20,
                    "is_student": true,
                    "score": 99.5,
                    "hobbies": "reading",
                    "address": "Beijing",
                }),
                serde_json::json!({
                    "name": "Bob",
                    "age": 21,
                    "is_student": false,
                    "score": 88.5,
                    "hobbies": "swimming",
                    "address": "Shanghai",
                    "gaga": "gaga"
                }),
            ];
            let array = array.iter().map(|v| v.into()).collect();
            let rows = identity_pipeline(array, None, &pipeline_ctx);
            assert!(rows.is_ok());
            let mut rows = rows.unwrap();
            assert!(rows.len() == 1);
            let rows = rows.remove(&ContextOpt::default()).unwrap();
            assert_eq!(rows.schema.len(), 8);
            assert_eq!(rows.rows.len(), 2);
            assert_eq!(8, rows.rows[0].values.len());
            assert_eq!(8, rows.rows[1].values.len());
        }
        {
            let array = [
                serde_json::json!({
                    "woshinull": null,
                    "name": "Alice",
                    "age": 20,
                    "is_student": true,
                    "score": 99.5,
                    "hobbies": "reading",
                    "address": "Beijing",
                }),
                serde_json::json!({
                    "name": "Bob",
                    "age": 21,
                    "is_student": false,
                    "score": 88.5,
                    "hobbies": "swimming",
                    "address": "Shanghai",
                    "gaga": "gaga"
                }),
            ];
            let tag_column_names = ["name".to_string(), "address".to_string()];

            let rows =
                identity_pipeline_inner(array.iter().map(|v| v.into()).collect(), &pipeline_ctx)
                    .map(|(mut schema, mut rows)| {
                        for name in tag_column_names {
                            if let Some(index) = schema.index.get(&name) {
                                schema.schema[*index].semantic_type = SemanticType::Tag as i32;
                            }
                        }

                        assert!(rows.len() == 1);
                        let rows = rows.remove(&ContextOpt::default()).unwrap();

                        Rows {
                            schema: schema.schema,
                            rows,
                        }
                    });

            assert!(rows.is_ok());
            let rows = rows.unwrap();
            assert_eq!(rows.schema.len(), 8);
            assert_eq!(rows.rows.len(), 2);
            assert_eq!(8, rows.rows[0].values.len());
            assert_eq!(8, rows.rows[1].values.len());
            assert_eq!(
                rows.schema
                    .iter()
                    .find(|x| x.column_name == "name")
                    .unwrap()
                    .semantic_type,
                SemanticType::Tag as i32
            );
            assert_eq!(
                rows.schema
                    .iter()
                    .find(|x| x.column_name == "address")
                    .unwrap()
                    .semantic_type,
                SemanticType::Tag as i32
            );
            assert_eq!(
                rows.schema
                    .iter()
                    .filter(|x| x.semantic_type == SemanticType::Tag as i32)
                    .count(),
                2
            );
        }
    }

    #[test]
    fn test_flatten() {
        let test_cases = vec![
            // Basic case.
            (
                serde_json::json!(
                    {
                        "a": {
                            "b": {
                                "c": [1, 2, 3]
                            }
                        },
                        "d": [
                            "foo",
                            "bar"
                        ],
                        "e": {
                            "f": [7, 8, 9],
                            "g": {
                                "h": 123,
                                "i": "hello",
                                "j": {
                                    "k": true
                                }
                            }
                        }
                    }
                ),
                10,
                Some(serde_json::json!(
                    {
                        "a.b.c": [1,2,3],
                        "d": ["foo","bar"],
                        "e.f": [7,8,9],
                        "e.g.h": 123,
                        "e.g.i": "hello",
                        "e.g.j.k": true
                    }
                )),
            ),
            // Test the case where the object has more than 3 nested levels.
            (
                serde_json::json!(
                    {
                        "a": {
                            "b": {
                                "c": {
                                    "d": [1, 2, 3]
                                }
                            }
                        },
                        "e": [
                            "foo",
                            "bar"
                        ]
                    }
                ),
                3,
                None,
            ),
        ];

        for (input, max_depth, expected) in test_cases {
            let input = input.into();
            let expected = expected.map(|e| e.into());

            let flattened_object = flatten_object(input, max_depth).ok();
            assert_eq!(flattened_object, expected);
        }
    }

    #[test]
    fn test_greptime_pipeline_params() {
        let params = Some("flatten_json_object=true");
        let pipeline_params = GreptimePipelineParams::from_params(params);
        assert!(pipeline_params.flatten_json_object());

        let params = None;
        let pipeline_params = GreptimePipelineParams::from_params(params);
        assert!(!pipeline_params.flatten_json_object());
    }
}