pipeline/etl/transform/transformer/greptime.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub mod coerce;
16
17use std::collections::HashSet;
18use std::sync::Arc;
19
20use ahash::{HashMap, HashMapExt};
21use api::helper::proto_value_type;
22use api::v1::column_data_type_extension::TypeExt;
23use api::v1::value::ValueData;
24use api::v1::{ColumnDataType, ColumnDataTypeExtension, JsonTypeExtension, SemanticType};
25use coerce::{coerce_columns, coerce_value};
26use common_query::prelude::{GREPTIME_TIMESTAMP, GREPTIME_VALUE};
27use greptime_proto::v1::{ColumnSchema, Row, Rows, Value as GreptimeValue};
28use itertools::Itertools;
29use once_cell::sync::OnceCell;
30use serde_json::Number;
31use session::context::Channel;
32
33use crate::error::{
34    IdentifyPipelineColumnTypeMismatchSnafu, ReachedMaxNestedLevelsSnafu, Result,
35    TransformColumnNameMustBeUniqueSnafu, TransformMultipleTimestampIndexSnafu,
36    TransformTimestampIndexCountSnafu, UnsupportedNumberTypeSnafu,
37};
38use crate::etl::ctx_req::ContextOpt;
39use crate::etl::field::{Field, Fields};
40use crate::etl::transform::index::Index;
41use crate::etl::transform::{Transform, Transforms};
42use crate::etl::value::{Timestamp, Value};
43use crate::etl::PipelineMap;
44use crate::PipelineContext;
45
/// Name of the time index column that is generated when no transform declares one
/// (`GreptimeTransformer`), and that the identity pipeline uses when no custom timestamp
/// is configured.
const DEFAULT_GREPTIME_TIMESTAMP_COLUMN: &str = "greptime_timestamp";
/// Nesting limit passed to `flatten_object` by the identity pipeline when the
/// `flatten_json_object` parameter is enabled.
const DEFAULT_MAX_NESTED_LEVELS_FOR_JSON_FLATTENING: usize = 10;

/// Converts pipeline output maps into GreptimeDB rows according to the declared transforms.
///
/// fields not in the columns will be discarded
/// to prevent automatic column creation in GreptimeDB
#[derive(Debug, Clone)]
pub struct GreptimeTransformer {
    /// The declared transforms. A default timestamp transform is appended by `new` when
    /// none of them is a time index.
    transforms: Transforms,
    /// Output column schemas: the concatenation of `coerce_columns` for each transform,
    /// in transform order.
    schema: Vec<ColumnSchema>,
}
56
57/// Parameters that can be used to configure the greptime pipelines.
58#[derive(Debug, Clone, Default)]
59pub struct GreptimePipelineParams {
60    /// The original options for configuring the greptime pipelines.
61    /// This should not be used directly, instead, use the parsed shortcut option values.
62    options: HashMap<String, String>,
63
64    /// Parsed shortcut option values
65    pub flatten_json_object: OnceCell<bool>,
66}
67
68impl GreptimePipelineParams {
69    /// Create a `GreptimePipelineParams` from params string which is from the http header with key `x-greptime-pipeline-params`
70    /// The params is in the format of `key1=value1&key2=value2`,for example:
71    /// x-greptime-pipeline-params: flatten_json_object=true
72    pub fn from_params(params: Option<&str>) -> Self {
73        let options = params
74            .unwrap_or_default()
75            .split('&')
76            .filter_map(|s| s.split_once('='))
77            .map(|(k, v)| (k.to_string(), v.to_string()))
78            .collect::<HashMap<String, String>>();
79
80        Self {
81            options,
82            flatten_json_object: OnceCell::new(),
83        }
84    }
85
86    /// Whether to flatten the JSON object.
87    pub fn flatten_json_object(&self) -> bool {
88        *self.flatten_json_object.get_or_init(|| {
89            self.options
90                .get("flatten_json_object")
91                .map(|v| v == "true")
92                .unwrap_or(false)
93        })
94    }
95}
96
97impl GreptimeTransformer {
98    /// Add a default timestamp column to the transforms
99    fn add_greptime_timestamp_column(transforms: &mut Transforms) {
100        let type_ = Value::Timestamp(Timestamp::Nanosecond(0));
101        let default = None;
102
103        let transform = Transform {
104            fields: Fields::one(Field::new(
105                DEFAULT_GREPTIME_TIMESTAMP_COLUMN.to_string(),
106                None,
107            )),
108            type_,
109            default,
110            index: Some(Index::Time),
111            on_failure: Some(crate::etl::transform::OnFailure::Default),
112            tag: false,
113        };
114        transforms.push(transform);
115    }
116
117    /// Generate the schema for the GreptimeTransformer
118    fn init_schemas(transforms: &Transforms) -> Result<Vec<ColumnSchema>> {
119        let mut schema = vec![];
120        for transform in transforms.iter() {
121            schema.extend(coerce_columns(transform)?);
122        }
123        Ok(schema)
124    }
125}
126
127impl GreptimeTransformer {
128    pub fn new(mut transforms: Transforms) -> Result<Self> {
129        // empty check is done in the caller
130        let mut column_names_set = HashSet::new();
131        let mut timestamp_columns = vec![];
132
133        for transform in transforms.iter() {
134            let target_fields_set = transform
135                .fields
136                .iter()
137                .map(|f| f.target_or_input_field())
138                .collect::<HashSet<_>>();
139
140            let intersections: Vec<_> = column_names_set.intersection(&target_fields_set).collect();
141            if !intersections.is_empty() {
142                let duplicates = intersections.iter().join(",");
143                return TransformColumnNameMustBeUniqueSnafu { duplicates }.fail();
144            }
145
146            column_names_set.extend(target_fields_set);
147
148            if let Some(idx) = transform.index {
149                if idx == Index::Time {
150                    match transform.fields.len() {
151                        //Safety unwrap is fine here because we have checked the length of real_fields
152                        1 => {
153                            timestamp_columns.push(transform.fields.first().unwrap().input_field())
154                        }
155                        _ => {
156                            return TransformMultipleTimestampIndexSnafu {
157                                columns: transform
158                                    .fields
159                                    .iter()
160                                    .map(|x| x.input_field())
161                                    .join(", "),
162                            }
163                            .fail();
164                        }
165                    }
166                }
167            }
168        }
169
170        match timestamp_columns.len() {
171            0 => {
172                GreptimeTransformer::add_greptime_timestamp_column(&mut transforms);
173
174                let schema = GreptimeTransformer::init_schemas(&transforms)?;
175                Ok(GreptimeTransformer { transforms, schema })
176            }
177            1 => {
178                let schema = GreptimeTransformer::init_schemas(&transforms)?;
179                Ok(GreptimeTransformer { transforms, schema })
180            }
181            _ => {
182                let columns: String = timestamp_columns.iter().map(|s| s.to_string()).join(", ");
183                let count = timestamp_columns.len();
184                TransformTimestampIndexCountSnafu { count, columns }.fail()
185            }
186        }
187    }
188
189    pub fn transform_mut(&self, pipeline_map: &mut PipelineMap) -> Result<(ContextOpt, Row)> {
190        let opt = ContextOpt::from_pipeline_map_to_opt(pipeline_map);
191
192        let mut values = vec![GreptimeValue { value_data: None }; self.schema.len()];
193        let mut output_index = 0;
194        for transform in self.transforms.iter() {
195            for field in transform.fields.iter() {
196                let index = field.input_field();
197                match pipeline_map.get(index) {
198                    Some(v) => {
199                        let value_data = coerce_value(v, transform)?;
200                        // every transform fields has only one output field
201                        values[output_index] = GreptimeValue { value_data };
202                    }
203                    None => {
204                        let value_data = match transform.on_failure {
205                            Some(crate::etl::transform::OnFailure::Default) => {
206                                match transform.get_default() {
207                                    Some(default) => coerce_value(default, transform)?,
208                                    None => match transform.get_default_value_when_data_is_none() {
209                                        Some(default) => coerce_value(&default, transform)?,
210                                        None => None,
211                                    },
212                                }
213                            }
214                            Some(crate::etl::transform::OnFailure::Ignore) => None,
215                            None => None,
216                        };
217                        values[output_index] = GreptimeValue { value_data };
218                    }
219                }
220                output_index += 1;
221            }
222        }
223        Ok((opt, Row { values }))
224    }
225
226    pub fn transforms(&self) -> &Transforms {
227        &self.transforms
228    }
229
230    pub fn schemas(&self) -> &Vec<greptime_proto::v1::ColumnSchema> {
231        &self.schema
232    }
233
234    pub fn transforms_mut(&mut self) -> &mut Transforms {
235        &mut self.transforms
236    }
237}
238
239/// This is used to record the current state schema information and a sequential cache of field names.
240/// As you traverse the user input JSON, this will change.
241/// It will record a superset of all user input schemas.
242#[derive(Debug, Default)]
243pub struct SchemaInfo {
244    /// schema info
245    pub schema: Vec<ColumnSchema>,
246    /// index of the column name
247    pub index: HashMap<String, usize>,
248}
249
250impl SchemaInfo {
251    pub fn with_capacity(capacity: usize) -> Self {
252        Self {
253            schema: Vec::with_capacity(capacity),
254            index: HashMap::with_capacity(capacity),
255        }
256    }
257}
258
259fn resolve_schema(
260    index: Option<usize>,
261    value_data: ValueData,
262    column_schema: ColumnSchema,
263    row: &mut Vec<GreptimeValue>,
264    schema_info: &mut SchemaInfo,
265) -> Result<()> {
266    if let Some(index) = index {
267        let api_value = GreptimeValue {
268            value_data: Some(value_data),
269        };
270        // Safety unwrap is fine here because api_value is always valid
271        let value_column_data_type = proto_value_type(&api_value).unwrap();
272        // Safety unwrap is fine here because index is always valid
273        let schema_column_data_type = schema_info.schema.get(index).unwrap().datatype();
274        if value_column_data_type != schema_column_data_type {
275            IdentifyPipelineColumnTypeMismatchSnafu {
276                column: column_schema.column_name,
277                expected: schema_column_data_type.as_str_name(),
278                actual: value_column_data_type.as_str_name(),
279            }
280            .fail()
281        } else {
282            row[index] = api_value;
283            Ok(())
284        }
285    } else {
286        let key = column_schema.column_name.clone();
287        schema_info.schema.push(column_schema);
288        schema_info.index.insert(key, schema_info.schema.len() - 1);
289        let api_value = GreptimeValue {
290            value_data: Some(value_data),
291        };
292        row.push(api_value);
293        Ok(())
294    }
295}
296
297fn resolve_number_schema(
298    n: Number,
299    column_name: String,
300    index: Option<usize>,
301    row: &mut Vec<GreptimeValue>,
302    schema_info: &mut SchemaInfo,
303) -> Result<()> {
304    let (value, datatype, semantic_type) = if n.is_i64() {
305        (
306            ValueData::I64Value(n.as_i64().unwrap()),
307            ColumnDataType::Int64 as i32,
308            SemanticType::Field as i32,
309        )
310    } else if n.is_u64() {
311        (
312            ValueData::U64Value(n.as_u64().unwrap()),
313            ColumnDataType::Uint64 as i32,
314            SemanticType::Field as i32,
315        )
316    } else if n.is_f64() {
317        (
318            ValueData::F64Value(n.as_f64().unwrap()),
319            ColumnDataType::Float64 as i32,
320            SemanticType::Field as i32,
321        )
322    } else {
323        return UnsupportedNumberTypeSnafu { value: n }.fail();
324    };
325    resolve_schema(
326        index,
327        value,
328        ColumnSchema {
329            column_name,
330            datatype,
331            semantic_type,
332            datatype_extension: None,
333            options: None,
334        },
335        row,
336        schema_info,
337    )
338}
339
340fn calc_ts(p_ctx: &PipelineContext, values: &PipelineMap) -> Result<Option<ValueData>> {
341    match p_ctx.channel {
342        Channel::Prometheus => Ok(Some(ValueData::TimestampMillisecondValue(
343            values
344                .get(GREPTIME_TIMESTAMP)
345                .and_then(|v| v.as_i64())
346                .unwrap_or_default(),
347        ))),
348        _ => {
349            let custom_ts = p_ctx.pipeline_definition.get_custom_ts();
350            match custom_ts {
351                Some(ts) => {
352                    let ts_field = values.get(ts.get_column_name());
353                    Some(ts.get_timestamp(ts_field)).transpose()
354                }
355                None => Ok(Some(ValueData::TimestampNanosecondValue(
356                    chrono::Utc::now().timestamp_nanos_opt().unwrap_or_default(),
357                ))),
358            }
359        }
360    }
361}
362
363fn values_to_row(
364    schema_info: &mut SchemaInfo,
365    values: PipelineMap,
366    pipeline_ctx: &PipelineContext<'_>,
367) -> Result<Row> {
368    let mut row: Vec<GreptimeValue> = Vec::with_capacity(schema_info.schema.len());
369    let custom_ts = pipeline_ctx.pipeline_definition.get_custom_ts();
370
371    // calculate timestamp value based on the channel
372    let ts = calc_ts(pipeline_ctx, &values)?;
373
374    row.push(GreptimeValue { value_data: ts });
375
376    for _ in 1..schema_info.schema.len() {
377        row.push(GreptimeValue { value_data: None });
378    }
379
380    // skip ts column
381    let ts_column_name = custom_ts
382        .as_ref()
383        .map_or(DEFAULT_GREPTIME_TIMESTAMP_COLUMN, |ts| ts.get_column_name());
384
385    for (column_name, value) in values {
386        if column_name == ts_column_name {
387            continue;
388        }
389
390        resolve_value(value, column_name, &mut row, schema_info, pipeline_ctx)?;
391    }
392    Ok(Row { values: row })
393}
394
395fn decide_semantic(p_ctx: &PipelineContext, column_name: &str) -> i32 {
396    if p_ctx.channel == Channel::Prometheus && column_name != GREPTIME_VALUE {
397        SemanticType::Tag as i32
398    } else {
399        SemanticType::Field as i32
400    }
401}
402
403fn resolve_value(
404    value: Value,
405    column_name: String,
406    row: &mut Vec<GreptimeValue>,
407    schema_info: &mut SchemaInfo,
408    p_ctx: &PipelineContext,
409) -> Result<()> {
410    let index = schema_info.index.get(&column_name).copied();
411    let mut resolve_simple_type =
412        |value_data: ValueData, column_name: String, data_type: ColumnDataType| {
413            let semantic_type = decide_semantic(p_ctx, &column_name);
414            resolve_schema(
415                index,
416                value_data,
417                ColumnSchema {
418                    column_name,
419                    datatype: data_type as i32,
420                    semantic_type,
421                    datatype_extension: None,
422                    options: None,
423                },
424                row,
425                schema_info,
426            )
427        };
428
429    match value {
430        Value::Null => {}
431
432        Value::Int8(_) | Value::Int16(_) | Value::Int32(_) | Value::Int64(_) => {
433            // safe unwrap after type matched
434            let v = value.as_i64().unwrap();
435            resolve_simple_type(ValueData::I64Value(v), column_name, ColumnDataType::Int64)?;
436        }
437
438        Value::Uint8(_) | Value::Uint16(_) | Value::Uint32(_) | Value::Uint64(_) => {
439            // safe unwrap after type matched
440            let v = value.as_u64().unwrap();
441            resolve_simple_type(ValueData::U64Value(v), column_name, ColumnDataType::Uint64)?;
442        }
443
444        Value::Float32(_) | Value::Float64(_) => {
445            // safe unwrap after type matched
446            let v = value.as_f64().unwrap();
447            resolve_simple_type(ValueData::F64Value(v), column_name, ColumnDataType::Float64)?;
448        }
449
450        Value::Boolean(v) => {
451            resolve_simple_type(
452                ValueData::BoolValue(v),
453                column_name,
454                ColumnDataType::Boolean,
455            )?;
456        }
457
458        Value::String(v) => {
459            resolve_simple_type(
460                ValueData::StringValue(v),
461                column_name,
462                ColumnDataType::String,
463            )?;
464        }
465
466        Value::Timestamp(Timestamp::Nanosecond(ns)) => {
467            resolve_simple_type(
468                ValueData::TimestampNanosecondValue(ns),
469                column_name,
470                ColumnDataType::TimestampNanosecond,
471            )?;
472        }
473
474        Value::Timestamp(Timestamp::Microsecond(us)) => {
475            resolve_simple_type(
476                ValueData::TimestampMicrosecondValue(us),
477                column_name,
478                ColumnDataType::TimestampMicrosecond,
479            )?;
480        }
481
482        Value::Timestamp(Timestamp::Millisecond(ms)) => {
483            resolve_simple_type(
484                ValueData::TimestampMillisecondValue(ms),
485                column_name,
486                ColumnDataType::TimestampMillisecond,
487            )?;
488        }
489
490        Value::Timestamp(Timestamp::Second(s)) => {
491            resolve_simple_type(
492                ValueData::TimestampSecondValue(s),
493                column_name,
494                ColumnDataType::TimestampSecond,
495            )?;
496        }
497
498        Value::Array(_) | Value::Map(_) => {
499            let data: jsonb::Value = value.into();
500            resolve_schema(
501                index,
502                ValueData::BinaryValue(data.to_vec()),
503                ColumnSchema {
504                    column_name,
505                    datatype: ColumnDataType::Binary as i32,
506                    semantic_type: SemanticType::Field as i32,
507                    datatype_extension: Some(ColumnDataTypeExtension {
508                        type_ext: Some(TypeExt::JsonType(JsonTypeExtension::JsonBinary.into())),
509                    }),
510                    options: None,
511                },
512                row,
513                schema_info,
514            )?;
515        }
516    }
517    Ok(())
518}
519
520fn identity_pipeline_inner(
521    pipeline_maps: Vec<PipelineMap>,
522    pipeline_ctx: &PipelineContext<'_>,
523) -> Result<(SchemaInfo, HashMap<ContextOpt, Vec<Row>>)> {
524    let mut schema_info = SchemaInfo::default();
525    let custom_ts = pipeline_ctx.pipeline_definition.get_custom_ts();
526
527    // set time index column schema first
528    schema_info.schema.push(ColumnSchema {
529        column_name: custom_ts
530            .map(|ts| ts.get_column_name().clone())
531            .unwrap_or_else(|| DEFAULT_GREPTIME_TIMESTAMP_COLUMN.to_string()),
532        datatype: custom_ts.map(|c| c.get_datatype()).unwrap_or_else(|| {
533            if pipeline_ctx.channel == Channel::Prometheus {
534                ColumnDataType::TimestampMillisecond
535            } else {
536                ColumnDataType::TimestampNanosecond
537            }
538        }) as i32,
539        semantic_type: SemanticType::Timestamp as i32,
540        datatype_extension: None,
541        options: None,
542    });
543
544    let mut opt_map = HashMap::new();
545    let len = pipeline_maps.len();
546
547    for mut pipeline_map in pipeline_maps {
548        let opt = ContextOpt::from_pipeline_map_to_opt(&mut pipeline_map);
549        let row = values_to_row(&mut schema_info, pipeline_map, pipeline_ctx)?;
550
551        opt_map
552            .entry(opt)
553            .or_insert_with(|| Vec::with_capacity(len))
554            .push(row);
555    }
556
557    let column_count = schema_info.schema.len();
558    for (_, row) in opt_map.iter_mut() {
559        for row in row.iter_mut() {
560            let diff = column_count - row.values.len();
561            for _ in 0..diff {
562                row.values.push(GreptimeValue { value_data: None });
563            }
564        }
565    }
566
567    Ok((schema_info, opt_map))
568}
569
570/// Identity pipeline for Greptime
571/// This pipeline will convert the input JSON array to Greptime Rows
572/// params table is used to set the semantic type of the row key column to Tag
573/// 1. The pipeline will add a default timestamp column to the schema
574/// 2. The pipeline not resolve NULL value
575/// 3. The pipeline assumes that the json format is fixed
576/// 4. The pipeline will return an error if the same column datatype is mismatched
577/// 5. The pipeline will analyze the schema of each json record and merge them to get the final schema.
578pub fn identity_pipeline(
579    array: Vec<PipelineMap>,
580    table: Option<Arc<table::Table>>,
581    pipeline_ctx: &PipelineContext<'_>,
582) -> Result<HashMap<ContextOpt, Rows>> {
583    let input = if pipeline_ctx.pipeline_param.flatten_json_object() {
584        array
585            .into_iter()
586            .map(|item| flatten_object(item, DEFAULT_MAX_NESTED_LEVELS_FOR_JSON_FLATTENING))
587            .collect::<Result<Vec<PipelineMap>>>()?
588    } else {
589        array
590    };
591
592    identity_pipeline_inner(input, pipeline_ctx).map(|(mut schema, opt_map)| {
593        if let Some(table) = table {
594            let table_info = table.table_info();
595            for tag_name in table_info.meta.row_key_column_names() {
596                if let Some(index) = schema.index.get(tag_name) {
597                    schema.schema[*index].semantic_type = SemanticType::Tag as i32;
598                }
599            }
600        }
601
602        opt_map
603            .into_iter()
604            .map(|(opt, rows)| {
605                (
606                    opt,
607                    Rows {
608                        schema: schema.schema.clone(),
609                        rows,
610                    },
611                )
612            })
613            .collect::<HashMap<ContextOpt, Rows>>()
614    })
615}
616
617/// Consumes the JSON object and consumes it into a single-level object.
618///
619/// The `max_nested_levels` parameter is used to limit the nested levels of the JSON object.
620/// The error will be returned if the nested levels is greater than the `max_nested_levels`.
621pub fn flatten_object(object: PipelineMap, max_nested_levels: usize) -> Result<PipelineMap> {
622    let mut flattened = PipelineMap::new();
623
624    if !object.is_empty() {
625        // it will use recursion to flatten the object.
626        do_flatten_object(&mut flattened, None, object, 1, max_nested_levels)?;
627    }
628
629    Ok(flattened)
630}
631
632fn do_flatten_object(
633    dest: &mut PipelineMap,
634    base: Option<&str>,
635    object: PipelineMap,
636    current_level: usize,
637    max_nested_levels: usize,
638) -> Result<()> {
639    // For safety, we do not allow the depth to be greater than the max_object_depth.
640    if current_level > max_nested_levels {
641        return ReachedMaxNestedLevelsSnafu { max_nested_levels }.fail();
642    }
643
644    for (key, value) in object {
645        let new_key = base.map_or_else(|| key.clone(), |base_key| format!("{base_key}.{key}"));
646
647        match value {
648            Value::Map(object) => {
649                do_flatten_object(
650                    dest,
651                    Some(&new_key),
652                    object.values,
653                    current_level + 1,
654                    max_nested_levels,
655                )?;
656            }
657            // For other types, we will directly insert them into as JSON type.
658            _ => {
659                dest.insert(new_key, value);
660            }
661        }
662    }
663
664    Ok(())
665}
666
#[cfg(test)]
mod tests {
    use api::v1::SemanticType;

    use super::*;
    use crate::etl::{json_array_to_map, json_to_map};
    use crate::{identity_pipeline, PipelineDefinition};

    /// Exercises the identity pipeline with default params on the `Unknown` channel:
    /// datatype conflicts across records, union-schema construction, and tag marking.
    #[test]
    fn test_identify_pipeline() {
        let params = GreptimePipelineParams::default();
        let pipeline_ctx = PipelineContext::new(
            &PipelineDefinition::GreptimeIdentityPipeline(None),
            &params,
            Channel::Unknown,
        );
        // `score` is a float in the first record and a string in the second: mismatch error.
        {
            let array = vec![
                serde_json::json!({
                    "woshinull": null,
                    "name": "Alice",
                    "age": 20,
                    "is_student": true,
                    "score": 99.5,
                    "hobbies": "reading",
                    "address": "Beijing",
                }),
                serde_json::json!({
                    "name": "Bob",
                    "age": 21,
                    "is_student": false,
                    "score": "88.5",
                    "hobbies": "swimming",
                    "address": "Shanghai",
                    "gaga": "gaga"
                }),
            ];
            let array = json_array_to_map(array).unwrap();
            let rows = identity_pipeline(array, None, &pipeline_ctx);
            assert!(rows.is_err());
            assert_eq!(
                rows.err().unwrap().to_string(),
                "Column datatype mismatch. For column: score, expected datatype: FLOAT64, actual datatype: STRING".to_string(),
            );
        }
        // `score` is a float then an integer: there is no numeric widening, so it also fails.
        {
            let array = vec![
                serde_json::json!({
                    "woshinull": null,
                    "name": "Alice",
                    "age": 20,
                    "is_student": true,
                    "score": 99.5,
                    "hobbies": "reading",
                    "address": "Beijing",
                }),
                serde_json::json!({
                    "name": "Bob",
                    "age": 21,
                    "is_student": false,
                    "score": 88,
                    "hobbies": "swimming",
                    "address": "Shanghai",
                    "gaga": "gaga"
                }),
            ];
            let rows = identity_pipeline(json_array_to_map(array).unwrap(), None, &pipeline_ctx);
            assert!(rows.is_err());
            assert_eq!(
                rows.err().unwrap().to_string(),
                "Column datatype mismatch. For column: score, expected datatype: FLOAT64, actual datatype: INT64".to_string(),
            );
        }
        // Consistent types: the schema is the union of both records. That is the time index
        // plus name, age, is_student, score, hobbies, address and gaga = 8 columns. The null
        // `woshinull` never creates a column, and both rows are padded to 8 values.
        {
            let array = vec![
                serde_json::json!({
                    "woshinull": null,
                    "name": "Alice",
                    "age": 20,
                    "is_student": true,
                    "score": 99.5,
                    "hobbies": "reading",
                    "address": "Beijing",
                }),
                serde_json::json!({
                    "name": "Bob",
                    "age": 21,
                    "is_student": false,
                    "score": 88.5,
                    "hobbies": "swimming",
                    "address": "Shanghai",
                    "gaga": "gaga"
                }),
            ];
            let rows = identity_pipeline(json_array_to_map(array).unwrap(), None, &pipeline_ctx);
            assert!(rows.is_ok());
            let mut rows = rows.unwrap();
            // All rows are expected to fall under the default `ContextOpt`.
            assert!(rows.len() == 1);
            let rows = rows.remove(&ContextOpt::default()).unwrap();
            assert_eq!(rows.schema.len(), 8);
            assert_eq!(rows.rows.len(), 2);
            assert_eq!(8, rows.rows[0].values.len());
            assert_eq!(8, rows.rows[1].values.len());
        }
        // Marks `name` and `address` as tags through `schema.index`, the same way
        // `identity_pipeline` does for a table's row-key columns, and checks that only those
        // two columns become tags.
        {
            let array = vec![
                serde_json::json!({
                    "woshinull": null,
                    "name": "Alice",
                    "age": 20,
                    "is_student": true,
                    "score": 99.5,
                    "hobbies": "reading",
                    "address": "Beijing",
                }),
                serde_json::json!({
                    "name": "Bob",
                    "age": 21,
                    "is_student": false,
                    "score": 88.5,
                    "hobbies": "swimming",
                    "address": "Shanghai",
                    "gaga": "gaga"
                }),
            ];
            let tag_column_names = ["name".to_string(), "address".to_string()];

            let rows = identity_pipeline_inner(json_array_to_map(array).unwrap(), &pipeline_ctx)
                .map(|(mut schema, mut rows)| {
                    for name in tag_column_names {
                        if let Some(index) = schema.index.get(&name) {
                            schema.schema[*index].semantic_type = SemanticType::Tag as i32;
                        }
                    }

                    assert!(rows.len() == 1);
                    let rows = rows.remove(&ContextOpt::default()).unwrap();

                    Rows {
                        schema: schema.schema,
                        rows,
                    }
                });

            assert!(rows.is_ok());
            let rows = rows.unwrap();
            assert_eq!(rows.schema.len(), 8);
            assert_eq!(rows.rows.len(), 2);
            assert_eq!(8, rows.rows[0].values.len());
            assert_eq!(8, rows.rows[1].values.len());
            assert_eq!(
                rows.schema
                    .iter()
                    .find(|x| x.column_name == "name")
                    .unwrap()
                    .semantic_type,
                SemanticType::Tag as i32
            );
            assert_eq!(
                rows.schema
                    .iter()
                    .find(|x| x.column_name == "address")
                    .unwrap()
                    .semantic_type,
                SemanticType::Tag as i32
            );
            assert_eq!(
                rows.schema
                    .iter()
                    .filter(|x| x.semantic_type == SemanticType::Tag as i32)
                    .count(),
                2
            );
        }
    }

    /// `flatten_object` joins nested keys with `.`, keeps arrays and scalars as leaves, and
    /// fails (mapped to `None` here) once nesting exceeds the given limit.
    #[test]
    fn test_flatten() {
        let test_cases = vec![
            // Basic case.
            (
                serde_json::json!(
                    {
                        "a": {
                            "b": {
                                "c": [1, 2, 3]
                            }
                        },
                        "d": [
                            "foo",
                            "bar"
                        ],
                        "e": {
                            "f": [7, 8, 9],
                            "g": {
                                "h": 123,
                                "i": "hello",
                                "j": {
                                    "k": true
                                }
                            }
                        }
                    }
                ),
                10,
                Some(serde_json::json!(
                    {
                        "a.b.c": [1,2,3],
                        "d": ["foo","bar"],
                        "e.f": [7,8,9],
                        "e.g.h": 123,
                        "e.g.i": "hello",
                        "e.g.j.k": true
                    }
                )),
            ),
            // Test the case where the object has more than 3 nested levels.
            // Levels: top (1) -> a (2) -> b (3) -> c (4), so a limit of 3 must fail.
            (
                serde_json::json!(
                    {
                        "a": {
                            "b": {
                                "c": {
                                    "d": [1, 2, 3]
                                }
                            }
                        },
                        "e": [
                            "foo",
                            "bar"
                        ]
                    }
                ),
                3,
                None,
            ),
        ];

        for (input, max_depth, expected) in test_cases {
            let input = json_to_map(input).unwrap();
            let expected = expected.map(|e| json_to_map(e).unwrap());

            let flattened_object = flatten_object(input, max_depth).ok();
            assert_eq!(flattened_object, expected);
        }
    }

    /// `flatten_json_object` is enabled only by an explicit `flatten_json_object=true`.
    #[test]
    fn test_greptime_pipeline_params() {
        let params = Some("flatten_json_object=true");
        let pipeline_params = GreptimePipelineParams::from_params(params);
        assert!(pipeline_params.flatten_json_object());

        let params = None;
        let pipeline_params = GreptimePipelineParams::from_params(params);
        assert!(!pipeline_params.flatten_json_object());
    }
}