pipeline/etl/transform/transformer/greptime.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

pub mod coerce;

use std::collections::HashSet;
use std::sync::Arc;

use ahash::{HashMap, HashMapExt};
use api::helper::proto_value_type;
use api::v1::column_data_type_extension::TypeExt;
use api::v1::value::ValueData;
use api::v1::{ColumnDataType, ColumnDataTypeExtension, JsonTypeExtension, SemanticType};
use coerce::{coerce_columns, coerce_value};
use greptime_proto::v1::{ColumnSchema, Row, Rows, Value as GreptimeValue};
use itertools::Itertools;
use once_cell::sync::OnceCell;
use serde_json::Number;

use crate::error::{
    IdentifyPipelineColumnTypeMismatchSnafu, ReachedMaxNestedLevelsSnafu, Result,
    TransformColumnNameMustBeUniqueSnafu, TransformMultipleTimestampIndexSnafu,
    TransformTimestampIndexCountSnafu, UnsupportedNumberTypeSnafu,
};
use crate::etl::field::{Field, Fields};
use crate::etl::transform::index::Index;
use crate::etl::transform::{Transform, Transforms};
use crate::etl::value::{Timestamp, Value};
use crate::etl::PipelineMap;
use crate::{IdentityTimeIndex, PipelineContext};

const DEFAULT_GREPTIME_TIMESTAMP_COLUMN: &str = "greptime_timestamp";
const DEFAULT_MAX_NESTED_LEVELS_FOR_JSON_FLATTENING: usize = 10;

/// fields not in the columns will be discarded
/// to prevent automatic column creation in GreptimeDB
#[derive(Debug, Clone)]
pub struct GreptimeTransformer {
    transforms: Transforms,
    schema: Vec<ColumnSchema>,
}

/// Parameters that can be used to configure the greptime pipelines.
#[derive(Debug, Clone, Default)]
pub struct GreptimePipelineParams {
    /// The original options for configuring the greptime pipelines.
    /// This should not be used directly; instead, use the parsed shortcut option values.
    options: HashMap<String, String>,

    /// Parsed shortcut option values
    pub flatten_json_object: OnceCell<bool>,
}

impl GreptimePipelineParams {
    /// Create a `GreptimePipelineParams` from the params string taken from the HTTP header
    /// with key `x-greptime-pipeline-params`.
    /// The params string is in the format of `key1=value1&key2=value2`, for example:
    /// `x-greptime-pipeline-params: flatten_json_object=true`
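    ///
    /// A minimal usage sketch (illustrative, mirroring the unit test below):
    /// ```ignore
    /// let params = GreptimePipelineParams::from_params(Some("flatten_json_object=true"));
    /// assert!(params.flatten_json_object());
    /// ```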
    pub fn from_params(params: Option<&str>) -> Self {
        let options = params
            .unwrap_or_default()
            .split('&')
            .filter_map(|s| s.split_once('='))
            .map(|(k, v)| (k.to_string(), v.to_string()))
            .collect::<HashMap<String, String>>();

        Self {
            options,
            flatten_json_object: OnceCell::new(),
        }
    }

    /// Whether to flatten the JSON object.
    pub fn flatten_json_object(&self) -> bool {
        *self.flatten_json_object.get_or_init(|| {
            self.options
                .get("flatten_json_object")
                .map(|v| v == "true")
                .unwrap_or(false)
        })
    }
}

impl GreptimeTransformer {
    /// Add a default timestamp column to the transforms
    fn add_greptime_timestamp_column(transforms: &mut Transforms) {
        let type_ = Value::Timestamp(Timestamp::Nanosecond(0));
        let default = None;

        let transform = Transform {
            fields: Fields::one(Field::new(
                DEFAULT_GREPTIME_TIMESTAMP_COLUMN.to_string(),
                None,
            )),
            type_,
            default,
            index: Some(Index::Time),
            on_failure: Some(crate::etl::transform::OnFailure::Default),
            tag: false,
        };
        transforms.push(transform);
    }

    /// Generate the schema for the GreptimeTransformer
    fn init_schemas(transforms: &Transforms) -> Result<Vec<ColumnSchema>> {
        let mut schema = vec![];
        for transform in transforms.iter() {
            schema.extend(coerce_columns(transform)?);
        }
        Ok(schema)
    }
}

impl GreptimeTransformer {
    pub fn new(mut transforms: Transforms) -> Result<Self> {
        // empty check is done in the caller
        let mut column_names_set = HashSet::new();
        let mut timestamp_columns = vec![];

        for transform in transforms.iter() {
            let target_fields_set = transform
                .fields
                .iter()
                .map(|f| f.target_or_input_field())
                .collect::<HashSet<_>>();

            let intersections: Vec<_> = column_names_set.intersection(&target_fields_set).collect();
            if !intersections.is_empty() {
                let duplicates = intersections.iter().join(",");
                return TransformColumnNameMustBeUniqueSnafu { duplicates }.fail();
            }

            column_names_set.extend(target_fields_set);

            if let Some(idx) = transform.index {
                if idx == Index::Time {
                    match transform.fields.len() {
                        // Safety: unwrap is fine here because we have checked the length of `fields`
                        1 => {
                            timestamp_columns.push(transform.fields.first().unwrap().input_field())
                        }
                        _ => {
                            return TransformMultipleTimestampIndexSnafu {
                                columns: transform
                                    .fields
                                    .iter()
                                    .map(|x| x.input_field())
                                    .join(", "),
                            }
                            .fail();
                        }
                    }
                }
            }
        }

        match timestamp_columns.len() {
            0 => {
                GreptimeTransformer::add_greptime_timestamp_column(&mut transforms);

                let schema = GreptimeTransformer::init_schemas(&transforms)?;
                Ok(GreptimeTransformer { transforms, schema })
            }
            1 => {
                let schema = GreptimeTransformer::init_schemas(&transforms)?;
                Ok(GreptimeTransformer { transforms, schema })
            }
            _ => {
                let columns: String = timestamp_columns.iter().map(|s| s.to_string()).join(", ");
                let count = timestamp_columns.len();
                TransformTimestampIndexCountSnafu { count, columns }.fail()
            }
        }
    }

    pub fn transform_mut(&self, val: &mut PipelineMap) -> Result<Row> {
        let mut values = vec![GreptimeValue { value_data: None }; self.schema.len()];
        let mut output_index = 0;
        for transform in self.transforms.iter() {
            for field in transform.fields.iter() {
                let index = field.input_field();
                match val.get(index) {
                    Some(v) => {
                        let value_data = coerce_value(v, transform)?;
                        // every transform field has only one output field
                        values[output_index] = GreptimeValue { value_data };
                    }
                    None => {
                        let value_data = match transform.on_failure {
                            Some(crate::etl::transform::OnFailure::Default) => {
                                match transform.get_default() {
                                    Some(default) => coerce_value(default, transform)?,
                                    None => match transform.get_default_value_when_data_is_none() {
                                        Some(default) => coerce_value(&default, transform)?,
                                        None => None,
                                    },
                                }
                            }
                            Some(crate::etl::transform::OnFailure::Ignore) => None,
                            None => None,
                        };
                        values[output_index] = GreptimeValue { value_data };
                    }
                }
                output_index += 1;
            }
        }
        Ok(Row { values })
    }

    pub fn transforms(&self) -> &Transforms {
        &self.transforms
    }

    pub fn schemas(&self) -> &Vec<greptime_proto::v1::ColumnSchema> {
        &self.schema
    }

    pub fn transforms_mut(&mut self) -> &mut Transforms {
        &mut self.transforms
    }
}

/// Records the schema information seen so far, together with a name-to-index cache of the field names.
/// It changes as the user input JSON is traversed, and ends up holding a superset of all user input schemas.
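///
/// An illustrative sketch (not from the original docs), as populated by `identity_pipeline_inner`:
/// ```ignore
/// // input rows: {"a": 1} then {"a": 2, "b": "x"}
/// // schema -> [<time index>, a: Int64, b: String]
/// // index  -> {"a": 1, "b": 2}
/// ```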
#[derive(Debug, Default)]
pub struct SchemaInfo {
    /// schema info
    pub schema: Vec<ColumnSchema>,
    /// index of the column name
    pub index: HashMap<String, usize>,
}

impl SchemaInfo {
    pub fn with_capacity(capacity: usize) -> Self {
        Self {
            schema: Vec::with_capacity(capacity),
            index: HashMap::with_capacity(capacity),
        }
    }
}

/// Write `value_data` into `row` under the given column.
/// If `index` is `Some`, the column already exists in `schema_info` and the value type must
/// match the recorded schema; otherwise the column schema is appended and the value pushed.
fn resolve_schema(
    index: Option<usize>,
    value_data: ValueData,
    column_schema: ColumnSchema,
    row: &mut Vec<GreptimeValue>,
    schema_info: &mut SchemaInfo,
) -> Result<()> {
    if let Some(index) = index {
        let api_value = GreptimeValue {
            value_data: Some(value_data),
        };
        // Safety: unwrap is fine here because api_value is always valid
        let value_column_data_type = proto_value_type(&api_value).unwrap();
        // Safety: unwrap is fine here because index is always valid
        let schema_column_data_type = schema_info.schema.get(index).unwrap().datatype();
        if value_column_data_type != schema_column_data_type {
            IdentifyPipelineColumnTypeMismatchSnafu {
                column: column_schema.column_name,
                expected: schema_column_data_type.as_str_name(),
                actual: value_column_data_type.as_str_name(),
            }
            .fail()
        } else {
            row[index] = api_value;
            Ok(())
        }
    } else {
        let key = column_schema.column_name.clone();
        schema_info.schema.push(column_schema);
        schema_info.index.insert(key, schema_info.schema.len() - 1);
        let api_value = GreptimeValue {
            value_data: Some(value_data),
        };
        row.push(api_value);
        Ok(())
    }
}

/// Resolve a JSON number into an `i64`/`u64`/`f64` column value and delegate to [`resolve_schema`].
fn resolve_number_schema(
    n: Number,
    column_name: String,
    index: Option<usize>,
    row: &mut Vec<GreptimeValue>,
    schema_info: &mut SchemaInfo,
) -> Result<()> {
    let (value, datatype, semantic_type) = if n.is_i64() {
        (
            ValueData::I64Value(n.as_i64().unwrap()),
            ColumnDataType::Int64 as i32,
            SemanticType::Field as i32,
        )
    } else if n.is_u64() {
        (
            ValueData::U64Value(n.as_u64().unwrap()),
            ColumnDataType::Uint64 as i32,
            SemanticType::Field as i32,
        )
    } else if n.is_f64() {
        (
            ValueData::F64Value(n.as_f64().unwrap()),
            ColumnDataType::Float64 as i32,
            SemanticType::Field as i32,
        )
    } else {
        return UnsupportedNumberTypeSnafu { value: n }.fail();
    };
    resolve_schema(
        index,
        value,
        ColumnSchema {
            column_name,
            datatype,
            semantic_type,
            datatype_extension: None,
            options: None,
        },
        row,
        schema_info,
    )
}

/// Convert a single `PipelineMap` into a [`Row`], growing `schema_info` as new columns appear.
/// The time index value is always placed at position 0.
fn values_to_row(
    schema_info: &mut SchemaInfo,
    values: PipelineMap,
    custom_ts: Option<&IdentityTimeIndex>,
) -> Result<Row> {
    let mut row: Vec<GreptimeValue> = Vec::with_capacity(schema_info.schema.len());

    // set time index value
    let value_data = match custom_ts {
        Some(ts) => {
            let ts_field = values.get(ts.get_column_name());
            Some(ts.get_timestamp(ts_field)?)
        }
        None => Some(ValueData::TimestampNanosecondValue(
            chrono::Utc::now().timestamp_nanos_opt().unwrap_or_default(),
        )),
    };

    row.push(GreptimeValue { value_data });

    for _ in 1..schema_info.schema.len() {
        row.push(GreptimeValue { value_data: None });
    }

    for (column_name, value) in values {
        // skip ts column
        let ts_column = custom_ts
            .as_ref()
            .map_or(DEFAULT_GREPTIME_TIMESTAMP_COLUMN, |ts| ts.get_column_name());
        if column_name == ts_column {
            continue;
        }

        let index = schema_info.index.get(&column_name).copied();
        resolve_value(index, value, column_name, &mut row, schema_info)?;
    }
    Ok(Row { values: row })
}

/// Map a pipeline [`Value`] to its corresponding column data type and write it into the row
/// via [`resolve_schema`]. Arrays and maps are stored as JSON binary; `Null` is skipped.
fn resolve_value(
    index: Option<usize>,
    value: Value,
    column_name: String,
    row: &mut Vec<GreptimeValue>,
    schema_info: &mut SchemaInfo,
) -> Result<()> {
    let mut resolve_simple_type =
        |value_data: ValueData, column_name: String, data_type: ColumnDataType| {
            resolve_schema(
                index,
                value_data,
                ColumnSchema {
                    column_name,
                    datatype: data_type as i32,
                    semantic_type: SemanticType::Field as i32,
                    datatype_extension: None,
                    options: None,
                },
                row,
                schema_info,
            )
        };

    match value {
        Value::Null => {}

        Value::Int8(_) | Value::Int16(_) | Value::Int32(_) | Value::Int64(_) => {
            // safe unwrap after type matched
            let v = value.as_i64().unwrap();
            resolve_simple_type(ValueData::I64Value(v), column_name, ColumnDataType::Int64)?;
        }

        Value::Uint8(_) | Value::Uint16(_) | Value::Uint32(_) | Value::Uint64(_) => {
            // safe unwrap after type matched
            let v = value.as_u64().unwrap();
            resolve_simple_type(ValueData::U64Value(v), column_name, ColumnDataType::Uint64)?;
        }

        Value::Float32(_) | Value::Float64(_) => {
            // safe unwrap after type matched
            let v = value.as_f64().unwrap();
            resolve_simple_type(ValueData::F64Value(v), column_name, ColumnDataType::Float64)?;
        }

        Value::Boolean(v) => {
            resolve_simple_type(
                ValueData::BoolValue(v),
                column_name,
                ColumnDataType::Boolean,
            )?;
        }

        Value::String(v) => {
            resolve_simple_type(
                ValueData::StringValue(v),
                column_name,
                ColumnDataType::String,
            )?;
        }

        Value::Timestamp(Timestamp::Nanosecond(ns)) => {
            resolve_simple_type(
                ValueData::TimestampNanosecondValue(ns),
                column_name,
                ColumnDataType::TimestampNanosecond,
            )?;
        }

        Value::Timestamp(Timestamp::Microsecond(us)) => {
            resolve_simple_type(
                ValueData::TimestampMicrosecondValue(us),
                column_name,
                ColumnDataType::TimestampMicrosecond,
            )?;
        }

        Value::Timestamp(Timestamp::Millisecond(ms)) => {
            resolve_simple_type(
                ValueData::TimestampMillisecondValue(ms),
                column_name,
                ColumnDataType::TimestampMillisecond,
            )?;
        }

        Value::Timestamp(Timestamp::Second(s)) => {
            resolve_simple_type(
                ValueData::TimestampSecondValue(s),
                column_name,
                ColumnDataType::TimestampSecond,
            )?;
        }

        Value::Array(_) | Value::Map(_) => {
            let data: jsonb::Value = value.into();
            resolve_schema(
                index,
                ValueData::BinaryValue(data.to_vec()),
                ColumnSchema {
                    column_name,
                    datatype: ColumnDataType::Binary as i32,
                    semantic_type: SemanticType::Field as i32,
                    datatype_extension: Some(ColumnDataTypeExtension {
                        type_ext: Some(TypeExt::JsonType(JsonTypeExtension::JsonBinary.into())),
                    }),
                    options: None,
                },
                row,
                schema_info,
            )?;
        }
    }
    Ok(())
}

/// Convert a batch of `PipelineMap`s into rows, merging all observed columns into one schema.
/// Rows produced before a new column appeared are padded with NULLs at the end.
fn identity_pipeline_inner(
    pipeline_maps: Vec<PipelineMap>,
    pipeline_ctx: &PipelineContext<'_>,
) -> Result<(SchemaInfo, Vec<Row>)> {
    let mut rows = Vec::with_capacity(pipeline_maps.len());
    let mut schema_info = SchemaInfo::default();
    let custom_ts = pipeline_ctx.pipeline_definition.get_custom_ts();

    // set time index column schema first
    schema_info.schema.push(ColumnSchema {
        column_name: custom_ts
            .map(|ts| ts.get_column_name().clone())
            .unwrap_or_else(|| DEFAULT_GREPTIME_TIMESTAMP_COLUMN.to_string()),
        datatype: custom_ts
            .map(|c| c.get_datatype())
            .unwrap_or(ColumnDataType::TimestampNanosecond) as i32,
        semantic_type: SemanticType::Timestamp as i32,
        datatype_extension: None,
        options: None,
    });

    for values in pipeline_maps {
        let row = values_to_row(&mut schema_info, values, custom_ts)?;
        rows.push(row);
    }

    let column_count = schema_info.schema.len();
    for row in rows.iter_mut() {
        let diff = column_count - row.values.len();
        for _ in 0..diff {
            row.values.push(GreptimeValue { value_data: None });
        }
    }

    Ok((schema_info, rows))
}

/// Identity pipeline for Greptime.
/// This pipeline converts the input JSON array to Greptime Rows.
/// The `table` param is used to set the semantic type of the row key columns to Tag.
/// 1. The pipeline will add a default timestamp column to the schema.
/// 2. The pipeline does not resolve NULL values.
/// 3. The pipeline assumes that the JSON format is fixed.
/// 4. The pipeline will return an error if the datatype of the same column is mismatched.
/// 5. The pipeline will analyze the schema of each JSON record and merge them to get the final schema.
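///
/// A hypothetical usage sketch (the `pipeline_ctx` construction is elided here):
/// ```ignore
/// let maps = json_array_to_map(vec![
///     serde_json::json!({"host": "h1", "cpu": 0.5}),
///     serde_json::json!({"host": "h2", "mem": 1024}),
/// ])?;
/// // The merged schema is the superset: greptime_timestamp, host, cpu, mem.
/// let rows = identity_pipeline(maps, None, &pipeline_ctx)?;
/// assert_eq!(rows.schema.len(), 4);
/// ```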
pub fn identity_pipeline(
    array: Vec<PipelineMap>,
    table: Option<Arc<table::Table>>,
    pipeline_ctx: &PipelineContext<'_>,
) -> Result<Rows> {
    let input = if pipeline_ctx.pipeline_param.flatten_json_object() {
        array
            .into_iter()
            .map(|item| flatten_object(item, DEFAULT_MAX_NESTED_LEVELS_FOR_JSON_FLATTENING))
            .collect::<Result<Vec<PipelineMap>>>()?
    } else {
        array
    };

    identity_pipeline_inner(input, pipeline_ctx).map(|(mut schema, rows)| {
        if let Some(table) = table {
            let table_info = table.table_info();
            for tag_name in table_info.meta.row_key_column_names() {
                if let Some(index) = schema.index.get(tag_name) {
                    schema.schema[*index].semantic_type = SemanticType::Tag as i32;
                }
            }
        }
        Rows {
            schema: schema.schema,
            rows,
        }
    })
}

/// Consumes the JSON object and flattens it into a single-level object.
///
/// The `max_nested_levels` parameter is used to limit the nested levels of the JSON object.
/// An error will be returned if the number of nested levels is greater than `max_nested_levels`.
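///
/// A minimal sketch of the behaviour (illustrative; `nested` is a hypothetical `PipelineMap`):
/// ```ignore
/// // {"a": {"b": 1}, "c": 2}  ->  {"a.b": 1, "c": 2}
/// let flattened = flatten_object(nested, 10)?;
/// ```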
pub fn flatten_object(object: PipelineMap, max_nested_levels: usize) -> Result<PipelineMap> {
    let mut flattened = PipelineMap::new();

    if !object.is_empty() {
        // it will use recursion to flatten the object.
        do_flatten_object(&mut flattened, None, object, 1, max_nested_levels)?;
    }

    Ok(flattened)
}

fn do_flatten_object(
    dest: &mut PipelineMap,
    base: Option<&str>,
    object: PipelineMap,
    current_level: usize,
    max_nested_levels: usize,
) -> Result<()> {
    // For safety, we do not allow the depth to be greater than `max_nested_levels`.
    if current_level > max_nested_levels {
        return ReachedMaxNestedLevelsSnafu { max_nested_levels }.fail();
    }

    for (key, value) in object {
        let new_key = base.map_or_else(|| key.clone(), |base_key| format!("{base_key}.{key}"));

        match value {
            Value::Map(object) => {
                do_flatten_object(
                    dest,
                    Some(&new_key),
                    object.values,
                    current_level + 1,
                    max_nested_levels,
                )?;
            }
            // For other types, we insert them directly with the flattened key.
            _ => {
                dest.insert(new_key, value);
            }
        }
    }

    Ok(())
}

#[cfg(test)]
mod tests {
    use api::v1::SemanticType;

    use super::*;
    use crate::etl::{json_array_to_map, json_to_map};
    use crate::{identity_pipeline, PipelineDefinition};

    #[test]
    fn test_identify_pipeline() {
        let params = GreptimePipelineParams::default();
        let pipeline_ctx =
            PipelineContext::new(&PipelineDefinition::GreptimeIdentityPipeline(None), &params);
        {
            let array = vec![
                serde_json::json!({
                    "woshinull": null,
                    "name": "Alice",
                    "age": 20,
                    "is_student": true,
                    "score": 99.5,
                    "hobbies": "reading",
                    "address": "Beijing",
                }),
                serde_json::json!({
                    "name": "Bob",
                    "age": 21,
                    "is_student": false,
                    "score": "88.5",
                    "hobbies": "swimming",
                    "address": "Shanghai",
                    "gaga": "gaga"
                }),
            ];
            let array = json_array_to_map(array).unwrap();
            let rows = identity_pipeline(array, None, &pipeline_ctx);
            assert!(rows.is_err());
            assert_eq!(
                rows.err().unwrap().to_string(),
                "Column datatype mismatch. For column: score, expected datatype: FLOAT64, actual datatype: STRING".to_string(),
            );
        }
        {
            let array = vec![
                serde_json::json!({
                    "woshinull": null,
                    "name": "Alice",
                    "age": 20,
                    "is_student": true,
                    "score": 99.5,
                    "hobbies": "reading",
                    "address": "Beijing",
                }),
                serde_json::json!({
                    "name": "Bob",
                    "age": 21,
                    "is_student": false,
                    "score": 88,
                    "hobbies": "swimming",
                    "address": "Shanghai",
                    "gaga": "gaga"
                }),
            ];
            let rows = identity_pipeline(json_array_to_map(array).unwrap(), None, &pipeline_ctx);
            assert!(rows.is_err());
            assert_eq!(
                rows.err().unwrap().to_string(),
                "Column datatype mismatch. For column: score, expected datatype: FLOAT64, actual datatype: INT64".to_string(),
            );
        }
        {
            let array = vec![
                serde_json::json!({
                    "woshinull": null,
                    "name": "Alice",
                    "age": 20,
                    "is_student": true,
                    "score": 99.5,
                    "hobbies": "reading",
                    "address": "Beijing",
                }),
                serde_json::json!({
                    "name": "Bob",
                    "age": 21,
                    "is_student": false,
                    "score": 88.5,
                    "hobbies": "swimming",
                    "address": "Shanghai",
                    "gaga": "gaga"
                }),
            ];
            let rows = identity_pipeline(json_array_to_map(array).unwrap(), None, &pipeline_ctx);
            assert!(rows.is_ok());
            let rows = rows.unwrap();
            assert_eq!(rows.schema.len(), 8);
            assert_eq!(rows.rows.len(), 2);
            assert_eq!(8, rows.rows[0].values.len());
            assert_eq!(8, rows.rows[1].values.len());
        }
        {
            let array = vec![
                serde_json::json!({
                    "woshinull": null,
                    "name": "Alice",
                    "age": 20,
                    "is_student": true,
                    "score": 99.5,
                    "hobbies": "reading",
                    "address": "Beijing",
                }),
                serde_json::json!({
                    "name": "Bob",
                    "age": 21,
                    "is_student": false,
                    "score": 88.5,
                    "hobbies": "swimming",
                    "address": "Shanghai",
                    "gaga": "gaga"
                }),
            ];
            let tag_column_names = ["name".to_string(), "address".to_string()];

            let rows = identity_pipeline_inner(json_array_to_map(array).unwrap(), &pipeline_ctx)
                .map(|(mut schema, rows)| {
                    for name in tag_column_names {
                        if let Some(index) = schema.index.get(&name) {
                            schema.schema[*index].semantic_type = SemanticType::Tag as i32;
                        }
                    }
                    Rows {
                        schema: schema.schema,
                        rows,
                    }
                });

            assert!(rows.is_ok());
            let rows = rows.unwrap();
            assert_eq!(rows.schema.len(), 8);
            assert_eq!(rows.rows.len(), 2);
            assert_eq!(8, rows.rows[0].values.len());
            assert_eq!(8, rows.rows[1].values.len());
            assert_eq!(
                rows.schema
                    .iter()
                    .find(|x| x.column_name == "name")
                    .unwrap()
                    .semantic_type,
                SemanticType::Tag as i32
            );
            assert_eq!(
                rows.schema
                    .iter()
                    .find(|x| x.column_name == "address")
                    .unwrap()
                    .semantic_type,
                SemanticType::Tag as i32
            );
            assert_eq!(
                rows.schema
                    .iter()
                    .filter(|x| x.semantic_type == SemanticType::Tag as i32)
                    .count(),
                2
            );
        }
    }

    #[test]
    fn test_flatten() {
        let test_cases = vec![
            // Basic case.
            (
                serde_json::json!(
                    {
                        "a": {
                            "b": {
                                "c": [1, 2, 3]
                            }
                        },
                        "d": [
                            "foo",
                            "bar"
                        ],
                        "e": {
                            "f": [7, 8, 9],
                            "g": {
                                "h": 123,
                                "i": "hello",
                                "j": {
                                    "k": true
                                }
                            }
                        }
                    }
                ),
                10,
                Some(serde_json::json!(
                    {
                        "a.b.c": [1,2,3],
                        "d": ["foo","bar"],
                        "e.f": [7,8,9],
                        "e.g.h": 123,
                        "e.g.i": "hello",
                        "e.g.j.k": true
                    }
                )),
            ),
            // Test the case where the object has more than 3 nested levels.
            (
                serde_json::json!(
                    {
                        "a": {
                            "b": {
                                "c": {
                                    "d": [1, 2, 3]
                                }
                            }
                        },
                        "e": [
                            "foo",
                            "bar"
                        ]
                    }
                ),
                3,
                None,
            ),
        ];

        for (input, max_depth, expected) in test_cases {
            let input = json_to_map(input).unwrap();
            let expected = expected.map(|e| json_to_map(e).unwrap());

            let flattened_object = flatten_object(input, max_depth).ok();
            assert_eq!(flattened_object, expected);
        }
    }

    #[test]
    fn test_greptime_pipeline_params() {
        let params = Some("flatten_json_object=true");
        let pipeline_params = GreptimePipelineParams::from_params(params);
        assert!(pipeline_params.flatten_json_object());

        let params = None;
        let pipeline_params = GreptimePipelineParams::from_params(params);
        assert!(!pipeline_params.flatten_json_object());
    }
}