pub mod coerce;

use std::collections::HashSet;
use std::sync::Arc;

use ahash::{HashMap, HashMapExt};
use api::helper::proto_value_type;
use api::v1::column_data_type_extension::TypeExt;
use api::v1::value::ValueData;
use api::v1::{ColumnDataType, ColumnDataTypeExtension, JsonTypeExtension, SemanticType};
use coerce::{coerce_columns, coerce_value};
use greptime_proto::v1::{ColumnSchema, Row, Rows, Value as GreptimeValue};
use itertools::Itertools;
use once_cell::sync::OnceCell;
use serde_json::Number;

use crate::error::{
    IdentifyPipelineColumnTypeMismatchSnafu, ReachedMaxNestedLevelsSnafu, Result,
    TransformColumnNameMustBeUniqueSnafu, TransformMultipleTimestampIndexSnafu,
    TransformTimestampIndexCountSnafu, UnsupportedNumberTypeSnafu,
};
use crate::etl::field::{Field, Fields};
use crate::etl::transform::index::Index;
use crate::etl::transform::{Transform, Transforms};
use crate::etl::value::{Timestamp, Value};
use crate::etl::PipelineMap;
use crate::{IdentityTimeIndex, PipelineContext};

const DEFAULT_GREPTIME_TIMESTAMP_COLUMN: &str = "greptime_timestamp";
const DEFAULT_MAX_NESTED_LEVELS_FOR_JSON_FLATTENING: usize = 10;

/// Converts transformed pipeline values into GreptimeDB rows according to the
/// configured transforms and their derived column schemas.
#[derive(Debug, Clone)]
pub struct GreptimeTransformer {
    transforms: Transforms,
    schema: Vec<ColumnSchema>,
}

/// Optional parameters that tweak pipeline behavior, parsed from an
/// `&`-separated `key=value` string.
#[derive(Debug, Clone, Default)]
pub struct GreptimePipelineParams {
    /// Raw key/value options.
    options: HashMap<String, String>,

    /// Lazily-computed flag: whether to flatten JSON objects into dotted keys.
    pub flatten_json_object: OnceCell<bool>,
}

impl GreptimePipelineParams {
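    /// Parses pipeline parameters from an `&`-separated `key=value` string,
    /// e.g. `flatten_json_object=true`. `None` or an empty string yields empty options.
    ///
    /// A minimal usage sketch (mirrors `test_greptime_pipeline_params` below;
    /// marked `ignore` since the full crate setup is assumed):
    ///
    /// ```ignore
    /// let params = GreptimePipelineParams::from_params(Some("flatten_json_object=true"));
    /// assert!(params.flatten_json_object());
    ///
    /// let params = GreptimePipelineParams::from_params(None);
    /// assert!(!params.flatten_json_object());
    /// ```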
    pub fn from_params(params: Option<&str>) -> Self {
        let options = params
            .unwrap_or_default()
            .split('&')
            .filter_map(|s| s.split_once('='))
            .map(|(k, v)| (k.to_string(), v.to_string()))
            .collect::<HashMap<String, String>>();

        Self {
            options,
            flatten_json_object: OnceCell::new(),
        }
    }

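    /// Whether incoming JSON objects should be flattened before ingestion.
    ///
    /// The flag is read from the `flatten_json_object` option on first call and
    /// cached in the `OnceCell`, so later calls reuse the parsed value.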
    pub fn flatten_json_object(&self) -> bool {
        *self.flatten_json_object.get_or_init(|| {
            self.options
                .get("flatten_json_object")
                .map(|v| v == "true")
                .unwrap_or(false)
        })
    }
}

impl GreptimeTransformer {
    /// Appends a default time-index column named `greptime_timestamp` to the transforms.
    fn add_greptime_timestamp_column(transforms: &mut Transforms) {
        let type_ = Value::Timestamp(Timestamp::Nanosecond(0));
        let default = None;

        let transform = Transform {
            fields: Fields::one(Field::new(
                DEFAULT_GREPTIME_TIMESTAMP_COLUMN.to_string(),
                None,
            )),
            type_,
            default,
            index: Some(Index::Time),
            on_failure: Some(crate::etl::transform::OnFailure::Default),
            tag: false,
        };
        transforms.push(transform);
    }

    /// Generates the column schemas from the transforms.
    fn init_schemas(transforms: &Transforms) -> Result<Vec<ColumnSchema>> {
        let mut schema = vec![];
        for transform in transforms.iter() {
            schema.extend(coerce_columns(transform)?);
        }
        Ok(schema)
    }
}

impl GreptimeTransformer {
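    /// Builds a transformer from the given transforms, validating that every target
    /// column name is unique and that at most one column carries the time index.
    /// If no time index is configured, a default `greptime_timestamp` column is appended.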
    pub fn new(mut transforms: Transforms) -> Result<Self> {
        let mut column_names_set = HashSet::new();
        let mut timestamp_columns = vec![];

        for transform in transforms.iter() {
            let target_fields_set = transform
                .fields
                .iter()
                .map(|f| f.target_or_input_field())
                .collect::<HashSet<_>>();

            let intersections: Vec<_> = column_names_set.intersection(&target_fields_set).collect();
            if !intersections.is_empty() {
                let duplicates = intersections.iter().join(",");
                return TransformColumnNameMustBeUniqueSnafu { duplicates }.fail();
            }

            column_names_set.extend(target_fields_set);

            if let Some(idx) = transform.index {
                if idx == Index::Time {
                    match transform.fields.len() {
                        // Only a single field may carry the time index.
                        1 => {
                            timestamp_columns.push(transform.fields.first().unwrap().input_field())
                        }
                        _ => {
                            return TransformMultipleTimestampIndexSnafu {
                                columns: transform
                                    .fields
                                    .iter()
                                    .map(|x| x.input_field())
                                    .join(", "),
                            }
                            .fail();
                        }
                    }
                }
            }
        }

        match timestamp_columns.len() {
            0 => {
                GreptimeTransformer::add_greptime_timestamp_column(&mut transforms);

                let schema = GreptimeTransformer::init_schemas(&transforms)?;
                Ok(GreptimeTransformer { transforms, schema })
            }
            1 => {
                let schema = GreptimeTransformer::init_schemas(&transforms)?;
                Ok(GreptimeTransformer { transforms, schema })
            }
            _ => {
                let columns: String = timestamp_columns.iter().map(|s| s.to_string()).join(", ");
                let count = timestamp_columns.len();
                TransformTimestampIndexCountSnafu { count, columns }.fail()
            }
        }
    }

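    /// Coerces the values in `val` into a row laid out in the order of `self.schema`.
    ///
    /// A missing field is filled according to the transform's `on_failure` policy:
    /// `Default` falls back to the configured default value, while `Ignore` (or no
    /// policy at all) leaves the cell as null.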
    pub fn transform_mut(&self, val: &mut PipelineMap) -> Result<Row> {
        let mut values = vec![GreptimeValue { value_data: None }; self.schema.len()];
        let mut output_index = 0;
        for transform in self.transforms.iter() {
            for field in transform.fields.iter() {
                let index = field.input_field();
                match val.get(index) {
                    Some(v) => {
                        let value_data = coerce_value(v, transform)?;
                        values[output_index] = GreptimeValue { value_data };
                    }
                    None => {
                        let value_data = match transform.on_failure {
                            Some(crate::etl::transform::OnFailure::Default) => {
                                match transform.get_default() {
                                    Some(default) => coerce_value(default, transform)?,
                                    None => match transform.get_default_value_when_data_is_none() {
                                        Some(default) => coerce_value(&default, transform)?,
                                        None => None,
                                    },
                                }
                            }
                            Some(crate::etl::transform::OnFailure::Ignore) => None,
                            None => None,
                        };
                        values[output_index] = GreptimeValue { value_data };
                    }
                }
                output_index += 1;
            }
        }
        Ok(Row { values })
    }

    pub fn transforms(&self) -> &Transforms {
        &self.transforms
    }

    pub fn schemas(&self) -> &Vec<greptime_proto::v1::ColumnSchema> {
        &self.schema
    }

    pub fn transforms_mut(&mut self) -> &mut Transforms {
        &mut self.transforms
    }
}

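/// Column schemas accumulated while resolving rows, together with a
/// column-name-to-index lookup into `schema`.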
#[derive(Debug, Default)]
pub struct SchemaInfo {
    /// The schemas of the columns.
    pub schema: Vec<ColumnSchema>,
    /// Maps each column name to its index in `schema`.
    pub index: HashMap<String, usize>,
}

impl SchemaInfo {
    pub fn with_capacity(capacity: usize) -> Self {
        Self {
            schema: Vec::with_capacity(capacity),
            index: HashMap::with_capacity(capacity),
        }
    }
}

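/// Writes `value_data` into `row`, evolving the schema when needed.
///
/// If `index` is `Some`, the column already exists: the value's type is checked
/// against the recorded schema and a type-mismatch error is returned on conflict.
/// If `index` is `None`, `column_schema` is appended to `schema_info` and the value
/// is pushed as a new trailing cell.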
fn resolve_schema(
    index: Option<usize>,
    value_data: ValueData,
    column_schema: ColumnSchema,
    row: &mut Vec<GreptimeValue>,
    schema_info: &mut SchemaInfo,
) -> Result<()> {
    if let Some(index) = index {
        let api_value = GreptimeValue {
            value_data: Some(value_data),
        };
        // Safety: `api_value` always holds a value here, so a type can be derived.
        let value_column_data_type = proto_value_type(&api_value).unwrap();
        // Safety: the index comes from `schema_info.index`, so it is always valid.
        let schema_column_data_type = schema_info.schema.get(index).unwrap().datatype();
        if value_column_data_type != schema_column_data_type {
            IdentifyPipelineColumnTypeMismatchSnafu {
                column: column_schema.column_name,
                expected: schema_column_data_type.as_str_name(),
                actual: value_column_data_type.as_str_name(),
            }
            .fail()
        } else {
            row[index] = api_value;
            Ok(())
        }
    } else {
        let key = column_schema.column_name.clone();
        schema_info.schema.push(column_schema);
        schema_info.index.insert(key, schema_info.schema.len() - 1);
        let api_value = GreptimeValue {
            value_data: Some(value_data),
        };
        row.push(api_value);
        Ok(())
    }
}

fn resolve_number_schema(
    n: Number,
    column_name: String,
    index: Option<usize>,
    row: &mut Vec<GreptimeValue>,
    schema_info: &mut SchemaInfo,
) -> Result<()> {
    let (value, datatype, semantic_type) = if n.is_i64() {
        (
            ValueData::I64Value(n.as_i64().unwrap()),
            ColumnDataType::Int64 as i32,
            SemanticType::Field as i32,
        )
    } else if n.is_u64() {
        (
            ValueData::U64Value(n.as_u64().unwrap()),
            ColumnDataType::Uint64 as i32,
            SemanticType::Field as i32,
        )
    } else if n.is_f64() {
        (
            ValueData::F64Value(n.as_f64().unwrap()),
            ColumnDataType::Float64 as i32,
            SemanticType::Field as i32,
        )
    } else {
        return UnsupportedNumberTypeSnafu { value: n }.fail();
    };
    resolve_schema(
        index,
        value,
        ColumnSchema {
            column_name,
            datatype,
            semantic_type,
            datatype_extension: None,
            options: None,
        },
        row,
        schema_info,
    )
}

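/// Converts a single pipeline map into a row: the timestamp cell comes first
/// (custom time index or current time), the remaining cells follow `schema_info`
/// order, and columns seen for the first time are appended to the schema.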
fn values_to_row(
    schema_info: &mut SchemaInfo,
    values: PipelineMap,
    custom_ts: Option<&IdentityTimeIndex>,
) -> Result<Row> {
    let mut row: Vec<GreptimeValue> = Vec::with_capacity(schema_info.schema.len());

    // Set the timestamp cell first, either from the custom time index or the current time.
    let value_data = match custom_ts {
        Some(ts) => {
            let ts_field = values.get(ts.get_column_name());
            Some(ts.get_timestamp(ts_field)?)
        }
        None => Some(ValueData::TimestampNanosecondValue(
            chrono::Utc::now().timestamp_nanos_opt().unwrap_or_default(),
        )),
    };

    row.push(GreptimeValue { value_data });

    for _ in 1..schema_info.schema.len() {
        row.push(GreptimeValue { value_data: None });
    }

    for (column_name, value) in values {
        // Skip the timestamp column; it has already been set above.
        let ts_column = custom_ts
            .as_ref()
            .map_or(DEFAULT_GREPTIME_TIMESTAMP_COLUMN, |ts| ts.get_column_name());
        if column_name == ts_column {
            continue;
        }

        let index = schema_info.index.get(&column_name).copied();
        resolve_value(index, value, column_name, &mut row, schema_info)?;
    }
    Ok(Row { values: row })
}

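/// Maps a pipeline `Value` to a GreptimeDB cell: nulls leave the cell empty,
/// integers and floats are widened to their 64-bit column types, timestamps keep
/// their precision, and arrays or maps are encoded as JSONB into a binary column.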
fn resolve_value(
    index: Option<usize>,
    value: Value,
    column_name: String,
    row: &mut Vec<GreptimeValue>,
    schema_info: &mut SchemaInfo,
) -> Result<()> {
    let mut resolve_simple_type =
        |value_data: ValueData, column_name: String, data_type: ColumnDataType| {
            resolve_schema(
                index,
                value_data,
                ColumnSchema {
                    column_name,
                    datatype: data_type as i32,
                    semantic_type: SemanticType::Field as i32,
                    datatype_extension: None,
                    options: None,
                },
                row,
                schema_info,
            )
        };

    match value {
        Value::Null => {}

        Value::Int8(_) | Value::Int16(_) | Value::Int32(_) | Value::Int64(_) => {
            // Safety: the variant is guaranteed to be a signed integer.
            let v = value.as_i64().unwrap();
            resolve_simple_type(ValueData::I64Value(v), column_name, ColumnDataType::Int64)?;
        }

        Value::Uint8(_) | Value::Uint16(_) | Value::Uint32(_) | Value::Uint64(_) => {
            // Safety: the variant is guaranteed to be an unsigned integer.
            let v = value.as_u64().unwrap();
            resolve_simple_type(ValueData::U64Value(v), column_name, ColumnDataType::Uint64)?;
        }

        Value::Float32(_) | Value::Float64(_) => {
            // Safety: the variant is guaranteed to be a float.
            let v = value.as_f64().unwrap();
            resolve_simple_type(ValueData::F64Value(v), column_name, ColumnDataType::Float64)?;
        }

        Value::Boolean(v) => {
            resolve_simple_type(
                ValueData::BoolValue(v),
                column_name,
                ColumnDataType::Boolean,
            )?;
        }

        Value::String(v) => {
            resolve_simple_type(
                ValueData::StringValue(v),
                column_name,
                ColumnDataType::String,
            )?;
        }

        Value::Timestamp(Timestamp::Nanosecond(ns)) => {
            resolve_simple_type(
                ValueData::TimestampNanosecondValue(ns),
                column_name,
                ColumnDataType::TimestampNanosecond,
            )?;
        }

        Value::Timestamp(Timestamp::Microsecond(us)) => {
            resolve_simple_type(
                ValueData::TimestampMicrosecondValue(us),
                column_name,
                ColumnDataType::TimestampMicrosecond,
            )?;
        }

        Value::Timestamp(Timestamp::Millisecond(ms)) => {
            resolve_simple_type(
                ValueData::TimestampMillisecondValue(ms),
                column_name,
                ColumnDataType::TimestampMillisecond,
            )?;
        }

        Value::Timestamp(Timestamp::Second(s)) => {
            resolve_simple_type(
                ValueData::TimestampSecondValue(s),
                column_name,
                ColumnDataType::TimestampSecond,
            )?;
        }

        Value::Array(_) | Value::Map(_) => {
            // Nested values are encoded as JSONB and stored in a binary column.
            let data: jsonb::Value = value.into();
            resolve_schema(
                index,
                ValueData::BinaryValue(data.to_vec()),
                ColumnSchema {
                    column_name,
                    datatype: ColumnDataType::Binary as i32,
                    semantic_type: SemanticType::Field as i32,
                    datatype_extension: Some(ColumnDataTypeExtension {
                        type_ext: Some(TypeExt::JsonType(JsonTypeExtension::JsonBinary.into())),
                    }),
                    options: None,
                },
                row,
                schema_info,
            )?;
        }
    }
    Ok(())
}

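/// Core of the identity pipeline: seeds the schema with the timestamp column
/// (custom time index or the default `greptime_timestamp`), converts each map into
/// a row, and finally pads shorter rows with nulls so every row matches the final
/// column count.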
fn identity_pipeline_inner(
    pipeline_maps: Vec<PipelineMap>,
    pipeline_ctx: &PipelineContext<'_>,
) -> Result<(SchemaInfo, Vec<Row>)> {
    let mut rows = Vec::with_capacity(pipeline_maps.len());
    let mut schema_info = SchemaInfo::default();
    let custom_ts = pipeline_ctx.pipeline_definition.get_custom_ts();

    // Set the timestamp column schema first.
    schema_info.schema.push(ColumnSchema {
        column_name: custom_ts
            .map(|ts| ts.get_column_name().clone())
            .unwrap_or_else(|| DEFAULT_GREPTIME_TIMESTAMP_COLUMN.to_string()),
        datatype: custom_ts
            .map(|c| c.get_datatype())
            .unwrap_or(ColumnDataType::TimestampNanosecond) as i32,
        semantic_type: SemanticType::Timestamp as i32,
        datatype_extension: None,
        options: None,
    });

    for values in pipeline_maps {
        let row = values_to_row(&mut schema_info, values, custom_ts)?;
        rows.push(row);
    }

    let column_count = schema_info.schema.len();
    for row in rows.iter_mut() {
        let diff = column_count - row.values.len();
        for _ in 0..diff {
            row.values.push(GreptimeValue { value_data: None });
        }
    }

    Ok((schema_info, rows))
}

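/// Identity pipeline for GreptimeDB: rows are derived directly from the input JSON
/// objects without a user-defined pipeline. When `flatten_json_object` is enabled,
/// nested objects are flattened first; when a `table` is provided, input columns
/// matching the table's row-key columns are marked as tags.
///
/// A rough sketch of a call site (marked `ignore`: construction of the context and
/// the JSON-to-map helpers are assumed from the surrounding crate, as in the tests
/// below):
///
/// ```ignore
/// let params = GreptimePipelineParams::default();
/// let ctx = PipelineContext::new(&PipelineDefinition::GreptimeIdentityPipeline(None), &params);
/// let maps = json_array_to_map(vec![serde_json::json!({"host": "a", "cpu": 0.5})])?;
/// let rows = identity_pipeline(maps, None, &ctx)?;
/// assert_eq!(rows.rows.len(), 1);
/// ```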
pub fn identity_pipeline(
    array: Vec<PipelineMap>,
    table: Option<Arc<table::Table>>,
    pipeline_ctx: &PipelineContext<'_>,
) -> Result<Rows> {
    let input = if pipeline_ctx.pipeline_param.flatten_json_object() {
        array
            .into_iter()
            .map(|item| flatten_object(item, DEFAULT_MAX_NESTED_LEVELS_FOR_JSON_FLATTENING))
            .collect::<Result<Vec<PipelineMap>>>()?
    } else {
        array
    };

    identity_pipeline_inner(input, pipeline_ctx).map(|(mut schema, rows)| {
        if let Some(table) = table {
            let table_info = table.table_info();
            for tag_name in table_info.meta.row_key_column_names() {
                if let Some(index) = schema.index.get(tag_name) {
                    schema.schema[*index].semantic_type = SemanticType::Tag as i32;
                }
            }
        }
        Rows {
            schema: schema.schema,
            rows,
        }
    })
}

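/// Flattens a nested object into a single level, joining nested keys with `.`, and
/// fails once nesting exceeds `max_nested_levels`.
///
/// A sketch of the resulting shape (see `test_flatten` below for the exact cases;
/// marked `ignore` since the JSON-to-map helper is assumed):
///
/// ```ignore
/// // {"a": {"b": {"c": 1}}, "d": 2}  ->  {"a.b.c": 1, "d": 2}
/// let input = json_to_map(serde_json::json!({"a": {"b": {"c": 1}}, "d": 2})).unwrap();
/// let flat = flatten_object(input, 10).unwrap();
/// assert!(flat.get("a.b.c").is_some());
/// ```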
pub fn flatten_object(object: PipelineMap, max_nested_levels: usize) -> Result<PipelineMap> {
    let mut flattened = PipelineMap::new();

    if !object.is_empty() {
        // Start flattening from level 1.
        do_flatten_object(&mut flattened, None, object, 1, max_nested_levels)?;
    }

    Ok(flattened)
}

/// Recursive worker for [`flatten_object`].
fn do_flatten_object(
    dest: &mut PipelineMap,
    base: Option<&str>,
    object: PipelineMap,
    current_level: usize,
    max_nested_levels: usize,
) -> Result<()> {
    // Stop and error out once the nesting depth exceeds the limit.
    if current_level > max_nested_levels {
        return ReachedMaxNestedLevelsSnafu { max_nested_levels }.fail();
    }

    for (key, value) in object {
        let new_key = base.map_or_else(|| key.clone(), |base_key| format!("{base_key}.{key}"));

        match value {
            Value::Map(object) => {
                do_flatten_object(
                    dest,
                    Some(&new_key),
                    object.values,
                    current_level + 1,
                    max_nested_levels,
                )?;
            }
            // Non-map values are inserted as-is.
            _ => {
                dest.insert(new_key, value);
            }
        }
    }

    Ok(())
}

#[cfg(test)]
mod tests {
    use api::v1::SemanticType;

    use super::*;
    use crate::etl::{json_array_to_map, json_to_map};
    use crate::{identity_pipeline, PipelineDefinition};

    #[test]
    fn test_identify_pipeline() {
        let params = GreptimePipelineParams::default();
        let pipeline_ctx =
            PipelineContext::new(&PipelineDefinition::GreptimeIdentityPipeline(None), &params);
        {
            let array = vec![
                serde_json::json!({
                    "woshinull": null,
                    "name": "Alice",
                    "age": 20,
                    "is_student": true,
                    "score": 99.5,
                    "hobbies": "reading",
                    "address": "Beijing",
                }),
                serde_json::json!({
                    "name": "Bob",
                    "age": 21,
                    "is_student": false,
                    "score": "88.5",
                    "hobbies": "swimming",
                    "address": "Shanghai",
                    "gaga": "gaga"
                }),
            ];
            let array = json_array_to_map(array).unwrap();
            let rows = identity_pipeline(array, None, &pipeline_ctx);
            assert!(rows.is_err());
            assert_eq!(
                rows.err().unwrap().to_string(),
                "Column datatype mismatch. For column: score, expected datatype: FLOAT64, actual datatype: STRING".to_string(),
            );
        }
        {
            let array = vec![
                serde_json::json!({
                    "woshinull": null,
                    "name": "Alice",
                    "age": 20,
                    "is_student": true,
                    "score": 99.5,
                    "hobbies": "reading",
                    "address": "Beijing",
                }),
                serde_json::json!({
                    "name": "Bob",
                    "age": 21,
                    "is_student": false,
                    "score": 88,
                    "hobbies": "swimming",
                    "address": "Shanghai",
                    "gaga": "gaga"
                }),
            ];
            let rows = identity_pipeline(json_array_to_map(array).unwrap(), None, &pipeline_ctx);
            assert!(rows.is_err());
            assert_eq!(
                rows.err().unwrap().to_string(),
                "Column datatype mismatch. For column: score, expected datatype: FLOAT64, actual datatype: INT64".to_string(),
            );
        }
        {
            let array = vec![
                serde_json::json!({
                    "woshinull": null,
                    "name": "Alice",
                    "age": 20,
                    "is_student": true,
                    "score": 99.5,
                    "hobbies": "reading",
                    "address": "Beijing",
                }),
                serde_json::json!({
                    "name": "Bob",
                    "age": 21,
                    "is_student": false,
                    "score": 88.5,
                    "hobbies": "swimming",
                    "address": "Shanghai",
                    "gaga": "gaga"
                }),
            ];
            let rows = identity_pipeline(json_array_to_map(array).unwrap(), None, &pipeline_ctx);
            assert!(rows.is_ok());
            let rows = rows.unwrap();
            assert_eq!(rows.schema.len(), 8);
            assert_eq!(rows.rows.len(), 2);
            assert_eq!(8, rows.rows[0].values.len());
            assert_eq!(8, rows.rows[1].values.len());
        }
        {
            let array = vec![
                serde_json::json!({
                    "woshinull": null,
                    "name": "Alice",
                    "age": 20,
                    "is_student": true,
                    "score": 99.5,
                    "hobbies": "reading",
                    "address": "Beijing",
                }),
                serde_json::json!({
                    "name": "Bob",
                    "age": 21,
                    "is_student": false,
                    "score": 88.5,
                    "hobbies": "swimming",
                    "address": "Shanghai",
                    "gaga": "gaga"
                }),
            ];
            let tag_column_names = ["name".to_string(), "address".to_string()];

            let rows = identity_pipeline_inner(json_array_to_map(array).unwrap(), &pipeline_ctx)
                .map(|(mut schema, rows)| {
                    for name in tag_column_names {
                        if let Some(index) = schema.index.get(&name) {
                            schema.schema[*index].semantic_type = SemanticType::Tag as i32;
                        }
                    }
                    Rows {
                        schema: schema.schema,
                        rows,
                    }
                });

            assert!(rows.is_ok());
            let rows = rows.unwrap();
            assert_eq!(rows.schema.len(), 8);
            assert_eq!(rows.rows.len(), 2);
            assert_eq!(8, rows.rows[0].values.len());
            assert_eq!(8, rows.rows[1].values.len());
            assert_eq!(
                rows.schema
                    .iter()
                    .find(|x| x.column_name == "name")
                    .unwrap()
                    .semantic_type,
                SemanticType::Tag as i32
            );
            assert_eq!(
                rows.schema
                    .iter()
                    .find(|x| x.column_name == "address")
                    .unwrap()
                    .semantic_type,
                SemanticType::Tag as i32
            );
            assert_eq!(
                rows.schema
                    .iter()
                    .filter(|x| x.semantic_type == SemanticType::Tag as i32)
                    .count(),
                2
            );
        }
    }

    #[test]
    fn test_flatten() {
        let test_cases = vec![
            // Nested objects are flattened into dotted keys.
            (
                serde_json::json!(
                    {
                        "a": {
                            "b": {
                                "c": [1, 2, 3]
                            }
                        },
                        "d": [
                            "foo",
                            "bar"
                        ],
                        "e": {
                            "f": [7, 8, 9],
                            "g": {
                                "h": 123,
                                "i": "hello",
                                "j": {
                                    "k": true
                                }
                            }
                        }
                    }
                ),
                10,
                Some(serde_json::json!(
                    {
                        "a.b.c": [1,2,3],
                        "d": ["foo","bar"],
                        "e.f": [7,8,9],
                        "e.g.h": 123,
                        "e.g.i": "hello",
                        "e.g.j.k": true
                    }
                )),
            ),
            // Exceeds the maximum nested levels, so flattening fails.
            (
                serde_json::json!(
                    {
                        "a": {
                            "b": {
                                "c": {
                                    "d": [1, 2, 3]
                                }
                            }
                        },
                        "e": [
                            "foo",
                            "bar"
                        ]
                    }
                ),
                3,
                None,
            ),
        ];

        for (input, max_depth, expected) in test_cases {
            let input = json_to_map(input).unwrap();
            let expected = expected.map(|e| json_to_map(e).unwrap());

            let flattened_object = flatten_object(input, max_depth).ok();
            assert_eq!(flattened_object, expected);
        }
    }

    #[test]
    fn test_greptime_pipeline_params() {
        let params = Some("flatten_json_object=true");
        let pipeline_params = GreptimePipelineParams::from_params(params);
        assert!(pipeline_params.flatten_json_object());

        let params = None;
        let pipeline_params = GreptimePipelineParams::from_params(params);
        assert!(!pipeline_params.flatten_json_object());
    }
}