pipeline/etl/transform/transformer/greptime.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

pub mod coerce;

use std::collections::{BTreeMap, HashSet};
use std::sync::Arc;

use ahash::{HashMap, HashMapExt};
use api::helper::proto_value_type;
use api::v1::column_data_type_extension::TypeExt;
use api::v1::value::ValueData;
use api::v1::{ColumnDataType, ColumnDataTypeExtension, JsonTypeExtension, SemanticType};
use coerce::{coerce_columns, coerce_value};
use common_query::prelude::{GREPTIME_TIMESTAMP, GREPTIME_VALUE};
use greptime_proto::v1::{ColumnSchema, Row, Rows, Value as GreptimeValue};
use itertools::Itertools;
use once_cell::sync::OnceCell;
use serde_json::Number;
use session::context::Channel;
use snafu::OptionExt;

use crate::error::{
    IdentifyPipelineColumnTypeMismatchSnafu, ReachedMaxNestedLevelsSnafu, Result,
    TimeIndexMustBeNonNullSnafu, TransformColumnNameMustBeUniqueSnafu,
    TransformMultipleTimestampIndexSnafu, TransformTimestampIndexCountSnafu,
    UnsupportedNumberTypeSnafu, ValueMustBeMapSnafu,
};
use crate::etl::ctx_req::ContextOpt;
use crate::etl::field::{Field, Fields};
use crate::etl::transform::index::Index;
use crate::etl::transform::{Transform, Transforms};
use crate::etl::value::{Timestamp, Value};
use crate::etl::PipelineDocVersion;
use crate::{unwrap_or_continue_if_err, Map, PipelineContext};

const DEFAULT_GREPTIME_TIMESTAMP_COLUMN: &str = "greptime_timestamp";
const DEFAULT_MAX_NESTED_LEVELS_FOR_JSON_FLATTENING: usize = 10;

/// Fields not present in the transform columns are discarded,
/// to prevent automatic column creation in GreptimeDB.
#[derive(Debug, Clone)]
pub struct GreptimeTransformer {
    transforms: Transforms,
    schema: Vec<ColumnSchema>,
}

/// Returns true for the case-insensitive strings "true", "1", "yes", "on" and "t".
fn truthy<V: AsRef<str>>(v: V) -> bool {
    let v = v.as_ref().to_lowercase();
    v == "true" || v == "1" || v == "yes" || v == "on" || v == "t"
}

/// Parameters that can be used to configure the greptime pipelines.
#[derive(Debug, Default)]
pub struct GreptimePipelineParams {
    /// The original options for configuring the greptime pipelines.
    /// This should not be used directly; use the parsed shortcut option values instead.
    options: HashMap<String, String>,

    /// Parsed shortcut option values.
    pub flatten_json_object: OnceCell<bool>,
    /// Whether to skip errors when processing the pipeline.
    pub skip_error: OnceCell<bool>,
}

impl GreptimePipelineParams {
    /// Create a `GreptimePipelineParams` from a params string taken from the HTTP header
    /// with key `x-greptime-pipeline-params`. The params are in the format
    /// `key1=value1&key2=value2`, for example:
    /// `x-greptime-pipeline-params: flatten_json_object=true`
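    ///
    /// A minimal usage sketch (illustrative, not a compiled doctest; the header
    /// value is assumed to have already been extracted from the request):
    /// ```ignore
    /// let params =
    ///     GreptimePipelineParams::from_params(Some("flatten_json_object=true&skip_error=1"));
    /// assert!(params.flatten_json_object());
    /// assert!(params.skip_error()); // "1" is accepted by `truthy`
    /// ```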
    pub fn from_params(params: Option<&str>) -> Self {
        let options = Self::parse_header_str_to_map(params);

        Self {
            options,
            skip_error: OnceCell::new(),
            flatten_json_object: OnceCell::new(),
        }
    }

    pub fn from_map(options: HashMap<String, String>) -> Self {
        Self {
            options,
            skip_error: OnceCell::new(),
            flatten_json_object: OnceCell::new(),
        }
    }

    pub fn parse_header_str_to_map(params: Option<&str>) -> HashMap<String, String> {
        if let Some(params) = params {
            if params.is_empty() {
                HashMap::new()
            } else {
                params
                    .split('&')
                    .filter_map(|s| s.split_once('='))
                    .map(|(k, v)| (k.to_string(), v.to_string()))
                    .collect::<HashMap<String, String>>()
            }
        } else {
            HashMap::new()
        }
    }

    /// Whether to flatten the JSON object.
    pub fn flatten_json_object(&self) -> bool {
        *self.flatten_json_object.get_or_init(|| {
            self.options
                .get("flatten_json_object")
                .map(|v| v == "true")
                .unwrap_or(false)
        })
    }

    /// Whether to skip errors when processing the pipeline.
    pub fn skip_error(&self) -> bool {
        *self
            .skip_error
            .get_or_init(|| self.options.get("skip_error").map(truthy).unwrap_or(false))
    }
}

impl GreptimeTransformer {
    /// Add a default timestamp column to the transforms.
    fn add_greptime_timestamp_column(transforms: &mut Transforms) {
        let type_ = Value::Timestamp(Timestamp::Nanosecond(0));
        let default = None;

        let transform = Transform {
            fields: Fields::one(Field::new(
                DEFAULT_GREPTIME_TIMESTAMP_COLUMN.to_string(),
                None,
            )),
            type_,
            default,
            index: Some(Index::Time),
            on_failure: Some(crate::etl::transform::OnFailure::Default),
            tag: false,
        };
        transforms.push(transform);
    }

    /// Generate the schema for the GreptimeTransformer.
    fn init_schemas(transforms: &Transforms) -> Result<Vec<ColumnSchema>> {
        let mut schema = vec![];
        for transform in transforms.iter() {
            schema.extend(coerce_columns(transform)?);
        }
        Ok(schema)
    }
}

impl GreptimeTransformer {
    pub fn new(mut transforms: Transforms, doc_version: &PipelineDocVersion) -> Result<Self> {
        // The empty check is done in the caller.
        let mut column_names_set = HashSet::new();
        let mut timestamp_columns = vec![];

        for transform in transforms.iter() {
            let target_fields_set = transform
                .fields
                .iter()
                .map(|f| f.target_or_input_field())
                .collect::<HashSet<_>>();

            let intersections: Vec<_> = column_names_set.intersection(&target_fields_set).collect();
            if !intersections.is_empty() {
                let duplicates = intersections.iter().join(",");
                return TransformColumnNameMustBeUniqueSnafu { duplicates }.fail();
            }

            column_names_set.extend(target_fields_set);

            if let Some(idx) = transform.index {
                if idx == Index::Time {
                    match transform.fields.len() {
                        // Safety: unwrap is fine here because we have checked the length of fields.
                        1 => {
                            timestamp_columns.push(transform.fields.first().unwrap().input_field())
                        }
                        _ => {
                            return TransformMultipleTimestampIndexSnafu {
                                columns: transform
                                    .fields
                                    .iter()
                                    .map(|x| x.input_field())
                                    .join(", "),
                            }
                            .fail();
                        }
                    }
                }
            }
        }

        let schema = match timestamp_columns.len() {
            0 if doc_version == &PipelineDocVersion::V1 => {
                // Compatible with v1: add a default timestamp column.
                GreptimeTransformer::add_greptime_timestamp_column(&mut transforms);
                GreptimeTransformer::init_schemas(&transforms)?
            }
            1 => GreptimeTransformer::init_schemas(&transforms)?,
            count => {
                let columns = timestamp_columns.iter().join(", ");
                return TransformTimestampIndexCountSnafu { count, columns }.fail();
            }
        };
        Ok(GreptimeTransformer { transforms, schema })
    }

    pub fn transform_mut(
        &self,
        pipeline_map: &mut Value,
        is_v1: bool,
    ) -> Result<Vec<GreptimeValue>> {
        let mut values = vec![GreptimeValue { value_data: None }; self.schema.len()];
        let mut output_index = 0;
        for transform in self.transforms.iter() {
            for field in transform.fields.iter() {
                let column_name = field.input_field();

                // Keep using `get` here to stay compatible with v1.
                match pipeline_map.get(column_name) {
                    Some(v) => {
                        let value_data = coerce_value(v, transform)?;
                        // Every transform field has exactly one output field.
                        values[output_index] = GreptimeValue { value_data };
                    }
                    None => {
                        let value_data = match transform.on_failure {
                            Some(crate::etl::transform::OnFailure::Default) => {
                                match transform.get_default() {
                                    Some(default) => coerce_value(default, transform)?,
                                    None => match transform.get_default_value_when_data_is_none() {
                                        Some(default) => coerce_value(&default, transform)?,
                                        None => None,
                                    },
                                }
                            }
                            Some(crate::etl::transform::OnFailure::Ignore) => None,
                            None => None,
                        };
                        if transform.is_timeindex() && value_data.is_none() {
                            return TimeIndexMustBeNonNullSnafu.fail();
                        }
                        values[output_index] = GreptimeValue { value_data };
                    }
                }
                output_index += 1;
                if !is_v1 {
                    // Remove the column from the pipeline_map
                    // so that the auto-transform can use the remaining fields.
                    pipeline_map.remove(column_name);
                }
            }
        }
        Ok(values)
    }

    pub fn transforms(&self) -> &Transforms {
        &self.transforms
    }

    pub fn schemas(&self) -> &Vec<greptime_proto::v1::ColumnSchema> {
        &self.schema
    }

    pub fn transforms_mut(&mut self) -> &mut Transforms {
        &mut self.transforms
    }
}

/// Records the current schema state and a sequential index of column names.
/// It changes as the user input JSON is traversed,
/// accumulating a superset of all input schemas.
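///
/// A minimal sketch of how the index tracks column positions (illustrative, not a
/// compiled doctest; `ts_schema` and `name_schema` are placeholder `ColumnSchema`
/// values, with `name_schema.column_name == "name"`):
/// ```ignore
/// let info = SchemaInfo::from_schema_list(vec![ts_schema, name_schema]);
/// // `index` maps each column name to its position in `schema`.
/// assert_eq!(info.index.get("name").copied(), Some(1));
/// ```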
#[derive(Debug, Default)]
pub struct SchemaInfo {
    /// The column schemas.
    pub schema: Vec<ColumnSchema>,
    /// Maps a column name to its index in `schema`.
    pub index: HashMap<String, usize>,
}

impl SchemaInfo {
    pub fn with_capacity(capacity: usize) -> Self {
        Self {
            schema: Vec::with_capacity(capacity),
            index: HashMap::with_capacity(capacity),
        }
    }

    pub fn from_schema_list(schema_list: Vec<ColumnSchema>) -> Self {
        let mut index = HashMap::new();
        for (i, schema) in schema_list.iter().enumerate() {
            index.insert(schema.column_name.clone(), i);
        }
        Self {
            schema: schema_list,
            index,
        }
    }
}

fn resolve_schema(
    index: Option<usize>,
    value_data: ValueData,
    column_schema: ColumnSchema,
    row: &mut Vec<GreptimeValue>,
    schema_info: &mut SchemaInfo,
) -> Result<()> {
    if let Some(index) = index {
        let api_value = GreptimeValue {
            value_data: Some(value_data),
        };
        // Safety: unwrap is fine here because api_value is always valid.
        let value_column_data_type = proto_value_type(&api_value).unwrap();
        // Safety: unwrap is fine here because index is always valid.
        let schema_column_data_type = schema_info.schema.get(index).unwrap().datatype();
        if value_column_data_type != schema_column_data_type {
            IdentifyPipelineColumnTypeMismatchSnafu {
                column: column_schema.column_name,
                expected: schema_column_data_type.as_str_name(),
                actual: value_column_data_type.as_str_name(),
            }
            .fail()
        } else {
            row[index] = api_value;
            Ok(())
        }
    } else {
        let key = column_schema.column_name.clone();
        schema_info.schema.push(column_schema);
        schema_info.index.insert(key, schema_info.schema.len() - 1);
        let api_value = GreptimeValue {
            value_data: Some(value_data),
        };
        row.push(api_value);
        Ok(())
    }
}

fn resolve_number_schema(
    n: Number,
    column_name: String,
    index: Option<usize>,
    row: &mut Vec<GreptimeValue>,
    schema_info: &mut SchemaInfo,
) -> Result<()> {
    let (value, datatype, semantic_type) = if n.is_i64() {
        (
            ValueData::I64Value(n.as_i64().unwrap()),
            ColumnDataType::Int64 as i32,
            SemanticType::Field as i32,
        )
    } else if n.is_u64() {
        (
            ValueData::U64Value(n.as_u64().unwrap()),
            ColumnDataType::Uint64 as i32,
            SemanticType::Field as i32,
        )
    } else if n.is_f64() {
        (
            ValueData::F64Value(n.as_f64().unwrap()),
            ColumnDataType::Float64 as i32,
            SemanticType::Field as i32,
        )
    } else {
        return UnsupportedNumberTypeSnafu { value: n }.fail();
    };
    resolve_schema(
        index,
        value,
        ColumnSchema {
            column_name,
            datatype,
            semantic_type,
            datatype_extension: None,
            options: None,
        },
        row,
        schema_info,
    )
}

fn calc_ts(p_ctx: &PipelineContext, values: &Value) -> Result<Option<ValueData>> {
    match p_ctx.channel {
        Channel::Prometheus => Ok(Some(ValueData::TimestampMillisecondValue(
            values
                .get(GREPTIME_TIMESTAMP)
                .and_then(|v| v.as_i64())
                .unwrap_or_default(),
        ))),
        _ => {
            let custom_ts = p_ctx.pipeline_definition.get_custom_ts();
            match custom_ts {
                Some(ts) => {
                    let ts_field = values.get(ts.get_column_name());
                    Some(ts.get_timestamp(ts_field)).transpose()
                }
                None => Ok(Some(ValueData::TimestampNanosecondValue(
                    chrono::Utc::now().timestamp_nanos_opt().unwrap_or_default(),
                ))),
            }
        }
    }
}

pub(crate) fn values_to_row(
    schema_info: &mut SchemaInfo,
    values: Value,
    pipeline_ctx: &PipelineContext<'_>,
    row: Option<Vec<GreptimeValue>>,
) -> Result<Row> {
    let mut row: Vec<GreptimeValue> =
        row.unwrap_or_else(|| Vec::with_capacity(schema_info.schema.len()));
    let custom_ts = pipeline_ctx.pipeline_definition.get_custom_ts();

    // Calculate the timestamp value based on the channel.
    let ts = calc_ts(pipeline_ctx, &values)?;

    row.push(GreptimeValue { value_data: ts });

    row.resize(schema_info.schema.len(), GreptimeValue { value_data: None });

    // Skip the ts column.
    let ts_column_name = custom_ts
        .as_ref()
        .map_or(DEFAULT_GREPTIME_TIMESTAMP_COLUMN, |ts| ts.get_column_name());

    let values = values.into_map().context(ValueMustBeMapSnafu)?;

    for (column_name, value) in values {
        if column_name == ts_column_name {
            continue;
        }

        resolve_value(value, column_name, &mut row, schema_info, pipeline_ctx)?;
    }
    Ok(Row { values: row })
}

fn decide_semantic(p_ctx: &PipelineContext, column_name: &str) -> i32 {
    if p_ctx.channel == Channel::Prometheus && column_name != GREPTIME_VALUE {
        SemanticType::Tag as i32
    } else {
        SemanticType::Field as i32
    }
}

fn resolve_value(
    value: Value,
    column_name: String,
    row: &mut Vec<GreptimeValue>,
    schema_info: &mut SchemaInfo,
    p_ctx: &PipelineContext,
) -> Result<()> {
    let index = schema_info.index.get(&column_name).copied();
    let mut resolve_simple_type =
        |value_data: ValueData, column_name: String, data_type: ColumnDataType| {
            let semantic_type = decide_semantic(p_ctx, &column_name);
            resolve_schema(
                index,
                value_data,
                ColumnSchema {
                    column_name,
                    datatype: data_type as i32,
                    semantic_type,
                    datatype_extension: None,
                    options: None,
                },
                row,
                schema_info,
            )
        };

    match value {
        Value::Null => {}

        Value::Int8(_) | Value::Int16(_) | Value::Int32(_) | Value::Int64(_) => {
            // Safety: unwrap is safe after the type match.
            let v = value.as_i64().unwrap();
            resolve_simple_type(ValueData::I64Value(v), column_name, ColumnDataType::Int64)?;
        }

        Value::Uint8(_) | Value::Uint16(_) | Value::Uint32(_) | Value::Uint64(_) => {
            // Safety: unwrap is safe after the type match.
            let v = value.as_u64().unwrap();
            resolve_simple_type(ValueData::U64Value(v), column_name, ColumnDataType::Uint64)?;
        }

        Value::Float32(_) | Value::Float64(_) => {
            // Safety: unwrap is safe after the type match.
            let v = value.as_f64().unwrap();
            resolve_simple_type(ValueData::F64Value(v), column_name, ColumnDataType::Float64)?;
        }

        Value::Boolean(v) => {
            resolve_simple_type(
                ValueData::BoolValue(v),
                column_name,
                ColumnDataType::Boolean,
            )?;
        }

        Value::String(v) => {
            resolve_simple_type(
                ValueData::StringValue(v),
                column_name,
                ColumnDataType::String,
            )?;
        }

        Value::Timestamp(Timestamp::Nanosecond(ns)) => {
            resolve_simple_type(
                ValueData::TimestampNanosecondValue(ns),
                column_name,
                ColumnDataType::TimestampNanosecond,
            )?;
        }

        Value::Timestamp(Timestamp::Microsecond(us)) => {
            resolve_simple_type(
                ValueData::TimestampMicrosecondValue(us),
                column_name,
                ColumnDataType::TimestampMicrosecond,
            )?;
        }

        Value::Timestamp(Timestamp::Millisecond(ms)) => {
            resolve_simple_type(
                ValueData::TimestampMillisecondValue(ms),
                column_name,
                ColumnDataType::TimestampMillisecond,
            )?;
        }

        Value::Timestamp(Timestamp::Second(s)) => {
            resolve_simple_type(
                ValueData::TimestampSecondValue(s),
                column_name,
                ColumnDataType::TimestampSecond,
            )?;
        }

        Value::Array(_) | Value::Map(_) => {
            let data: jsonb::Value = value.into();
            resolve_schema(
                index,
                ValueData::BinaryValue(data.to_vec()),
                ColumnSchema {
                    column_name,
                    datatype: ColumnDataType::Binary as i32,
                    semantic_type: SemanticType::Field as i32,
                    datatype_extension: Some(ColumnDataTypeExtension {
                        type_ext: Some(TypeExt::JsonType(JsonTypeExtension::JsonBinary.into())),
                    }),
                    options: None,
                },
                row,
                schema_info,
            )?;
        }
    }
    Ok(())
}

fn identity_pipeline_inner(
    pipeline_maps: Vec<Value>,
    pipeline_ctx: &PipelineContext<'_>,
) -> Result<(SchemaInfo, HashMap<ContextOpt, Vec<Row>>)> {
    let skip_error = pipeline_ctx.pipeline_param.skip_error();
    let mut schema_info = SchemaInfo::default();
    let custom_ts = pipeline_ctx.pipeline_definition.get_custom_ts();

    // Set the time index column schema first.
    schema_info.schema.push(ColumnSchema {
        column_name: custom_ts
            .map(|ts| ts.get_column_name().clone())
            .unwrap_or_else(|| DEFAULT_GREPTIME_TIMESTAMP_COLUMN.to_string()),
        datatype: custom_ts.map(|c| c.get_datatype()).unwrap_or_else(|| {
            if pipeline_ctx.channel == Channel::Prometheus {
                ColumnDataType::TimestampMillisecond
            } else {
                ColumnDataType::TimestampNanosecond
            }
        }) as i32,
        semantic_type: SemanticType::Timestamp as i32,
        datatype_extension: None,
        options: None,
    });

    let mut opt_map = HashMap::new();
    let len = pipeline_maps.len();

    for mut pipeline_map in pipeline_maps {
        let opt = unwrap_or_continue_if_err!(
            ContextOpt::from_pipeline_map_to_opt(&mut pipeline_map),
            skip_error
        );
        let row = unwrap_or_continue_if_err!(
            values_to_row(&mut schema_info, pipeline_map, pipeline_ctx, None),
            skip_error
        );

        opt_map
            .entry(opt)
            .or_insert_with(|| Vec::with_capacity(len))
            .push(row);
    }

    // Pad every row out to the final merged column count.
    let column_count = schema_info.schema.len();
    for (_, rows) in opt_map.iter_mut() {
        for row in rows.iter_mut() {
            let diff = column_count - row.values.len();
            for _ in 0..diff {
                row.values.push(GreptimeValue { value_data: None });
            }
        }
    }

    Ok((schema_info, opt_map))
}

/// Identity pipeline for Greptime.
/// This pipeline converts the input JSON array into Greptime Rows.
/// The `table` parameter is used to set the semantic type of the row key columns to Tag.
/// 1. The pipeline adds a default timestamp column to the schema.
/// 2. The pipeline does not resolve NULL values.
/// 3. The pipeline assumes that the JSON format is fixed.
/// 4. The pipeline returns an error if a column's datatype is mismatched across records.
/// 5. The pipeline analyzes the schema of each JSON record and merges them to get the final schema.
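///
/// A minimal invocation sketch (illustrative, not a compiled doctest; `pipeline_ctx`
/// is assumed to be built for the identity pipeline, as in the tests below):
/// ```ignore
/// let array = json_array_to_map(vec![serde_json::json!({"name": "Alice", "age": 20})])?;
/// let rows_by_opt = identity_pipeline(array, None, &pipeline_ctx)?;
/// // Each ContextOpt key maps to a `Rows` batch sharing the merged schema.
/// ```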
pub fn identity_pipeline(
    array: Vec<Value>,
    table: Option<Arc<table::Table>>,
    pipeline_ctx: &PipelineContext<'_>,
) -> Result<HashMap<ContextOpt, Rows>> {
    let skip_error = pipeline_ctx.pipeline_param.skip_error();
    let input = if pipeline_ctx.pipeline_param.flatten_json_object() {
        let mut results = Vec::with_capacity(array.len());
        for item in array.into_iter() {
            let result = unwrap_or_continue_if_err!(
                flatten_object(item, DEFAULT_MAX_NESTED_LEVELS_FOR_JSON_FLATTENING),
                skip_error
            );
            results.push(result);
        }
        results
    } else {
        array
    };

    identity_pipeline_inner(input, pipeline_ctx).map(|(mut schema, opt_map)| {
        if let Some(table) = table {
            let table_info = table.table_info();
            for tag_name in table_info.meta.row_key_column_names() {
                if let Some(index) = schema.index.get(tag_name) {
                    schema.schema[*index].semantic_type = SemanticType::Tag as i32;
                }
            }
        }

        opt_map
            .into_iter()
            .map(|(opt, rows)| {
                (
                    opt,
                    Rows {
                        schema: schema.schema.clone(),
                        rows,
                    },
                )
            })
            .collect::<HashMap<ContextOpt, Rows>>()
    })
}

/// Consumes the JSON object and flattens it into a single-level object.
///
/// The `max_nested_levels` parameter limits the nesting depth of the JSON object.
/// An error is returned if the nesting depth exceeds `max_nested_levels`.
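///
/// A minimal sketch (illustrative, not a compiled doctest):
/// ```ignore
/// let object = json_to_map(serde_json::json!({"a": {"b": 1}}))?;
/// let flattened = flatten_object(object, 10)?;
/// // Nested keys are joined with '.', yielding {"a.b": 1}.
/// ```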
pub fn flatten_object(object: Value, max_nested_levels: usize) -> Result<Value> {
    let mut flattened = BTreeMap::new();
    let object = object.into_map().context(ValueMustBeMapSnafu)?;

    if !object.is_empty() {
        // Recursion is used to flatten the object.
        do_flatten_object(&mut flattened, None, object, 1, max_nested_levels)?;
    }

    Ok(Value::Map(Map { values: flattened }))
}

fn do_flatten_object(
    dest: &mut BTreeMap<String, Value>,
    base: Option<&str>,
    object: BTreeMap<String, Value>,
    current_level: usize,
    max_nested_levels: usize,
) -> Result<()> {
    // For safety, we do not allow the depth to exceed max_nested_levels.
    if current_level > max_nested_levels {
        return ReachedMaxNestedLevelsSnafu { max_nested_levels }.fail();
    }

    for (key, value) in object {
        let new_key = base.map_or_else(|| key.clone(), |base_key| format!("{base_key}.{key}"));

        match value {
            Value::Map(object) => {
                do_flatten_object(
                    dest,
                    Some(&new_key),
                    object.values,
                    current_level + 1,
                    max_nested_levels,
                )?;
            }
            // Other types are inserted directly under the flattened key.
            _ => {
                dest.insert(new_key, value);
            }
        }
    }

    Ok(())
}

#[cfg(test)]
mod tests {
    use api::v1::SemanticType;

    use super::*;
    use crate::etl::{json_array_to_map, json_to_map};
    use crate::{identity_pipeline, PipelineDefinition};

    #[test]
    fn test_identify_pipeline() {
        let params = GreptimePipelineParams::default();
        let pipeline_ctx = PipelineContext::new(
            &PipelineDefinition::GreptimeIdentityPipeline(None),
            &params,
            Channel::Unknown,
        );
        {
            let array = vec![
                serde_json::json!({
                    "woshinull": null,
                    "name": "Alice",
                    "age": 20,
                    "is_student": true,
                    "score": 99.5,
                    "hobbies": "reading",
                    "address": "Beijing",
                }),
                serde_json::json!({
                    "name": "Bob",
                    "age": 21,
                    "is_student": false,
                    "score": "88.5",
                    "hobbies": "swimming",
                    "address": "Shanghai",
                    "gaga": "gaga"
                }),
            ];
            let array = json_array_to_map(array).unwrap();
            let rows = identity_pipeline(array, None, &pipeline_ctx);
            assert!(rows.is_err());
            assert_eq!(
                rows.err().unwrap().to_string(),
                "Column datatype mismatch. For column: score, expected datatype: FLOAT64, actual datatype: STRING".to_string(),
            );
        }
        {
            let array = vec![
                serde_json::json!({
                    "woshinull": null,
                    "name": "Alice",
                    "age": 20,
                    "is_student": true,
                    "score": 99.5,
                    "hobbies": "reading",
                    "address": "Beijing",
                }),
                serde_json::json!({
                    "name": "Bob",
                    "age": 21,
                    "is_student": false,
                    "score": 88,
                    "hobbies": "swimming",
                    "address": "Shanghai",
                    "gaga": "gaga"
                }),
            ];
            let rows = identity_pipeline(json_array_to_map(array).unwrap(), None, &pipeline_ctx);
            assert!(rows.is_err());
            assert_eq!(
                rows.err().unwrap().to_string(),
                "Column datatype mismatch. For column: score, expected datatype: FLOAT64, actual datatype: INT64".to_string(),
            );
        }
        {
            let array = vec![
                serde_json::json!({
                    "woshinull": null,
                    "name": "Alice",
                    "age": 20,
                    "is_student": true,
                    "score": 99.5,
                    "hobbies": "reading",
                    "address": "Beijing",
                }),
                serde_json::json!({
                    "name": "Bob",
                    "age": 21,
                    "is_student": false,
                    "score": 88.5,
                    "hobbies": "swimming",
                    "address": "Shanghai",
                    "gaga": "gaga"
                }),
            ];
            let rows = identity_pipeline(json_array_to_map(array).unwrap(), None, &pipeline_ctx);
            assert!(rows.is_ok());
            let mut rows = rows.unwrap();
            assert!(rows.len() == 1);
            let rows = rows.remove(&ContextOpt::default()).unwrap();
            assert_eq!(rows.schema.len(), 8);
            assert_eq!(rows.rows.len(), 2);
            assert_eq!(8, rows.rows[0].values.len());
            assert_eq!(8, rows.rows[1].values.len());
        }
        {
            let array = vec![
                serde_json::json!({
                    "woshinull": null,
                    "name": "Alice",
                    "age": 20,
                    "is_student": true,
                    "score": 99.5,
                    "hobbies": "reading",
                    "address": "Beijing",
                }),
                serde_json::json!({
                    "name": "Bob",
                    "age": 21,
                    "is_student": false,
                    "score": 88.5,
                    "hobbies": "swimming",
                    "address": "Shanghai",
                    "gaga": "gaga"
                }),
            ];
            let tag_column_names = ["name".to_string(), "address".to_string()];

            let rows = identity_pipeline_inner(json_array_to_map(array).unwrap(), &pipeline_ctx)
                .map(|(mut schema, mut rows)| {
                    for name in tag_column_names {
                        if let Some(index) = schema.index.get(&name) {
                            schema.schema[*index].semantic_type = SemanticType::Tag as i32;
                        }
                    }

                    assert!(rows.len() == 1);
                    let rows = rows.remove(&ContextOpt::default()).unwrap();

                    Rows {
                        schema: schema.schema,
                        rows,
                    }
                });

            assert!(rows.is_ok());
            let rows = rows.unwrap();
            assert_eq!(rows.schema.len(), 8);
            assert_eq!(rows.rows.len(), 2);
            assert_eq!(8, rows.rows[0].values.len());
            assert_eq!(8, rows.rows[1].values.len());
            assert_eq!(
                rows.schema
                    .iter()
                    .find(|x| x.column_name == "name")
                    .unwrap()
                    .semantic_type,
                SemanticType::Tag as i32
            );
            assert_eq!(
                rows.schema
                    .iter()
                    .find(|x| x.column_name == "address")
                    .unwrap()
                    .semantic_type,
                SemanticType::Tag as i32
            );
            assert_eq!(
                rows.schema
                    .iter()
                    .filter(|x| x.semantic_type == SemanticType::Tag as i32)
                    .count(),
                2
            );
        }
    }

    #[test]
    fn test_flatten() {
        let test_cases = vec![
            // Basic case.
            (
                serde_json::json!(
                    {
                        "a": {
                            "b": {
                                "c": [1, 2, 3]
                            }
                        },
                        "d": [
                            "foo",
                            "bar"
                        ],
                        "e": {
                            "f": [7, 8, 9],
                            "g": {
                                "h": 123,
                                "i": "hello",
                                "j": {
                                    "k": true
                                }
                            }
                        }
                    }
                ),
                10,
                Some(serde_json::json!(
                    {
                        "a.b.c": [1,2,3],
                        "d": ["foo","bar"],
                        "e.f": [7,8,9],
                        "e.g.h": 123,
                        "e.g.i": "hello",
                        "e.g.j.k": true
                    }
                )),
            ),
            // Test the case where the object has more than 3 nested levels.
            (
                serde_json::json!(
                    {
                        "a": {
                            "b": {
                                "c": {
                                    "d": [1, 2, 3]
                                }
                            }
                        },
                        "e": [
                            "foo",
                            "bar"
                        ]
                    }
                ),
                3,
                None,
            ),
        ];

        for (input, max_depth, expected) in test_cases {
            let input = json_to_map(input).unwrap();
            let expected = expected.map(|e| json_to_map(e).unwrap());

            let flattened_object = flatten_object(input, max_depth).ok();
            assert_eq!(flattened_object, expected);
        }
    }

    #[test]
    fn test_greptime_pipeline_params() {
        let params = Some("flatten_json_object=true");
        let pipeline_params = GreptimePipelineParams::from_params(params);
        assert!(pipeline_params.flatten_json_object());

        let params = None;
        let pipeline_params = GreptimePipelineParams::from_params(params);
        assert!(!pipeline_params.flatten_json_object());
    }
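
    // An additional check grounded in `truthy` above: `skip_error` accepts any
    // truthy spelling ("true", "1", "yes", "on", "t"), not just "true".
    #[test]
    fn test_skip_error_param() {
        let pipeline_params = GreptimePipelineParams::from_params(Some("skip_error=yes"));
        assert!(pipeline_params.skip_error());

        let pipeline_params = GreptimePipelineParams::from_params(Some("skip_error=0"));
        assert!(!pipeline_params.skip_error());
    }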
}