pub mod coerce;

use std::borrow::Cow;
use std::collections::{BTreeMap, HashSet};
use std::sync::Arc;

use ahash::{HashMap, HashMapExt};
use api::helper::proto_value_type;
use api::v1::column_data_type_extension::TypeExt;
use api::v1::value::ValueData;
use api::v1::{ColumnDataType, ColumnDataTypeExtension, JsonTypeExtension, SemanticType};
use coerce::{coerce_columns, coerce_value};
use common_query::prelude::{greptime_timestamp, greptime_value};
use common_telemetry::warn;
use greptime_proto::v1::{ColumnSchema, Row, Rows, Value as GreptimeValue};
use itertools::Itertools;
use jsonb::Number;
use once_cell::sync::OnceCell;
use serde_json as serde_json_crate;
use session::context::Channel;
use snafu::OptionExt;
use vrl::prelude::{Bytes, VrlValueConvert};
use vrl::value::{KeyString, Value as VrlValue};

use crate::error::{
    IdentifyPipelineColumnTypeMismatchSnafu, InvalidTimestampSnafu, Result,
    TimeIndexMustBeNonNullSnafu, TransformColumnNameMustBeUniqueSnafu,
    TransformMultipleTimestampIndexSnafu, TransformTimestampIndexCountSnafu, ValueMustBeMapSnafu,
};
use crate::etl::PipelineDocVersion;
use crate::etl::ctx_req::ContextOpt;
use crate::etl::field::{Field, Fields};
use crate::etl::transform::index::Index;
use crate::etl::transform::{Transform, Transforms};
use crate::{PipelineContext, truthy, unwrap_or_continue_if_err};

const DEFAULT_MAX_NESTED_LEVELS_FOR_JSON_FLATTENING: usize = 10;

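/// Applies a pipeline's transform rules to input values, producing rows that
/// match the schema derived from those rules.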
#[derive(Debug, Clone)]
pub struct GreptimeTransformer {
    transforms: Transforms,
    schema: Vec<ColumnSchema>,
}

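/// Request-level parameters that tune pipeline behavior, parsed from a
/// `key1=value1&key2=value2` string (e.g. the `x-greptime-pipeline-params`
/// HTTP header).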
#[derive(Debug, Default)]
pub struct GreptimePipelineParams {
    /// The raw options map parsed from the params string.
    options: HashMap<String, String>,

    /// Lazily parsed, cached value of the `skip_error` option.
    pub skip_error: OnceCell<bool>,
    /// Lazily parsed, cached value of the `max_nested_levels` option.
    pub max_nested_levels: OnceCell<usize>,
}

impl GreptimePipelineParams {
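    /// Create a `GreptimePipelineParams` from an optional string in the
    /// format `key1=value1&key2=value2`, e.g. `skip_error=true`.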
    pub fn from_params(params: Option<&str>) -> Self {
        let options = Self::parse_header_str_to_map(params);

        Self {
            options,
            skip_error: OnceCell::new(),
            max_nested_levels: OnceCell::new(),
        }
    }

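    /// Create a `GreptimePipelineParams` from an already-parsed options map.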
    pub fn from_map(options: HashMap<String, String>) -> Self {
        Self {
            options,
            skip_error: OnceCell::new(),
            max_nested_levels: OnceCell::new(),
        }
    }

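    /// Parse a `key1=value1&key2=value2` string into a map. `None` and the
    /// empty string yield an empty map; segments without `=` are skipped.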
    pub fn parse_header_str_to_map(params: Option<&str>) -> HashMap<String, String> {
        if let Some(params) = params {
            if params.is_empty() {
                HashMap::new()
            } else {
                params
                    .split('&')
                    .filter_map(|s| s.split_once('='))
                    .map(|(k, v)| (k.to_string(), v.to_string()))
                    .collect::<HashMap<String, String>>()
            }
        } else {
            HashMap::new()
        }
    }

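    /// Whether to skip rows that fail to process instead of failing the whole
    /// request. Parsed from the `skip_error` option on first use and cached.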
    pub fn skip_error(&self) -> bool {
        *self
            .skip_error
            .get_or_init(|| self.options.get("skip_error").map(truthy).unwrap_or(false))
    }

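    /// The maximum nesting depth used when flattening JSON objects. Parsed
    /// from the `max_nested_levels` option on first use and cached; zero or
    /// unparsable values fall back to the default of 10.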
    pub fn max_nested_levels(&self) -> usize {
        *self.max_nested_levels.get_or_init(|| {
            self.options
                .get("max_nested_levels")
                .and_then(|s| s.parse::<usize>().ok())
                .filter(|v| *v > 0)
                .unwrap_or(DEFAULT_MAX_NESTED_LEVELS_FOR_JSON_FLATTENING)
        })
    }
}

impl GreptimeTransformer {
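    /// Add the default `greptime_timestamp` column (nanosecond precision,
    /// time index) to the transforms.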
    fn add_greptime_timestamp_column(transforms: &mut Transforms) {
        let type_ = ColumnDataType::TimestampNanosecond;
        let default = None;

        let transform = Transform {
            fields: Fields::one(Field::new(greptime_timestamp().to_string(), None)),
            type_,
            default,
            index: Some(Index::Time),
            on_failure: Some(crate::etl::transform::OnFailure::Default),
            tag: false,
        };
        transforms.push(transform);
    }

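    /// Build the output column schemas for all transforms.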
    fn init_schemas(transforms: &Transforms) -> Result<Vec<ColumnSchema>> {
        let mut schema = vec![];
        for transform in transforms.iter() {
            schema.extend(coerce_columns(transform)?);
        }
        Ok(schema)
    }
}

impl GreptimeTransformer {
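    /// Build a `GreptimeTransformer`, validating that output column names are
    /// unique and that there is exactly one time index. For v1 pipeline docs
    /// without a time index, a default timestamp column is added.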
    pub fn new(mut transforms: Transforms, doc_version: &PipelineDocVersion) -> Result<Self> {
        // Check for duplicate output column names and collect time-index columns.
        let mut column_names_set = HashSet::new();
        let mut timestamp_columns = vec![];

        for transform in transforms.iter() {
            let target_fields_set = transform
                .fields
                .iter()
                .map(|f| f.target_or_input_field())
                .collect::<HashSet<_>>();

            let intersections: Vec<_> = column_names_set.intersection(&target_fields_set).collect();
            if !intersections.is_empty() {
                let duplicates = intersections.iter().join(",");
                return TransformColumnNameMustBeUniqueSnafu { duplicates }.fail();
            }

            column_names_set.extend(target_fields_set);

            if let Some(idx) = transform.index
                && idx == Index::Time
            {
                match transform.fields.len() {
                    // A time index must be a single column.
                    1 => timestamp_columns.push(transform.fields.first().unwrap().input_field()),
                    _ => {
                        return TransformMultipleTimestampIndexSnafu {
                            columns: transform.fields.iter().map(|x| x.input_field()).join(", "),
                        }
                        .fail();
                    }
                }
            }
        }

        let schema = match timestamp_columns.len() {
            0 if doc_version == &PipelineDocVersion::V1 => {
                // v1 docs get a default timestamp column for backward compatibility.
                GreptimeTransformer::add_greptime_timestamp_column(&mut transforms);
                GreptimeTransformer::init_schemas(&transforms)?
            }
            1 => GreptimeTransformer::init_schemas(&transforms)?,
            count => {
                let columns = timestamp_columns.iter().join(", ");
                return TransformTimestampIndexCountSnafu { count, columns }.fail();
            }
        };
        Ok(GreptimeTransformer { transforms, schema })
    }

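    /// Transform the input value into one row of values, ordered by
    /// `self.schema`. Missing fields are resolved via the transform's
    /// `on_failure` strategy; a missing time index value is an error.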
    pub fn transform_mut(
        &self,
        pipeline_map: &mut VrlValue,
        is_v1: bool,
    ) -> Result<Vec<GreptimeValue>> {
        let mut values = vec![GreptimeValue { value_data: None }; self.schema.len()];
        let mut output_index = 0;
        for transform in self.transforms.iter() {
            for field in transform.fields.iter() {
                let column_name = field.input_field();

                let pipeline_map = pipeline_map.as_object_mut().context(ValueMustBeMapSnafu)?;
                match pipeline_map.get(column_name) {
                    Some(v) => {
                        let value_data = coerce_value(v, transform)?;
                        values[output_index] = GreptimeValue { value_data };
                    }
                    None => {
                        let value_data = match transform.on_failure {
                            Some(crate::etl::transform::OnFailure::Default) => {
                                match transform.get_default() {
                                    Some(default) => Some(default.clone()),
                                    None => transform.get_default_value_when_data_is_none(),
                                }
                            }
                            Some(crate::etl::transform::OnFailure::Ignore) => None,
                            None => None,
                        };
                        if transform.is_timeindex() && value_data.is_none() {
                            return TimeIndexMustBeNonNullSnafu.fail();
                        }
                        values[output_index] = GreptimeValue { value_data };
                    }
                }
                output_index += 1;
                if !is_v1 {
                    // In v2, consumed fields are removed from the map so that
                    // leftover fields can be handled separately afterwards.
                    pipeline_map.remove(column_name);
                }
            }
        }
        Ok(values)
    }

    pub fn transforms(&self) -> &Transforms {
        &self.transforms
    }

    pub fn schemas(&self) -> &Vec<greptime_proto::v1::ColumnSchema> {
        &self.schema
    }

    pub fn transforms_mut(&mut self) -> &mut Transforms {
        &mut self.transforms
    }
}

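/// The schema collected while identifying rows, plus a column-name-to-position
/// index for fast lookups.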
#[derive(Debug, Default)]
pub struct SchemaInfo {
    /// The column schemas collected so far.
    pub schema: Vec<ColumnSchema>,
    /// Map from column name to its position in `schema`.
    pub index: HashMap<String, usize>,
}

impl SchemaInfo {
    pub fn with_capacity(capacity: usize) -> Self {
        Self {
            schema: Vec::with_capacity(capacity),
            index: HashMap::with_capacity(capacity),
        }
    }

    pub fn from_schema_list(schema_list: Vec<ColumnSchema>) -> Self {
        let mut index = HashMap::new();
        for (i, schema) in schema_list.iter().enumerate() {
            index.insert(schema.column_name.clone(), i);
        }
        Self {
            schema: schema_list,
            index,
        }
    }
}

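/// Write `value_data` into the row. For a known column (`index` is `Some`),
/// the value's type must match the column's declared type; otherwise the new
/// column schema is appended and the value is pushed onto the row.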
fn resolve_schema(
    index: Option<usize>,
    value_data: ValueData,
    column_schema: ColumnSchema,
    row: &mut Vec<GreptimeValue>,
    schema_info: &mut SchemaInfo,
) -> Result<()> {
    if let Some(index) = index {
        let api_value = GreptimeValue {
            value_data: Some(value_data),
        };
        // Safety: unwrap is safe because value_data is always set above.
        let value_column_data_type = proto_value_type(&api_value).unwrap();
        // Safety: the index comes from `schema_info.index`, so the lookup succeeds.
        let schema_column_data_type = schema_info.schema.get(index).unwrap().datatype();
        if value_column_data_type != schema_column_data_type {
            IdentifyPipelineColumnTypeMismatchSnafu {
                column: column_schema.column_name,
                expected: schema_column_data_type.as_str_name(),
                actual: value_column_data_type.as_str_name(),
            }
            .fail()
        } else {
            row[index] = api_value;
            Ok(())
        }
    } else {
        let key = column_schema.column_name.clone();
        schema_info.schema.push(column_schema);
        schema_info.index.insert(key, schema_info.schema.len() - 1);
        let api_value = GreptimeValue {
            value_data: Some(value_data),
        };
        row.push(api_value);
        Ok(())
    }
}

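/// Compute the timestamp for a row: the Prometheus channel reads
/// `greptime_timestamp` (milliseconds) from the values, a custom time index
/// reads its configured column, and everything else falls back to the current
/// time in nanoseconds.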
fn calc_ts(p_ctx: &PipelineContext, values: &VrlValue) -> Result<Option<ValueData>> {
    match p_ctx.channel {
        Channel::Prometheus => {
            let ts = values
                .as_object()
                .and_then(|m| m.get(greptime_timestamp()))
                .and_then(|ts| ts.try_into_i64().ok())
                .unwrap_or_default();
            Ok(Some(ValueData::TimestampMillisecondValue(ts)))
        }
        _ => {
            let custom_ts = p_ctx.pipeline_definition.get_custom_ts();
            match custom_ts {
                Some(ts) => {
                    let ts_field = values.as_object().and_then(|m| m.get(ts.get_column_name()));
                    Some(ts.get_timestamp_value(ts_field)).transpose()
                }
                None => Ok(Some(ValueData::TimestampNanosecondValue(
                    chrono::Utc::now().timestamp_nanos_opt().unwrap_or_default(),
                ))),
            }
        }
    }
}

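/// Convert a map of values into a `Row` matching `schema_info`, growing the
/// schema as new columns appear. When `need_calc_ts` is set, the timestamp is
/// computed and pushed first, and the time-index column in the input is
/// skipped.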
pub(crate) fn values_to_row(
    schema_info: &mut SchemaInfo,
    values: VrlValue,
    pipeline_ctx: &PipelineContext<'_>,
    row: Option<Vec<GreptimeValue>>,
    need_calc_ts: bool,
) -> Result<Row> {
    let mut row: Vec<GreptimeValue> =
        row.unwrap_or_else(|| Vec::with_capacity(schema_info.schema.len()));
    let custom_ts = pipeline_ctx.pipeline_definition.get_custom_ts();

    if need_calc_ts {
        // The timestamp is always the first column.
        let ts = calc_ts(pipeline_ctx, &values)?;
        row.push(GreptimeValue { value_data: ts });
    }

    row.resize(schema_info.schema.len(), GreptimeValue { value_data: None });

    let ts_column_name = custom_ts
        .as_ref()
        .map_or(greptime_timestamp(), |ts| ts.get_column_name());

    let values = values.into_object().context(ValueMustBeMapSnafu)?;

    for (column_name, value) in values {
        if need_calc_ts && column_name.as_str() == ts_column_name {
            continue;
        }

        resolve_value(
            value,
            column_name.into(),
            &mut row,
            schema_info,
            pipeline_ctx,
        )?;
    }
    Ok(Row { values: row })
}

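/// The Prometheus channel treats every column except `greptime_value` as a
/// tag; all other channels default to field.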
fn decide_semantic(p_ctx: &PipelineContext, column_name: &str) -> i32 {
    if p_ctx.channel == Channel::Prometheus && column_name != greptime_value() {
        SemanticType::Tag as i32
    } else {
        SemanticType::Field as i32
    }
}

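/// Resolve a single value into the row, mapping VRL types to column types.
/// Nulls are skipped; arrays and objects are stored as JSONB binary.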
fn resolve_value(
    value: VrlValue,
    column_name: String,
    row: &mut Vec<GreptimeValue>,
    schema_info: &mut SchemaInfo,
    p_ctx: &PipelineContext,
) -> Result<()> {
    let index = schema_info.index.get(&column_name).copied();
    let mut resolve_simple_type =
        |value_data: ValueData, column_name: String, data_type: ColumnDataType| {
            let semantic_type = decide_semantic(p_ctx, &column_name);
            resolve_schema(
                index,
                value_data,
                ColumnSchema {
                    column_name,
                    datatype: data_type as i32,
                    semantic_type,
                    datatype_extension: None,
                    options: None,
                },
                row,
                schema_info,
            )
        };

    match value {
        VrlValue::Null => {}

        VrlValue::Integer(v) => {
            resolve_simple_type(ValueData::I64Value(v), column_name, ColumnDataType::Int64)?;
        }

        VrlValue::Float(v) => {
            resolve_simple_type(
                ValueData::F64Value(v.into()),
                column_name,
                ColumnDataType::Float64,
            )?;
        }

        VrlValue::Boolean(v) => {
            resolve_simple_type(
                ValueData::BoolValue(v),
                column_name,
                ColumnDataType::Boolean,
            )?;
        }

        VrlValue::Bytes(v) => {
            resolve_simple_type(
                ValueData::StringValue(String::from_utf8_lossy_owned(v.to_vec())),
                column_name,
                ColumnDataType::String,
            )?;
        }

        VrlValue::Regex(v) => {
            warn!(
                "Persisting regex value in the table, this should not happen, column_name: {}",
                column_name
            );
            resolve_simple_type(
                ValueData::StringValue(v.to_string()),
                column_name,
                ColumnDataType::String,
            )?;
        }

        VrlValue::Timestamp(ts) => {
            let ns = ts.timestamp_nanos_opt().context(InvalidTimestampSnafu {
                input: ts.to_rfc3339(),
            })?;
            resolve_simple_type(
                ValueData::TimestampNanosecondValue(ns),
                column_name,
                ColumnDataType::TimestampNanosecond,
            )?;
        }

        VrlValue::Array(_) | VrlValue::Object(_) => {
            // Compound values are stored as JSONB binary with a JSON type extension.
            let data = vrl_value_to_jsonb_value(&value);
            resolve_schema(
                index,
                ValueData::BinaryValue(data.to_vec()),
                ColumnSchema {
                    column_name,
                    datatype: ColumnDataType::Binary as i32,
                    semantic_type: SemanticType::Field as i32,
                    datatype_extension: Some(ColumnDataTypeExtension {
                        type_ext: Some(TypeExt::JsonType(JsonTypeExtension::JsonBinary.into())),
                    }),
                    options: None,
                },
                row,
                schema_info,
            )?;
        }
    }
    Ok(())
}

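/// Convert a VRL value to a jsonb value, recursing into arrays and objects.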
fn vrl_value_to_jsonb_value<'a>(value: &'a VrlValue) -> jsonb::Value<'a> {
    match value {
        VrlValue::Bytes(bytes) => jsonb::Value::String(String::from_utf8_lossy(bytes)),
        VrlValue::Regex(value_regex) => jsonb::Value::String(Cow::Borrowed(value_regex.as_str())),
        VrlValue::Integer(i) => jsonb::Value::Number(Number::Int64(*i)),
        VrlValue::Float(not_nan) => jsonb::Value::Number(Number::Float64(not_nan.into_inner())),
        VrlValue::Boolean(b) => jsonb::Value::Bool(*b),
        VrlValue::Timestamp(date_time) => jsonb::Value::String(Cow::Owned(date_time.to_rfc3339())),
        VrlValue::Object(btree_map) => jsonb::Value::Object(
            btree_map
                .iter()
                .map(|(key, value)| (key.to_string(), vrl_value_to_jsonb_value(value)))
                .collect(),
        ),
        VrlValue::Array(values) => jsonb::Value::Array(
            values.iter().map(vrl_value_to_jsonb_value).collect(),
        ),
        VrlValue::Null => jsonb::Value::Null,
    }
}

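/// Run the identity pipeline over the input maps: the timestamp column comes
/// first, each map becomes a row grouped by its `ContextOpt`, and all rows are
/// finally padded to the merged schema width.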
fn identity_pipeline_inner(
    pipeline_maps: Vec<VrlValue>,
    pipeline_ctx: &PipelineContext<'_>,
) -> Result<(SchemaInfo, HashMap<ContextOpt, Vec<Row>>)> {
    let skip_error = pipeline_ctx.pipeline_param.skip_error();
    let mut schema_info = SchemaInfo::default();
    let custom_ts = pipeline_ctx.pipeline_definition.get_custom_ts();

    // The timestamp column is always the first column in the schema.
    schema_info.schema.push(ColumnSchema {
        column_name: custom_ts
            .map(|ts| ts.get_column_name().to_string())
            .unwrap_or_else(|| greptime_timestamp().to_string()),
        datatype: custom_ts.map(|c| c.get_datatype()).unwrap_or_else(|| {
            if pipeline_ctx.channel == Channel::Prometheus {
                ColumnDataType::TimestampMillisecond
            } else {
                ColumnDataType::TimestampNanosecond
            }
        }) as i32,
        semantic_type: SemanticType::Timestamp as i32,
        datatype_extension: None,
        options: None,
    });

    let mut opt_map = HashMap::new();
    let len = pipeline_maps.len();

    for mut pipeline_map in pipeline_maps {
        let opt = unwrap_or_continue_if_err!(
            ContextOpt::from_pipeline_map_to_opt(&mut pipeline_map),
            skip_error
        );
        let row = unwrap_or_continue_if_err!(
            values_to_row(&mut schema_info, pipeline_map, pipeline_ctx, None, true),
            skip_error
        );

        opt_map
            .entry(opt)
            .or_insert_with(|| Vec::with_capacity(len))
            .push(row);
    }

    // Pad earlier rows to the final schema width, since later rows may have
    // introduced new columns.
    let column_count = schema_info.schema.len();
    for rows in opt_map.values_mut() {
        for row in rows.iter_mut() {
            let diff = column_count - row.values.len();
            for _ in 0..diff {
                row.values.push(GreptimeValue { value_data: None });
            }
        }
    }

    Ok((schema_info, opt_map))
}

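/// Identity pipeline for GreptimeDB: converts the input values into rows
/// without a user-defined pipeline.
/// 1. A default timestamp column is prepended to the schema.
/// 2. Each object is flattened up to `max_nested_levels` beforehand.
/// 3. The schema of every record is analyzed and merged into the final schema;
///    a datatype mismatch for the same column across records is an error.
/// 4. If `table` is given, its row-key columns are marked as tags in the
///    resulting schema.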
pub fn identity_pipeline(
    array: Vec<VrlValue>,
    table: Option<Arc<table::Table>>,
    pipeline_ctx: &PipelineContext<'_>,
) -> Result<HashMap<ContextOpt, Rows>> {
    let skip_error = pipeline_ctx.pipeline_param.skip_error();
    let max_nested_levels = pipeline_ctx.pipeline_param.max_nested_levels();
    let mut input = Vec::with_capacity(array.len());
    // Flatten each object up front so nested fields become top-level columns.
    for item in array.into_iter() {
        let result =
            unwrap_or_continue_if_err!(flatten_object(item, max_nested_levels), skip_error);
        input.push(result);
    }

    identity_pipeline_inner(input, pipeline_ctx).map(|(mut schema, opt_map)| {
        if let Some(table) = table {
            let table_info = table.table_info();
            for tag_name in table_info.meta.row_key_column_names() {
                if let Some(index) = schema.index.get(tag_name) {
                    schema.schema[*index].semantic_type = SemanticType::Tag as i32;
                }
            }
        }

        opt_map
            .into_iter()
            .map(|(opt, rows)| {
                (
                    opt,
                    Rows {
                        schema: schema.schema.clone(),
                        rows,
                    },
                )
            })
            .collect::<HashMap<ContextOpt, Rows>>()
    })
}

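/// Flatten a JSON object into a single level, joining nested keys with `.`
/// (e.g. `{"a": {"b": 1}}` becomes `{"a.b": 1}`). Objects nested deeper than
/// `max_nested_levels` are serialized to JSON strings instead.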
pub fn flatten_object(object: VrlValue, max_nested_levels: usize) -> Result<VrlValue> {
    let mut flattened = BTreeMap::new();
    let object = object.into_object().context(ValueMustBeMapSnafu)?;

    if !object.is_empty() {
        do_flatten_object(&mut flattened, None, object, 1, max_nested_levels);
    }

    Ok(VrlValue::Object(flattened))
}

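/// Convert a VRL value to a `serde_json::Value`. Floats that JSON cannot
/// represent (e.g. infinities) become `null`.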
fn vrl_value_to_serde_json(value: &VrlValue) -> serde_json_crate::Value {
    match value {
        VrlValue::Null => serde_json_crate::Value::Null,
        VrlValue::Boolean(b) => serde_json_crate::Value::Bool(*b),
        VrlValue::Integer(i) => serde_json_crate::Value::Number((*i).into()),
        VrlValue::Float(not_nan) => serde_json_crate::Number::from_f64(not_nan.into_inner())
            .map(serde_json_crate::Value::Number)
            .unwrap_or(serde_json_crate::Value::Null),
        VrlValue::Bytes(bytes) => {
            serde_json_crate::Value::String(String::from_utf8_lossy(bytes).into_owned())
        }
        VrlValue::Regex(re) => serde_json_crate::Value::String(re.as_str().to_string()),
        VrlValue::Timestamp(ts) => serde_json_crate::Value::String(ts.to_rfc3339()),
        VrlValue::Array(arr) => {
            serde_json_crate::Value::Array(arr.iter().map(vrl_value_to_serde_json).collect())
        }
        VrlValue::Object(map) => serde_json_crate::Value::Object(
            map.iter()
                .map(|(k, v)| (k.to_string(), vrl_value_to_serde_json(v)))
                .collect(),
        ),
    }
}

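/// Recursive worker for [`flatten_object`]: prefixes keys with `base`,
/// descends into objects until `max_nested_levels` is reached, and serializes
/// arrays and too-deep objects to JSON strings.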
fn do_flatten_object(
    dest: &mut BTreeMap<KeyString, VrlValue>,
    base: Option<&str>,
    object: BTreeMap<KeyString, VrlValue>,
    current_level: usize,
    max_nested_levels: usize,
) {
    for (key, value) in object {
        let new_key = base.map_or_else(
            || key.clone(),
            |base_key| format!("{base_key}.{key}").into(),
        );

        match value {
            VrlValue::Object(object) => {
                if current_level >= max_nested_levels {
                    // Max depth reached: store the subtree as a JSON string.
                    let json_string = serde_json_crate::to_string(&vrl_value_to_serde_json(
                        &VrlValue::Object(object),
                    ))
                    .unwrap_or_else(|_| String::from("{}"));
                    dest.insert(new_key, VrlValue::Bytes(Bytes::from(json_string)));
                } else {
                    do_flatten_object(
                        dest,
                        Some(&new_key),
                        object,
                        current_level + 1,
                        max_nested_levels,
                    );
                }
            }
            // Arrays are not flattened; they are stored as JSON strings.
            VrlValue::Array(_) => {
                let json_string = serde_json_crate::to_string(&vrl_value_to_serde_json(&value))
                    .unwrap_or_else(|_| String::from("[]"));
                dest.insert(new_key, VrlValue::Bytes(Bytes::from(json_string)));
            }
            // Scalars are moved over as-is.
            _ => {
                dest.insert(new_key, value);
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use api::v1::SemanticType;

    use super::*;
    use crate::{PipelineDefinition, identity_pipeline};

    #[test]
    fn test_identify_pipeline() {
        let params = GreptimePipelineParams::default();
        let pipeline_ctx = PipelineContext::new(
            &PipelineDefinition::GreptimeIdentityPipeline(None),
            &params,
            Channel::Unknown,
        );
        // Mismatched datatypes across records: float vs. string.
        {
            let array = [
                serde_json::json!({
                    "woshinull": null,
                    "name": "Alice",
                    "age": 20,
                    "is_student": true,
                    "score": 99.5,
                    "hobbies": "reading",
                    "address": "Beijing",
                }),
                serde_json::json!({
                    "name": "Bob",
                    "age": 21,
                    "is_student": false,
                    "score": "88.5",
                    "hobbies": "swimming",
                    "address": "Shanghai",
                    "gaga": "gaga"
                }),
            ];
            let array = array.iter().map(|v| v.into()).collect();
            let rows = identity_pipeline(array, None, &pipeline_ctx);
            assert!(rows.is_err());
            assert_eq!(
                rows.err().unwrap().to_string(),
                "Column datatype mismatch. For column: score, expected datatype: FLOAT64, actual datatype: STRING".to_string(),
            );
        }
        // Mismatched datatypes across records: float vs. integer.
        {
            let array = [
                serde_json::json!({
                    "woshinull": null,
                    "name": "Alice",
                    "age": 20,
                    "is_student": true,
                    "score": 99.5,
                    "hobbies": "reading",
                    "address": "Beijing",
                }),
                serde_json::json!({
                    "name": "Bob",
                    "age": 21,
                    "is_student": false,
                    "score": 88,
                    "hobbies": "swimming",
                    "address": "Shanghai",
                    "gaga": "gaga"
                }),
            ];
            let array = array.iter().map(|v| v.into()).collect();
            let rows = identity_pipeline(array, None, &pipeline_ctx);
            assert!(rows.is_err());
            assert_eq!(
                rows.err().unwrap().to_string(),
                "Column datatype mismatch. For column: score, expected datatype: FLOAT64, actual datatype: INT64".to_string(),
            );
        }
        // Consistent schema: both rows are produced and padded to 8 columns.
        {
            let array = [
                serde_json::json!({
                    "woshinull": null,
                    "name": "Alice",
                    "age": 20,
                    "is_student": true,
                    "score": 99.5,
                    "hobbies": "reading",
                    "address": "Beijing",
                }),
                serde_json::json!({
                    "name": "Bob",
                    "age": 21,
                    "is_student": false,
                    "score": 88.5,
                    "hobbies": "swimming",
                    "address": "Shanghai",
                    "gaga": "gaga"
                }),
            ];
            let array = array.iter().map(|v| v.into()).collect();
            let rows = identity_pipeline(array, None, &pipeline_ctx);
            assert!(rows.is_ok());
            let mut rows = rows.unwrap();
            assert!(rows.len() == 1);
            let rows = rows.remove(&ContextOpt::default()).unwrap();
            assert_eq!(rows.schema.len(), 8);
            assert_eq!(rows.rows.len(), 2);
            assert_eq!(8, rows.rows[0].values.len());
            assert_eq!(8, rows.rows[1].values.len());
        }
        // identity_pipeline_inner with caller-assigned tag columns.
        {
            let array = [
                serde_json::json!({
                    "woshinull": null,
                    "name": "Alice",
                    "age": 20,
                    "is_student": true,
                    "score": 99.5,
                    "hobbies": "reading",
                    "address": "Beijing",
                }),
                serde_json::json!({
                    "name": "Bob",
                    "age": 21,
                    "is_student": false,
                    "score": 88.5,
                    "hobbies": "swimming",
                    "address": "Shanghai",
                    "gaga": "gaga"
                }),
            ];
            let tag_column_names = ["name".to_string(), "address".to_string()];

            let rows =
                identity_pipeline_inner(array.iter().map(|v| v.into()).collect(), &pipeline_ctx)
                    .map(|(mut schema, mut rows)| {
                        for name in tag_column_names {
                            if let Some(index) = schema.index.get(&name) {
                                schema.schema[*index].semantic_type = SemanticType::Tag as i32;
                            }
                        }

                        assert!(rows.len() == 1);
                        let rows = rows.remove(&ContextOpt::default()).unwrap();

                        Rows {
                            schema: schema.schema,
                            rows,
                        }
                    });

            assert!(rows.is_ok());
            let rows = rows.unwrap();
            assert_eq!(rows.schema.len(), 8);
            assert_eq!(rows.rows.len(), 2);
            assert_eq!(8, rows.rows[0].values.len());
            assert_eq!(8, rows.rows[1].values.len());
            assert_eq!(
                rows.schema
                    .iter()
                    .find(|x| x.column_name == "name")
                    .unwrap()
                    .semantic_type,
                SemanticType::Tag as i32
            );
            assert_eq!(
                rows.schema
                    .iter()
                    .find(|x| x.column_name == "address")
                    .unwrap()
                    .semantic_type,
                SemanticType::Tag as i32
            );
            assert_eq!(
                rows.schema
                    .iter()
                    .filter(|x| x.semantic_type == SemanticType::Tag as i32)
                    .count(),
                2
            );
        }
    }
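
    // A small sketch of how `GreptimePipelineParams` options are parsed and
    // cached. It assumes the crate-root `truthy` helper accepts the literal
    // string "true"; the exact accepted set lives in the crate root.
    #[test]
    fn test_pipeline_params_parsing() {
        let params =
            GreptimePipelineParams::from_params(Some("skip_error=true&max_nested_levels=3"));
        assert!(params.skip_error());
        assert_eq!(params.max_nested_levels(), 3);

        // Absent params fall back to the defaults.
        let params = GreptimePipelineParams::from_params(None);
        assert!(!params.skip_error());
        assert_eq!(
            params.max_nested_levels(),
            DEFAULT_MAX_NESTED_LEVELS_FOR_JSON_FLATTENING
        );

        // Zero or non-numeric values are rejected and fall back to the default.
        let params = GreptimePipelineParams::from_params(Some("max_nested_levels=0"));
        assert_eq!(
            params.max_nested_levels(),
            DEFAULT_MAX_NESTED_LEVELS_FOR_JSON_FLATTENING
        );
    }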

    #[test]
    fn test_flatten() {
        let test_cases = vec![
            // Basic case: nested objects are flattened, arrays become JSON strings.
            (
                serde_json::json!(
                    {
                        "a": {
                            "b": {
                                "c": [1, 2, 3]
                            }
                        },
                        "d": [
                            "foo",
                            "bar"
                        ],
                        "e": {
                            "f": [7, 8, 9],
                            "g": {
                                "h": 123,
                                "i": "hello",
                                "j": {
                                    "k": true
                                }
                            }
                        }
                    }
                ),
                10,
                Some(serde_json::json!(
                    {
                        "a.b.c": "[1,2,3]",
                        "d": "[\"foo\",\"bar\"]",
                        "e.f": "[7,8,9]",
                        "e.g.h": 123,
                        "e.g.i": "hello",
                        "e.g.j.k": true
                    }
                )),
            ),
            // Objects nested deeper than the limit are kept as JSON strings.
            (
                serde_json::json!(
                    {
                        "a": {
                            "b": {
                                "c": {
                                    "d": [1, 2, 3]
                                }
                            }
                        },
                        "e": [
                            "foo",
                            "bar"
                        ]
                    }
                ),
                3,
                Some(serde_json::json!(
                    {
                        "a.b.c": "{\"d\":[1,2,3]}",
                        "e": "[\"foo\",\"bar\"]"
                    }
                )),
            ),
        ];

        for (input, max_depth, expected) in test_cases {
            let input = input.into();
            let expected = expected.map(|e| e.into());

            let flattened_object = flatten_object(input, max_depth).ok();
            assert_eq!(flattened_object, expected);
        }
    }
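
    // Sketches for the helpers above: `flatten_object` rejects non-map input,
    // and the VRL-to-jsonb / VRL-to-serde_json conversions keep scalar types.
    #[test]
    fn test_flatten_object_rejects_non_map() {
        assert!(flatten_object(VrlValue::Null, 10).is_err());
    }

    #[test]
    fn test_vrl_value_conversions() {
        // The jsonb conversion preserves scalar variants.
        assert!(matches!(
            vrl_value_to_jsonb_value(&VrlValue::Boolean(true)),
            jsonb::Value::Bool(true)
        ));
        assert!(matches!(
            vrl_value_to_jsonb_value(&VrlValue::Integer(42)),
            jsonb::Value::Number(Number::Int64(42))
        ));

        // The serde_json conversion turns bytes into strings.
        let v = VrlValue::Array(vec![VrlValue::Integer(1), VrlValue::Bytes(Bytes::from("x"))]);
        assert_eq!(vrl_value_to_serde_json(&v), serde_json::json!([1, "x"]));
    }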
}