1pub mod coerce;
16
17use std::borrow::Cow;
18use std::collections::{BTreeMap, HashSet};
19use std::sync::Arc;
20
21use ahash::{HashMap, HashMapExt};
22use api::helper::proto_value_type;
23use api::v1::column_data_type_extension::TypeExt;
24use api::v1::value::ValueData;
25use api::v1::{ColumnDataType, ColumnDataTypeExtension, JsonTypeExtension, SemanticType};
26use coerce::{coerce_columns, coerce_value};
27use common_query::prelude::{GREPTIME_TIMESTAMP, GREPTIME_VALUE};
28use common_telemetry::warn;
29use greptime_proto::v1::{ColumnSchema, Row, Rows, Value as GreptimeValue};
30use itertools::Itertools;
31use jsonb::Number;
32use once_cell::sync::OnceCell;
33use serde_json as serde_json_crate;
34use session::context::Channel;
35use snafu::OptionExt;
36use vrl::prelude::{Bytes, VrlValueConvert};
37use vrl::value::{KeyString, Value as VrlValue};
38
39use crate::error::{
40 IdentifyPipelineColumnTypeMismatchSnafu, InvalidTimestampSnafu, Result,
41 TimeIndexMustBeNonNullSnafu, TransformColumnNameMustBeUniqueSnafu,
42 TransformMultipleTimestampIndexSnafu, TransformTimestampIndexCountSnafu, ValueMustBeMapSnafu,
43};
44use crate::etl::PipelineDocVersion;
45use crate::etl::ctx_req::ContextOpt;
46use crate::etl::field::{Field, Fields};
47use crate::etl::transform::index::Index;
48use crate::etl::transform::{Transform, Transforms};
49use crate::{PipelineContext, truthy, unwrap_or_continue_if_err};
50
/// Name of the timestamp column that is appended automatically when a pipeline
/// declares no time index of its own.
const DEFAULT_GREPTIME_TIMESTAMP_COLUMN: &str = "greptime_timestamp";
/// Default maximum depth when flattening nested JSON objects into dotted columns.
const DEFAULT_MAX_NESTED_LEVELS_FOR_JSON_FLATTENING: usize = 10;
53
/// Applies a configured set of transforms to pipeline values, producing rows
/// that match a fixed set of GreptimeDB column schemas.
#[derive(Debug, Clone)]
pub struct GreptimeTransformer {
    /// Ordered transform rules; output column order follows this ordering.
    transforms: Transforms,
    /// Column schemas derived from `transforms` via `coerce_columns`.
    schema: Vec<ColumnSchema>,
}
61
/// Pipeline behavior options, typically parsed from a header string of the form
/// `key1=value1&key2=value2`.
#[derive(Debug, Default)]
pub struct GreptimePipelineParams {
    /// Raw key/value options as parsed from the params string.
    options: HashMap<String, String>,

    /// Lazily computed: skip failing rows instead of aborting (default `false`).
    pub skip_error: OnceCell<bool>,
    /// Lazily computed: maximum JSON flattening depth (default 10).
    pub max_nested_levels: OnceCell<usize>,
}
75
76impl GreptimePipelineParams {
77 pub fn from_params(params: Option<&str>) -> Self {
81 let options = Self::parse_header_str_to_map(params);
82
83 Self {
84 options,
85 skip_error: OnceCell::new(),
86 max_nested_levels: OnceCell::new(),
87 }
88 }
89
90 pub fn from_map(options: HashMap<String, String>) -> Self {
91 Self {
92 options,
93 skip_error: OnceCell::new(),
94 max_nested_levels: OnceCell::new(),
95 }
96 }
97
98 pub fn parse_header_str_to_map(params: Option<&str>) -> HashMap<String, String> {
99 if let Some(params) = params {
100 if params.is_empty() {
101 HashMap::new()
102 } else {
103 params
104 .split('&')
105 .filter_map(|s| s.split_once('='))
106 .map(|(k, v)| (k.to_string(), v.to_string()))
107 .collect::<HashMap<String, String>>()
108 }
109 } else {
110 HashMap::new()
111 }
112 }
113
114 pub fn skip_error(&self) -> bool {
116 *self
117 .skip_error
118 .get_or_init(|| self.options.get("skip_error").map(truthy).unwrap_or(false))
119 }
120
121 pub fn max_nested_levels(&self) -> usize {
124 *self.max_nested_levels.get_or_init(|| {
125 self.options
126 .get("max_nested_levels")
127 .and_then(|s| s.parse::<usize>().ok())
128 .filter(|v| *v > 0)
129 .unwrap_or(DEFAULT_MAX_NESTED_LEVELS_FOR_JSON_FLATTENING)
130 })
131 }
132}
133
134impl GreptimeTransformer {
135 fn add_greptime_timestamp_column(transforms: &mut Transforms) {
137 let type_ = ColumnDataType::TimestampNanosecond;
138 let default = None;
139
140 let transform = Transform {
141 fields: Fields::one(Field::new(
142 DEFAULT_GREPTIME_TIMESTAMP_COLUMN.to_string(),
143 None,
144 )),
145 type_,
146 default,
147 index: Some(Index::Time),
148 on_failure: Some(crate::etl::transform::OnFailure::Default),
149 tag: false,
150 };
151 transforms.push(transform);
152 }
153
154 fn init_schemas(transforms: &Transforms) -> Result<Vec<ColumnSchema>> {
156 let mut schema = vec![];
157 for transform in transforms.iter() {
158 schema.extend(coerce_columns(transform)?);
159 }
160 Ok(schema)
161 }
162}
163
impl GreptimeTransformer {
    /// Build a transformer from `transforms`.
    ///
    /// Validates that output column names are unique across all transforms and
    /// that exactly one time-index column is declared. V1 documents with no
    /// time index get an implicit `greptime_timestamp` column appended.
    pub fn new(mut transforms: Transforms, doc_version: &PipelineDocVersion) -> Result<Self> {
        let mut column_names_set = HashSet::new();
        let mut timestamp_columns = vec![];

        for transform in transforms.iter() {
            let target_fields_set = transform
                .fields
                .iter()
                .map(|f| f.target_or_input_field())
                .collect::<HashSet<_>>();

            // Output column names must be unique across transforms.
            let intersections: Vec<_> = column_names_set.intersection(&target_fields_set).collect();
            if !intersections.is_empty() {
                let duplicates = intersections.iter().join(",");
                return TransformColumnNameMustBeUniqueSnafu { duplicates }.fail();
            }

            column_names_set.extend(target_fields_set);

            if let Some(idx) = transform.index
                && idx == Index::Time
            {
                // A time-index transform must map exactly one field.
                match transform.fields.len() {
                    1 => timestamp_columns.push(transform.fields.first().unwrap().input_field()),
                    _ => {
                        return TransformMultipleTimestampIndexSnafu {
                            columns: transform.fields.iter().map(|x| x.input_field()).join(", "),
                        }
                        .fail();
                    }
                }
            }
        }

        let schema = match timestamp_columns.len() {
            // Legacy v1 behavior: no declared time index gets an implicit one.
            0 if doc_version == &PipelineDocVersion::V1 => {
                GreptimeTransformer::add_greptime_timestamp_column(&mut transforms);
                GreptimeTransformer::init_schemas(&transforms)?
            }
            1 => GreptimeTransformer::init_schemas(&transforms)?,
            count => {
                let columns = timestamp_columns.iter().join(", ");
                return TransformTimestampIndexCountSnafu { count, columns }.fail();
            }
        };
        Ok(GreptimeTransformer { transforms, schema })
    }

    /// Transform `pipeline_map` into a row of values ordered like `self.schema`.
    ///
    /// Missing inputs fall back to the transform's `on_failure` policy; a null
    /// time-index value is an error. For v2 documents (`is_v1 == false`),
    /// consumed keys are removed from the map so later stages only see leftovers.
    pub fn transform_mut(
        &self,
        pipeline_map: &mut VrlValue,
        is_v1: bool,
    ) -> Result<Vec<GreptimeValue>> {
        let mut values = vec![GreptimeValue { value_data: None }; self.schema.len()];
        // Output position advances once per field, matching the schema layout
        // produced by init_schemas.
        let mut output_index = 0;
        for transform in self.transforms.iter() {
            for field in transform.fields.iter() {
                let column_name = field.input_field();

                let pipeline_map = pipeline_map.as_object_mut().context(ValueMustBeMapSnafu)?;
                match pipeline_map.get(column_name) {
                    Some(v) => {
                        let value_data = coerce_value(v, transform)?;
                        values[output_index] = GreptimeValue { value_data };
                    }
                    None => {
                        // Input key absent: apply the on_failure policy.
                        let value_data = match transform.on_failure {
                            Some(crate::etl::transform::OnFailure::Default) => {
                                match transform.get_default() {
                                    Some(default) => Some(default.clone()),
                                    None => transform.get_default_value_when_data_is_none(),
                                }
                            }
                            Some(crate::etl::transform::OnFailure::Ignore) => None,
                            None => None,
                        };
                        if transform.is_timeindex() && value_data.is_none() {
                            return TimeIndexMustBeNonNullSnafu.fail();
                        }
                        values[output_index] = GreptimeValue { value_data };
                    }
                }
                output_index += 1;
                if !is_v1 {
                    // v2: consumed keys are removed from the source map.
                    pipeline_map.remove(column_name);
                }
            }
        }
        Ok(values)
    }

    /// Returns the configured transforms.
    pub fn transforms(&self) -> &Transforms {
        &self.transforms
    }

    /// Returns the output column schemas.
    pub fn schemas(&self) -> &Vec<greptime_proto::v1::ColumnSchema> {
        &self.schema
    }

    /// Returns a mutable reference to the transforms.
    pub fn transforms_mut(&mut self) -> &mut Transforms {
        &mut self.transforms
    }
}
275
/// A column schema list together with a name → position index for fast lookup.
#[derive(Debug, Default)]
pub struct SchemaInfo {
    /// Ordered column schemas.
    pub schema: Vec<ColumnSchema>,
    /// Maps a column name to its position in `schema`.
    pub index: HashMap<String, usize>,
}
286
287impl SchemaInfo {
288 pub fn with_capacity(capacity: usize) -> Self {
289 Self {
290 schema: Vec::with_capacity(capacity),
291 index: HashMap::with_capacity(capacity),
292 }
293 }
294
295 pub fn from_schema_list(schema_list: Vec<ColumnSchema>) -> Self {
296 let mut index = HashMap::new();
297 for (i, schema) in schema_list.iter().enumerate() {
298 index.insert(schema.column_name.clone(), i);
299 }
300 Self {
301 schema: schema_list,
302 index,
303 }
304 }
305}
306
307fn resolve_schema(
308 index: Option<usize>,
309 value_data: ValueData,
310 column_schema: ColumnSchema,
311 row: &mut Vec<GreptimeValue>,
312 schema_info: &mut SchemaInfo,
313) -> Result<()> {
314 if let Some(index) = index {
315 let api_value = GreptimeValue {
316 value_data: Some(value_data),
317 };
318 let value_column_data_type = proto_value_type(&api_value).unwrap();
320 let schema_column_data_type = schema_info.schema.get(index).unwrap().datatype();
322 if value_column_data_type != schema_column_data_type {
323 IdentifyPipelineColumnTypeMismatchSnafu {
324 column: column_schema.column_name,
325 expected: schema_column_data_type.as_str_name(),
326 actual: value_column_data_type.as_str_name(),
327 }
328 .fail()
329 } else {
330 row[index] = api_value;
331 Ok(())
332 }
333 } else {
334 let key = column_schema.column_name.clone();
335 schema_info.schema.push(column_schema);
336 schema_info.index.insert(key, schema_info.schema.len() - 1);
337 let api_value = GreptimeValue {
338 value_data: Some(value_data),
339 };
340 row.push(api_value);
341 Ok(())
342 }
343}
344
345fn calc_ts(p_ctx: &PipelineContext, values: &VrlValue) -> Result<Option<ValueData>> {
346 match p_ctx.channel {
347 Channel::Prometheus => {
348 let ts = values
349 .as_object()
350 .and_then(|m| m.get(GREPTIME_TIMESTAMP))
351 .and_then(|ts| ts.try_into_i64().ok())
352 .unwrap_or_default();
353 Ok(Some(ValueData::TimestampMillisecondValue(ts)))
354 }
355 _ => {
356 let custom_ts = p_ctx.pipeline_definition.get_custom_ts();
357 match custom_ts {
358 Some(ts) => {
359 let ts_field = values.as_object().and_then(|m| m.get(ts.get_column_name()));
360 Some(ts.get_timestamp_value(ts_field)).transpose()
361 }
362 None => Ok(Some(ValueData::TimestampNanosecondValue(
363 chrono::Utc::now().timestamp_nanos_opt().unwrap_or_default(),
364 ))),
365 }
366 }
367 }
368}
369
/// Convert a VRL map into a `Row` aligned with `schema_info`.
///
/// When `need_calc_ts` is set, the timestamp is computed and placed first, and
/// the source timestamp key is skipped during iteration. `row` may carry
/// pre-filled leading values; it is padded with nulls to the current schema
/// width before per-key resolution. Unknown keys extend the schema in place.
pub(crate) fn values_to_row(
    schema_info: &mut SchemaInfo,
    values: VrlValue,
    pipeline_ctx: &PipelineContext<'_>,
    row: Option<Vec<GreptimeValue>>,
    need_calc_ts: bool,
) -> Result<Row> {
    let mut row: Vec<GreptimeValue> =
        row.unwrap_or_else(|| Vec::with_capacity(schema_info.schema.len()));
    let custom_ts = pipeline_ctx.pipeline_definition.get_custom_ts();

    if need_calc_ts {
        let ts = calc_ts(pipeline_ctx, &values)?;
        row.push(GreptimeValue { value_data: ts });
    }

    // Pad with nulls so indexed writes by resolve_value stay in bounds.
    row.resize(schema_info.schema.len(), GreptimeValue { value_data: None });

    let ts_column_name = custom_ts
        .as_ref()
        .map_or(DEFAULT_GREPTIME_TIMESTAMP_COLUMN, |ts| ts.get_column_name());

    let values = values.into_object().context(ValueMustBeMapSnafu)?;

    for (column_name, value) in values {
        // The timestamp key was already consumed by calc_ts above.
        if need_calc_ts && column_name.as_str() == ts_column_name {
            continue;
        }

        resolve_value(
            value,
            column_name.into(),
            &mut row,
            schema_info,
            pipeline_ctx,
        )?;
    }
    Ok(Row { values: row })
}
417
418fn decide_semantic(p_ctx: &PipelineContext, column_name: &str) -> i32 {
419 if p_ctx.channel == Channel::Prometheus && column_name != GREPTIME_VALUE {
420 SemanticType::Tag as i32
421 } else {
422 SemanticType::Field as i32
423 }
424}
425
/// Resolve a single VRL value into `row`/`schema_info` under `column_name`.
///
/// Scalars map to their natural column types; arrays and objects are stored as
/// JSONB binary columns; nulls produce no column at all.
fn resolve_value(
    value: VrlValue,
    column_name: String,
    row: &mut Vec<GreptimeValue>,
    schema_info: &mut SchemaInfo,
    p_ctx: &PipelineContext,
) -> Result<()> {
    let index = schema_info.index.get(&column_name).copied();
    // Shared path for scalar types: build the column schema and store the value.
    let mut resolve_simple_type =
        |value_data: ValueData, column_name: String, data_type: ColumnDataType| {
            let semantic_type = decide_semantic(p_ctx, &column_name);
            resolve_schema(
                index,
                value_data,
                ColumnSchema {
                    column_name,
                    datatype: data_type as i32,
                    semantic_type,
                    datatype_extension: None,
                    options: None,
                },
                row,
                schema_info,
            )
        };

    match value {
        // Nulls are skipped entirely; the column is not created.
        VrlValue::Null => {}

        VrlValue::Integer(v) => {
            resolve_simple_type(ValueData::I64Value(v), column_name, ColumnDataType::Int64)?;
        }

        VrlValue::Float(v) => {
            resolve_simple_type(
                ValueData::F64Value(v.into()),
                column_name,
                ColumnDataType::Float64,
            )?;
        }

        VrlValue::Boolean(v) => {
            resolve_simple_type(
                ValueData::BoolValue(v),
                column_name,
                ColumnDataType::Boolean,
            )?;
        }

        VrlValue::Bytes(v) => {
            resolve_simple_type(
                ValueData::StringValue(String::from_utf8_lossy_owned(v.to_vec())),
                column_name,
                ColumnDataType::String,
            )?;
        }

        // Regexes should not normally reach storage; persist as string and warn.
        VrlValue::Regex(v) => {
            warn!(
                "Persisting regex value in the table, this should not happen, column_name: {}",
                column_name
            );
            resolve_simple_type(
                ValueData::StringValue(v.to_string()),
                column_name,
                ColumnDataType::String,
            )?;
        }

        VrlValue::Timestamp(ts) => {
            // Out-of-range timestamps (no nanosecond representation) are rejected.
            let ns = ts.timestamp_nanos_opt().context(InvalidTimestampSnafu {
                input: ts.to_rfc3339(),
            })?;
            resolve_simple_type(
                ValueData::TimestampNanosecondValue(ns),
                column_name,
                ColumnDataType::TimestampNanosecond,
            )?;
        }

        // Composite values are stored as JSONB in a binary column.
        VrlValue::Array(_) | VrlValue::Object(_) => {
            let data = vrl_value_to_jsonb_value(&value);
            resolve_schema(
                index,
                ValueData::BinaryValue(data.to_vec()),
                ColumnSchema {
                    column_name,
                    datatype: ColumnDataType::Binary as i32,
                    semantic_type: SemanticType::Field as i32,
                    datatype_extension: Some(ColumnDataTypeExtension {
                        type_ext: Some(TypeExt::JsonType(JsonTypeExtension::JsonBinary.into())),
                    }),
                    options: None,
                },
                row,
                schema_info,
            )?;
        }
    }
    Ok(())
}
529
530fn vrl_value_to_jsonb_value<'a>(value: &'a VrlValue) -> jsonb::Value<'a> {
531 match value {
532 VrlValue::Bytes(bytes) => jsonb::Value::String(String::from_utf8_lossy(bytes)),
533 VrlValue::Regex(value_regex) => jsonb::Value::String(Cow::Borrowed(value_regex.as_str())),
534 VrlValue::Integer(i) => jsonb::Value::Number(Number::Int64(*i)),
535 VrlValue::Float(not_nan) => jsonb::Value::Number(Number::Float64(not_nan.into_inner())),
536 VrlValue::Boolean(b) => jsonb::Value::Bool(*b),
537 VrlValue::Timestamp(date_time) => jsonb::Value::String(Cow::Owned(date_time.to_rfc3339())),
538 VrlValue::Object(btree_map) => jsonb::Value::Object(
539 btree_map
540 .iter()
541 .map(|(key, value)| (key.to_string(), vrl_value_to_jsonb_value(value)))
542 .collect(),
543 ),
544 VrlValue::Array(values) => jsonb::Value::Array(
545 values
546 .iter()
547 .map(|value| vrl_value_to_jsonb_value(value))
548 .collect(),
549 ),
550 VrlValue::Null => jsonb::Value::Null,
551 }
552}
553
554fn identity_pipeline_inner(
555 pipeline_maps: Vec<VrlValue>,
556 pipeline_ctx: &PipelineContext<'_>,
557) -> Result<(SchemaInfo, HashMap<ContextOpt, Vec<Row>>)> {
558 let skip_error = pipeline_ctx.pipeline_param.skip_error();
559 let mut schema_info = SchemaInfo::default();
560 let custom_ts = pipeline_ctx.pipeline_definition.get_custom_ts();
561
562 schema_info.schema.push(ColumnSchema {
564 column_name: custom_ts
565 .map(|ts| ts.get_column_name().to_string())
566 .unwrap_or_else(|| DEFAULT_GREPTIME_TIMESTAMP_COLUMN.to_string()),
567 datatype: custom_ts.map(|c| c.get_datatype()).unwrap_or_else(|| {
568 if pipeline_ctx.channel == Channel::Prometheus {
569 ColumnDataType::TimestampMillisecond
570 } else {
571 ColumnDataType::TimestampNanosecond
572 }
573 }) as i32,
574 semantic_type: SemanticType::Timestamp as i32,
575 datatype_extension: None,
576 options: None,
577 });
578
579 let mut opt_map = HashMap::new();
580 let len = pipeline_maps.len();
581
582 for mut pipeline_map in pipeline_maps {
583 let opt = unwrap_or_continue_if_err!(
584 ContextOpt::from_pipeline_map_to_opt(&mut pipeline_map),
585 skip_error
586 );
587 let row = unwrap_or_continue_if_err!(
588 values_to_row(&mut schema_info, pipeline_map, pipeline_ctx, None, true),
589 skip_error
590 );
591
592 opt_map
593 .entry(opt)
594 .or_insert_with(|| Vec::with_capacity(len))
595 .push(row);
596 }
597
598 let column_count = schema_info.schema.len();
599 for (_, row) in opt_map.iter_mut() {
600 for row in row.iter_mut() {
601 let diff = column_count - row.values.len();
602 for _ in 0..diff {
603 row.values.push(GreptimeValue { value_data: None });
604 }
605 }
606 }
607
608 Ok((schema_info, opt_map))
609}
610
611pub fn identity_pipeline(
620 array: Vec<VrlValue>,
621 table: Option<Arc<table::Table>>,
622 pipeline_ctx: &PipelineContext<'_>,
623) -> Result<HashMap<ContextOpt, Rows>> {
624 let skip_error = pipeline_ctx.pipeline_param.skip_error();
625 let max_nested_levels = pipeline_ctx.pipeline_param.max_nested_levels();
626 let mut input = Vec::with_capacity(array.len());
628 for item in array.into_iter() {
629 let result =
630 unwrap_or_continue_if_err!(flatten_object(item, max_nested_levels), skip_error);
631 input.push(result);
632 }
633
634 identity_pipeline_inner(input, pipeline_ctx).map(|(mut schema, opt_map)| {
635 if let Some(table) = table {
636 let table_info = table.table_info();
637 for tag_name in table_info.meta.row_key_column_names() {
638 if let Some(index) = schema.index.get(tag_name) {
639 schema.schema[*index].semantic_type = SemanticType::Tag as i32;
640 }
641 }
642 }
643
644 opt_map
645 .into_iter()
646 .map(|(opt, rows)| {
647 (
648 opt,
649 Rows {
650 schema: schema.schema.clone(),
651 rows,
652 },
653 )
654 })
655 .collect::<HashMap<ContextOpt, Rows>>()
656 })
657}
658
659pub fn flatten_object(object: VrlValue, max_nested_levels: usize) -> Result<VrlValue> {
665 let mut flattened = BTreeMap::new();
666 let object = object.into_object().context(ValueMustBeMapSnafu)?;
667
668 if !object.is_empty() {
669 do_flatten_object(&mut flattened, None, object, 1, max_nested_levels);
671 }
672
673 Ok(VrlValue::Object(flattened))
674}
675
676fn vrl_value_to_serde_json(value: &VrlValue) -> serde_json_crate::Value {
677 match value {
678 VrlValue::Null => serde_json_crate::Value::Null,
679 VrlValue::Boolean(b) => serde_json_crate::Value::Bool(*b),
680 VrlValue::Integer(i) => serde_json_crate::Value::Number((*i).into()),
681 VrlValue::Float(not_nan) => serde_json_crate::Number::from_f64(not_nan.into_inner())
682 .map(serde_json_crate::Value::Number)
683 .unwrap_or(serde_json_crate::Value::Null),
684 VrlValue::Bytes(bytes) => {
685 serde_json_crate::Value::String(String::from_utf8_lossy(bytes).into_owned())
686 }
687 VrlValue::Regex(re) => serde_json_crate::Value::String(re.as_str().to_string()),
688 VrlValue::Timestamp(ts) => serde_json_crate::Value::String(ts.to_rfc3339()),
689 VrlValue::Array(arr) => {
690 serde_json_crate::Value::Array(arr.iter().map(vrl_value_to_serde_json).collect())
691 }
692 VrlValue::Object(map) => serde_json_crate::Value::Object(
693 map.iter()
694 .map(|(k, v)| (k.to_string(), vrl_value_to_serde_json(v)))
695 .collect(),
696 ),
697 }
698}
699
700fn do_flatten_object(
701 dest: &mut BTreeMap<KeyString, VrlValue>,
702 base: Option<&str>,
703 object: BTreeMap<KeyString, VrlValue>,
704 current_level: usize,
705 max_nested_levels: usize,
706) {
707 for (key, value) in object {
708 let new_key = base.map_or_else(
709 || key.clone(),
710 |base_key| format!("{base_key}.{key}").into(),
711 );
712
713 match value {
714 VrlValue::Object(object) => {
715 if current_level >= max_nested_levels {
716 let json_string = serde_json_crate::to_string(&vrl_value_to_serde_json(
718 &VrlValue::Object(object),
719 ))
720 .unwrap_or_else(|_| String::from("{}"));
721 dest.insert(new_key, VrlValue::Bytes(Bytes::from(json_string)));
722 } else {
723 do_flatten_object(
724 dest,
725 Some(&new_key),
726 object,
727 current_level + 1,
728 max_nested_levels,
729 );
730 }
731 }
732 VrlValue::Array(_) => {
734 let json_string = serde_json_crate::to_string(&vrl_value_to_serde_json(&value))
735 .unwrap_or_else(|_| String::from("[]"));
736 dest.insert(new_key, VrlValue::Bytes(Bytes::from(json_string)));
737 }
738 _ => {
740 dest.insert(new_key, value);
741 }
742 }
743 }
744}
745
#[cfg(test)]
mod tests {
    use api::v1::SemanticType;

    use super::*;
    use crate::{PipelineDefinition, identity_pipeline};

    #[test]
    fn test_identify_pipeline() {
        let params = GreptimePipelineParams::default();
        let pipeline_ctx = PipelineContext::new(
            &PipelineDefinition::GreptimeIdentityPipeline(None),
            // Fixed: this argument was mojibake (`¶ms`, i.e. a corrupted
            // `&params` where `&para` was rendered as `¶`).
            &params,
            Channel::Unknown,
        );
        // Conflicting types for `score` (float vs string) must be rejected.
        {
            let array = [
                serde_json::json!({
                    "woshinull": null,
                    "name": "Alice",
                    "age": 20,
                    "is_student": true,
                    "score": 99.5,
                    "hobbies": "reading",
                    "address": "Beijing",
                }),
                serde_json::json!({
                    "name": "Bob",
                    "age": 21,
                    "is_student": false,
                    "score": "88.5",
                    "hobbies": "swimming",
                    "address": "Shanghai",
                    "gaga": "gaga"
                }),
            ];
            let array = array.iter().map(|v| v.into()).collect();
            let rows = identity_pipeline(array, None, &pipeline_ctx);
            assert!(rows.is_err());
            assert_eq!(
                rows.err().unwrap().to_string(),
                "Column datatype mismatch. For column: score, expected datatype: FLOAT64, actual datatype: STRING".to_string(),
            );
        }
        // Conflicting types for `score` (float vs integer) must also be rejected.
        {
            let array = [
                serde_json::json!({
                    "woshinull": null,
                    "name": "Alice",
                    "age": 20,
                    "is_student": true,
                    "score": 99.5,
                    "hobbies": "reading",
                    "address": "Beijing",
                }),
                serde_json::json!({
                    "name": "Bob",
                    "age": 21,
                    "is_student": false,
                    "score": 88,
                    "hobbies": "swimming",
                    "address": "Shanghai",
                    "gaga": "gaga"
                }),
            ];
            let array = array.iter().map(|v| v.into()).collect();
            let rows = identity_pipeline(array, None, &pipeline_ctx);
            assert!(rows.is_err());
            assert_eq!(
                rows.err().unwrap().to_string(),
                "Column datatype mismatch. For column: score, expected datatype: FLOAT64, actual datatype: INT64".to_string(),
            );
        }
        // Compatible rows: 7 value columns + implicit timestamp = 8 columns.
        {
            let array = [
                serde_json::json!({
                    "woshinull": null,
                    "name": "Alice",
                    "age": 20,
                    "is_student": true,
                    "score": 99.5,
                    "hobbies": "reading",
                    "address": "Beijing",
                }),
                serde_json::json!({
                    "name": "Bob",
                    "age": 21,
                    "is_student": false,
                    "score": 88.5,
                    "hobbies": "swimming",
                    "address": "Shanghai",
                    "gaga": "gaga"
                }),
            ];
            let array = array.iter().map(|v| v.into()).collect();
            let rows = identity_pipeline(array, None, &pipeline_ctx);
            assert!(rows.is_ok());
            let mut rows = rows.unwrap();
            assert!(rows.len() == 1);
            let rows = rows.remove(&ContextOpt::default()).unwrap();
            assert_eq!(rows.schema.len(), 8);
            assert_eq!(rows.rows.len(), 2);
            assert_eq!(8, rows.rows[0].values.len());
            assert_eq!(8, rows.rows[1].values.len());
        }
        // Tag promotion: manually mark row-key columns as tags after inference.
        {
            let array = [
                serde_json::json!({
                    "woshinull": null,
                    "name": "Alice",
                    "age": 20,
                    "is_student": true,
                    "score": 99.5,
                    "hobbies": "reading",
                    "address": "Beijing",
                }),
                serde_json::json!({
                    "name": "Bob",
                    "age": 21,
                    "is_student": false,
                    "score": 88.5,
                    "hobbies": "swimming",
                    "address": "Shanghai",
                    "gaga": "gaga"
                }),
            ];
            let tag_column_names = ["name".to_string(), "address".to_string()];

            let rows =
                identity_pipeline_inner(array.iter().map(|v| v.into()).collect(), &pipeline_ctx)
                    .map(|(mut schema, mut rows)| {
                        for name in tag_column_names {
                            if let Some(index) = schema.index.get(&name) {
                                schema.schema[*index].semantic_type = SemanticType::Tag as i32;
                            }
                        }

                        assert!(rows.len() == 1);
                        let rows = rows.remove(&ContextOpt::default()).unwrap();

                        Rows {
                            schema: schema.schema,
                            rows,
                        }
                    });

            assert!(rows.is_ok());
            let rows = rows.unwrap();
            assert_eq!(rows.schema.len(), 8);
            assert_eq!(rows.rows.len(), 2);
            assert_eq!(8, rows.rows[0].values.len());
            assert_eq!(8, rows.rows[1].values.len());
            assert_eq!(
                rows.schema
                    .iter()
                    .find(|x| x.column_name == "name")
                    .unwrap()
                    .semantic_type,
                SemanticType::Tag as i32
            );
            assert_eq!(
                rows.schema
                    .iter()
                    .find(|x| x.column_name == "address")
                    .unwrap()
                    .semantic_type,
                SemanticType::Tag as i32
            );
            assert_eq!(
                rows.schema
                    .iter()
                    .filter(|x| x.semantic_type == SemanticType::Tag as i32)
                    .count(),
                2
            );
        }
    }

    #[test]
    fn test_flatten() {
        let test_cases = vec![
            // Deep nesting within the limit: objects flatten, arrays stringify.
            (
                serde_json::json!(
                    {
                        "a": {
                            "b": {
                                "c": [1, 2, 3]
                            }
                        },
                        "d": [
                            "foo",
                            "bar"
                        ],
                        "e": {
                            "f": [7, 8, 9],
                            "g": {
                                "h": 123,
                                "i": "hello",
                                "j": {
                                    "k": true
                                }
                            }
                        }
                    }
                ),
                10,
                Some(serde_json::json!(
                    {
                        "a.b.c": "[1,2,3]",
                        "d": "[\"foo\",\"bar\"]",
                        "e.f": "[7,8,9]",
                        "e.g.h": 123,
                        "e.g.i": "hello",
                        "e.g.j.k": true
                    }
                )),
            ),
            // Depth limit reached: the remaining subtree becomes a JSON string.
            (
                serde_json::json!(
                    {
                        "a": {
                            "b": {
                                "c": {
                                    "d": [1, 2, 3]
                                }
                            }
                        },
                        "e": [
                            "foo",
                            "bar"
                        ]
                    }
                ),
                3,
                Some(serde_json::json!(
                    {
                        "a.b.c": "{\"d\":[1,2,3]}",
                        "e": "[\"foo\",\"bar\"]"
                    }
                )),
            ),
        ];

        for (input, max_depth, expected) in test_cases {
            let input = input.into();
            let expected = expected.map(|e| e.into());

            let flattened_object = flatten_object(input, max_depth).ok();
            assert_eq!(flattened_object, expected);
        }
    }
}