#![allow(dead_code)]

pub mod ctx_req;
pub mod field;
pub mod processor;
pub mod transform;
pub mod value;

use std::collections::HashMap;

use api::v1::Row;
use common_time::timestamp::TimeUnit;
use itertools::Itertools;
use processor::{Processor, Processors};
use snafu::{OptionExt, ResultExt, ensure};
use transform::Transforms;
use vrl::core::Value as VrlValue;
use yaml_rust::{Yaml, YamlLoader};

use crate::dispatcher::{Dispatcher, Rule};
use crate::error::{
    ArrayElementMustBeObjectSnafu, AutoTransformOneTimestampSnafu, Error,
    IntermediateKeyIndexSnafu, InvalidVersionNumberSnafu, Result, TransformArrayElementSnafu,
    YamlLoadSnafu, YamlParseSnafu,
};
use crate::etl::processor::ProcessorKind;
use crate::etl::transform::transformer::greptime::{RowWithTableSuffix, values_to_rows};
use crate::tablesuffix::TableSuffixTemplate;
use crate::{
    ContextOpt, GreptimeTransformer, IdentityTimeIndex, PipelineContext, SchemaInfo,
    unwrap_or_continue_if_err,
};

const DESCRIPTION: &str = "description";
const DOC_VERSION: &str = "version";
const PROCESSORS: &str = "processors";
const TRANSFORM: &str = "transform";
const TRANSFORMS: &str = "transforms";
const DISPATCHER: &str = "dispatcher";
const TABLESUFFIX: &str = "table_suffix";

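/// Raw pipeline definition text, tagged by its serialization format.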
pub enum Content<'a> {
    Json(&'a str),
    Yaml(&'a str),
}

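/// Parse a pipeline definition from text. Only YAML input is
/// currently supported; `Content::Json` is unimplemented.
///
/// A minimal sketch of usage (the YAML mirrors the tests below):
///
/// ```ignore
/// let yaml = r#"
/// processors:
///   - epoch:
///       field: ts
///       resolution: ms
/// transform:
///   - field: ts
///     type: timestamp, ms
///     index: time
/// "#;
/// let pipeline = parse(&Content::Yaml(yaml)).unwrap();
/// ```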
pub fn parse(input: &Content) -> Result<Pipeline> {
    match input {
        Content::Yaml(str) => {
            let docs = YamlLoader::load_from_str(str).context(YamlLoadSnafu)?;

            ensure!(docs.len() == 1, YamlParseSnafu);

            let doc = &docs[0];

            let description = doc[DESCRIPTION].as_str().map(|s| s.to_string());

            let doc_version = (&doc[DOC_VERSION]).try_into()?;

            let processors = if let Some(v) = doc[PROCESSORS].as_vec() {
                v.try_into()?
            } else {
                Processors::default()
            };

            let transformers = if let Some(v) = doc[TRANSFORMS].as_vec().or(doc[TRANSFORM].as_vec())
            {
                v.try_into()?
            } else {
                Transforms::default()
            };

            let transformer = if transformers.is_empty() {
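                // No explicit transform: fall back to auto-transform, which
                // requires exactly one timestamp field produced by a date or
                // epoch processor to serve as the time index.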
                let cnt = processors
                    .iter()
                    .filter_map(|p| match p {
                        ProcessorKind::Date(d) if !d.ignore_missing() => Some(
                            d.fields
                                .iter()
                                .map(|f| (f.target_or_input_field(), TimeUnit::Nanosecond))
                                .collect_vec(),
                        ),
                        ProcessorKind::Epoch(e) if !e.ignore_missing() => Some(
                            e.fields
                                .iter()
                                .map(|f| (f.target_or_input_field(), (&e.resolution).into()))
                                .collect_vec(),
                        ),
                        _ => None,
                    })
                    .flatten()
                    .collect_vec();
                ensure!(cnt.len() == 1, AutoTransformOneTimestampSnafu);

                let (ts_name, timeunit) = cnt.first().unwrap();
                TransformerMode::AutoTransform(ts_name.to_string(), *timeunit)
            } else {
                TransformerMode::GreptimeTransformer(GreptimeTransformer::new(
                    transformers,
                    &doc_version,
                )?)
            };

            let dispatcher = if !doc[DISPATCHER].is_badvalue() {
                Some(Dispatcher::try_from(&doc[DISPATCHER])?)
            } else {
                None
            };

            let tablesuffix = if !doc[TABLESUFFIX].is_badvalue() {
                Some(TableSuffixTemplate::try_from(&doc[TABLESUFFIX])?)
            } else {
                None
            };

            Ok(Pipeline {
                doc_version,
                description,
                processors,
                transformer,
                dispatcher,
                tablesuffix,
            })
        }
        Content::Json(_) => unimplemented!(),
    }
}

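/// Version of the pipeline document format, selected by the optional
/// top-level `version` key.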
#[derive(Debug, Default, Copy, Clone, PartialEq, Eq)]
pub enum PipelineDocVersion {
    #[default]
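    /// The original format: output rows are built directly from the
    /// values produced by the `transform` rules.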
    V1,

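    /// Version 2: transformed values are merged with the element's
    /// remaining fields via `values_to_rows` (see
    /// `transform_array_elements_by_ctx`).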
    V2,
}

impl TryFrom<&Yaml> for PipelineDocVersion {
    type Error = Error;

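    /// A missing or null `version` key defaults to `V1`; both
    /// `version: 2` and `version: "2"` are accepted.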
    fn try_from(value: &Yaml) -> Result<Self> {
        if value.is_badvalue() || value.is_null() {
            return Ok(PipelineDocVersion::V1);
        }

        let version = match value {
            Yaml::String(s) => s
                .parse::<i64>()
                .map_err(|_| InvalidVersionNumberSnafu { version: s.clone() }.build())?,
            Yaml::Integer(i) => *i,
            _ => {
                return InvalidVersionNumberSnafu {
                    version: value.as_str().unwrap_or_default().to_string(),
                }
                .fail();
            }
        };

        match version {
            1 => Ok(PipelineDocVersion::V1),
            2 => Ok(PipelineDocVersion::V2),
            _ => InvalidVersionNumberSnafu {
                version: version.to_string(),
            }
            .fail(),
        }
    }
}

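/// A parsed pipeline: processors that mutate the incoming value, an
/// optional dispatcher that can reroute it, and a transformer that
/// turns it into rows.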
#[derive(Debug)]
pub struct Pipeline {
    doc_version: PipelineDocVersion,
    description: Option<String>,
    processors: processor::Processors,
    dispatcher: Option<Dispatcher>,
    transformer: TransformerMode,
    tablesuffix: Option<TableSuffixTemplate>,
}

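/// How rows are produced after the processors run: either an explicit
/// `transform` section compiled into a [`GreptimeTransformer`], or
/// auto-transform keyed on a single timestamp field and its unit.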
#[derive(Debug, Clone)]
pub enum TransformerMode {
    GreptimeTransformer(GreptimeTransformer),
    AutoTransform(String, TimeUnit),
}

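/// Destination selected by a dispatcher rule: a table suffix plus an
/// optional next pipeline to run.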
#[derive(Debug, Hash, PartialEq, Eq, Clone, PartialOrd, Ord)]
pub struct DispatchedTo {
    pub table_suffix: String,
    pub pipeline: Option<String>,
}

impl From<&Rule> for DispatchedTo {
    fn from(value: &Rule) -> Self {
        DispatchedTo {
            table_suffix: value.table_suffix.clone(),
            pipeline: value.pipeline.clone(),
        }
    }
}

impl DispatchedTo {
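    /// The target table name: the original name with the rule's
    /// suffix appended.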
    pub fn dispatched_to_table_name(&self, original: &str) -> String {
        [original, &self.table_suffix].concat()
    }
}

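/// Outcome of running a pipeline over one value: rows to write, a
/// dispatch to another table/pipeline, or nothing (filtered out).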
#[derive(Debug)]
pub enum PipelineExecOutput {
    Transformed(TransformedOutput),
    DispatchedTo(DispatchedTo, VrlValue),
    Filtered,
}

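/// Rows produced by a transform run, grouped by per-row pipeline
/// options ([`ContextOpt`]) so that rows sharing the same options
/// (e.g. a TTL hint) can be written together. Each row carries an
/// optional table suffix.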
#[derive(Debug)]
pub struct TransformedOutput {
    pub rows_by_context: HashMap<ContextOpt, Vec<RowWithTableSuffix>>,
}

impl PipelineExecOutput {
    pub fn into_transformed(self) -> Option<Vec<RowWithTableSuffix>> {
        if let Self::Transformed(TransformedOutput { rows_by_context }) = self {
            Some(rows_by_context.into_values().flatten().collect())
        } else {
            None
        }
    }

    pub fn into_transformed_hashmap(self) -> Option<HashMap<ContextOpt, Vec<RowWithTableSuffix>>> {
        if let Self::Transformed(TransformedOutput { rows_by_context }) = self {
            Some(rows_by_context)
        } else {
            None
        }
    }

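    /// Flatten the grouped output into a single `(ContextOpt, rows)`
    /// pair. With exactly one context group its options are kept;
    /// otherwise the groups are merged and a default `ContextOpt` is
    /// returned, so per-row options are lost. Intended for callers
    /// that predate context grouping.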
    pub fn into_legacy_format(self) -> Option<(ContextOpt, Vec<RowWithTableSuffix>)> {
        if let Self::Transformed(TransformedOutput { rows_by_context }) = self {
            if rows_by_context.len() == 1 {
                let (opt, rows) = rows_by_context.into_iter().next().unwrap();
                Some((opt, rows))
            } else {
                let all_rows: Vec<RowWithTableSuffix> =
                    rows_by_context.into_values().flatten().collect();
                Some((ContextOpt::default(), all_rows))
            }
        } else {
            None
        }
    }

    pub fn into_dispatched(self) -> Option<DispatchedTo> {
        if let Self::DispatchedTo(d, _) = self {
            Some(d)
        } else {
            None
        }
    }
}

impl Pipeline {
    fn is_v1(&self) -> bool {
        self.doc_version == PipelineDocVersion::V1
    }

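    /// Run the pipeline over one input value: processors first, then
    /// the dispatcher, then the transformer. A processor returning
    /// null short-circuits to `Filtered`; a matching dispatcher rule
    /// short-circuits to `DispatchedTo`.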
    pub fn exec_mut(
        &self,
        mut val: VrlValue,
        pipeline_ctx: &PipelineContext<'_>,
        schema_info: &mut SchemaInfo,
    ) -> Result<PipelineExecOutput> {
        for processor in self.processors.iter() {
            val = processor.exec_mut(val)?;
            if val.is_null() {
                return Ok(PipelineExecOutput::Filtered);
            }
        }

        if let Some(rule) = self.dispatcher.as_ref().and_then(|d| d.exec(&val)) {
            return Ok(PipelineExecOutput::DispatchedTo(rule.into(), val));
        }

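        // Normalize to an array so that one-to-many processor outputs
        // (e.g. a VRL expansion) and single objects share one code path.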
        let mut val = if val.is_array() {
            val
        } else {
            VrlValue::Array(vec![val])
        };

        let rows_by_context = match self.transformer() {
            TransformerMode::GreptimeTransformer(greptime_transformer) => {
                transform_array_elements_by_ctx(
                    val.as_array_mut().unwrap(),
                    greptime_transformer,
                    self.is_v1(),
                    schema_info,
                    pipeline_ctx,
                    self.tablesuffix.as_ref(),
                )?
            }
            TransformerMode::AutoTransform(ts_name, time_unit) => {
                let def = crate::PipelineDefinition::GreptimeIdentityPipeline(Some(
                    IdentityTimeIndex::Epoch(ts_name.clone(), *time_unit, false),
                ));
                let n_ctx =
                    PipelineContext::new(&def, pipeline_ctx.pipeline_param, pipeline_ctx.channel);
                values_to_rows(
                    schema_info,
                    val,
                    &n_ctx,
                    None,
                    true,
                    self.tablesuffix.as_ref(),
                )?
            }
        };

        Ok(PipelineExecOutput::Transformed(TransformedOutput {
            rows_by_context,
        }))
    }

    pub fn processors(&self) -> &processor::Processors {
        &self.processors
    }

    pub fn transformer(&self) -> &TransformerMode {
        &self.transformer
    }

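    /// Output column schemas, available only when an explicit
    /// transform is defined; auto-transform derives its schema at
    /// execution time.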
    pub fn schemas(&self) -> Option<&Vec<greptime_proto::v1::ColumnSchema>> {
        match &self.transformer {
            TransformerMode::GreptimeTransformer(t) => Some(t.schemas()),
            TransformerMode::AutoTransform(_, _) => None,
        }
    }

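    /// Whether the target table name can vary per row, i.e. a
    /// dispatcher or a table-suffix template is configured.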
    pub fn is_variant_table_name(&self) -> bool {
        self.dispatcher.is_some() || self.tablesuffix.is_some()
    }
}

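/// Transform each element of `arr` into rows, grouped by
/// [`ContextOpt`]. Non-object elements are an error, unless
/// `skip_error` is set in the pipeline parameters, in which case the
/// offending element is skipped.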
fn transform_array_elements_by_ctx(
    arr: &mut [VrlValue],
    transformer: &GreptimeTransformer,
    is_v1: bool,
    schema_info: &mut SchemaInfo,
    pipeline_ctx: &PipelineContext<'_>,
    tablesuffix_template: Option<&TableSuffixTemplate>,
) -> Result<HashMap<ContextOpt, Vec<RowWithTableSuffix>>> {
    let skip_error = pipeline_ctx.pipeline_param.skip_error();
    let mut rows_by_context = HashMap::new();

    for (index, element) in arr.iter_mut().enumerate() {
        if !element.is_object() {
            unwrap_or_continue_if_err!(
                ArrayElementMustBeObjectSnafu {
                    index,
                    actual_type: element.kind_str().to_string(),
                }
                .fail(),
                skip_error
            );
        }

        let values =
            unwrap_or_continue_if_err!(transformer.transform_mut(element, is_v1), skip_error);
        if is_v1 {
            let mut opt = unwrap_or_continue_if_err!(
                ContextOpt::from_pipeline_map_to_opt(element),
                skip_error
            );
            let table_suffix = opt.resolve_table_suffix(tablesuffix_template, element);
            rows_by_context
                .entry(opt)
                .or_default()
                .push((Row { values }, table_suffix));
        } else {
            let element_rows_map = values_to_rows(
                schema_info,
                element.clone(),
                pipeline_ctx,
                Some(values),
                false,
                tablesuffix_template,
            )
            .map_err(Box::new)
            .context(TransformArrayElementSnafu { index })?;
            for (k, v) in element_rows_map {
                rows_by_context.entry(k).or_default().extend(v);
            }
        }
    }

    Ok(rows_by_context)
}

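/// Position of `key` in `intermediate_keys`, or an
/// `IntermediateKeyIndex` error tagged with `kind`.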
pub(crate) fn find_key_index(intermediate_keys: &[String], key: &str, kind: &str) -> Result<usize> {
    intermediate_keys
        .iter()
        .position(|k| k == key)
        .context(IntermediateKeyIndexSnafu { kind, key })
}

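/// Test helper: wrap a parsed [`Pipeline`] in an `Arc` and derive the
/// pieces needed to call `exec_mut`, returning
/// `(pipeline, schema_info, pipeline_def, pipeline_param)`.
///
/// Typical usage, mirroring the tests below:
///
/// ```ignore
/// let pipeline: Pipeline = parse(&Content::Yaml(pipeline_yaml)).unwrap();
/// let (pipeline, mut schema_info, pipeline_def, pipeline_param) = setup_pipeline!(pipeline);
/// let pipeline_ctx = PipelineContext::new(
///     &pipeline_def,
///     &pipeline_param,
///     session::context::Channel::Unknown,
/// );
/// ```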
#[macro_export]
macro_rules! setup_pipeline {
    ($pipeline:expr) => {{
        use std::sync::Arc;

        use $crate::{GreptimePipelineParams, Pipeline, PipelineDefinition, SchemaInfo};

        let pipeline: Arc<Pipeline> = Arc::new($pipeline);
        let schema = pipeline.schemas().unwrap();
        let schema_info = SchemaInfo::from_schema_list(schema.clone());

        let pipeline_def = PipelineDefinition::Resolved(pipeline.clone());
        let pipeline_param = GreptimePipelineParams::default();

        (pipeline, schema_info, pipeline_def, pipeline_param)
    }};
}

#[cfg(test)]
mod tests {
    use std::collections::BTreeMap;
    use std::sync::Arc;

    use api::v1::Rows;
    use greptime_proto::v1::value::ValueData;
    use greptime_proto::v1::{self, ColumnDataType, SemanticType};
    use vrl::prelude::Bytes;
    use vrl::value::KeyString;

    use super::*;

    #[test]
    fn test_pipeline_prepare() {
        let input_value_str = r#"
            {
                "my_field": "1,2",
                "foo": "bar",
                "ts": "1"
            }
        "#;
        let input_value: serde_json::Value = serde_json::from_str(input_value_str).unwrap();

        let pipeline_yaml = r#"description: 'Pipeline for Apache Tomcat'
processors:
  - csv:
      field: my_field
      target_fields: field1, field2
  - epoch:
      field: ts
      resolution: ns
transform:
  - field: field1
    type: uint32
  - field: field2
    type: uint32
  - field: ts
    type: timestamp, ns
    index: time
"#;

        let pipeline: Pipeline = parse(&Content::Yaml(pipeline_yaml)).unwrap();
        let (pipeline, mut schema_info, pipeline_def, pipeline_param) = setup_pipeline!(pipeline);
        let pipeline_ctx = PipelineContext::new(
            &pipeline_def,
            &pipeline_param,
            session::context::Channel::Unknown,
        );

        let payload = input_value.into();
        let mut result = pipeline
            .exec_mut(payload, &pipeline_ctx, &mut schema_info)
            .unwrap()
            .into_transformed()
            .unwrap();

        let (row, _table_suffix) = result.swap_remove(0);
        assert_eq!(row.values[0].value_data, Some(ValueData::U32Value(1)));
        assert_eq!(row.values[1].value_data, Some(ValueData::U32Value(2)));
        match &row.values[2].value_data {
            Some(ValueData::TimestampNanosecondValue(v)) => {
                assert_ne!(v, &0);
            }
            _ => panic!("expect timestamp nanosecond value"),
        }
    }

    #[test]
    fn test_dissect_pipeline() {
        let message = r#"129.37.245.88 - meln1ks [01/Aug/2024:14:22:47 +0800] "PATCH /observability/metrics/production HTTP/1.0" 501 33085"#.to_string();
        let pipeline_str = r#"processors:
  - dissect:
      fields:
        - message
      patterns:
        - "%{ip} %{?ignored} %{username} [%{ts}] \"%{method} %{path} %{proto}\" %{status} %{bytes}"
  - date:
      fields:
        - ts
      formats:
        - "%d/%b/%Y:%H:%M:%S %z"

transform:
  - fields:
      - ip
      - username
      - method
      - path
      - proto
    type: string
  - fields:
      - status
    type: uint16
  - fields:
      - bytes
    type: uint32
  - field: ts
    type: timestamp, ns
    index: time"#;
        let pipeline: Pipeline = parse(&Content::Yaml(pipeline_str)).unwrap();
        let pipeline = Arc::new(pipeline);
        let schema = pipeline.schemas().unwrap();
        let mut schema_info = SchemaInfo::from_schema_list(schema.clone());

        let pipeline_def = crate::PipelineDefinition::Resolved(pipeline.clone());
        let pipeline_param = crate::GreptimePipelineParams::default();
        let pipeline_ctx = PipelineContext::new(
            &pipeline_def,
            &pipeline_param,
            session::context::Channel::Unknown,
        );
        let payload = VrlValue::Object(BTreeMap::from([(
            KeyString::from("message"),
            VrlValue::Bytes(Bytes::from(message)),
        )]));

        let result = pipeline
            .exec_mut(payload, &pipeline_ctx, &mut schema_info)
            .unwrap()
            .into_transformed()
            .unwrap();

        assert_eq!(schema_info.schema.len(), result[0].0.values.len());
        let test = [
            (
                ColumnDataType::String as i32,
                Some(ValueData::StringValue("129.37.245.88".into())),
            ),
            (
                ColumnDataType::String as i32,
                Some(ValueData::StringValue("meln1ks".into())),
            ),
            (
                ColumnDataType::String as i32,
                Some(ValueData::StringValue("PATCH".into())),
            ),
            (
                ColumnDataType::String as i32,
                Some(ValueData::StringValue(
                    "/observability/metrics/production".into(),
                )),
            ),
            (
                ColumnDataType::String as i32,
                Some(ValueData::StringValue("HTTP/1.0".into())),
            ),
            (
                ColumnDataType::Uint16 as i32,
                Some(ValueData::U16Value(501)),
            ),
            (
                ColumnDataType::Uint32 as i32,
                Some(ValueData::U32Value(33085)),
            ),
            (
                ColumnDataType::TimestampNanosecond as i32,
                Some(ValueData::TimestampNanosecondValue(1722493367000000000)),
            ),
        ];
        let schema = pipeline.schemas().unwrap();
        for i in 0..schema.len() {
            let schema = &schema[i];
            let value = &result[0].0.values[i];
            assert_eq!(schema.datatype, test[i].0);
            assert_eq!(value.value_data, test[i].1);
        }
    }

    #[test]
    fn test_csv_pipeline() {
        let input_value_str = r#"
            {
                "my_field": "1,2",
                "foo": "bar",
                "ts": "1"
            }
        "#;
        let input_value: serde_json::Value = serde_json::from_str(input_value_str).unwrap();

        let pipeline_yaml = r#"
description: Pipeline for Apache Tomcat
processors:
  - csv:
      field: my_field
      target_fields: field1, field2
  - epoch:
      field: ts
      resolution: ns
transform:
  - field: field1
    type: uint32
  - field: field2
    type: uint32
  - field: ts
    type: timestamp, ns
    index: time
"#;

        let pipeline: Pipeline = parse(&Content::Yaml(pipeline_yaml)).unwrap();
        let (pipeline, mut schema_info, pipeline_def, pipeline_param) = setup_pipeline!(pipeline);
        let pipeline_ctx = PipelineContext::new(
            &pipeline_def,
            &pipeline_param,
            session::context::Channel::Unknown,
        );

        let payload = input_value.into();
        let result = pipeline
            .exec_mut(payload, &pipeline_ctx, &mut schema_info)
            .unwrap()
            .into_transformed()
            .unwrap();
        assert_eq!(
            result[0].0.values[0].value_data,
            Some(ValueData::U32Value(1))
        );
        assert_eq!(
            result[0].0.values[1].value_data,
            Some(ValueData::U32Value(2))
        );
        match &result[0].0.values[2].value_data {
            Some(ValueData::TimestampNanosecondValue(v)) => {
                assert_ne!(v, &0);
            }
            _ => panic!("expect timestamp nanosecond value"),
        }
    }

    #[test]
    fn test_date_pipeline() {
        let input_value_str = r#"
            {
                "my_field": "1,2",
                "foo": "bar",
                "test_time": "2014-5-17T04:34:56+00:00"
            }
        "#;
        let input_value: serde_json::Value = serde_json::from_str(input_value_str).unwrap();

        let pipeline_yaml = r#"---
description: Pipeline for Apache Tomcat

processors:
  - date:
      field: test_time

transform:
  - field: test_time
    type: timestamp, ns
    index: time
"#;

        let pipeline: Pipeline = parse(&Content::Yaml(pipeline_yaml)).unwrap();
        let pipeline = Arc::new(pipeline);
        let schema = pipeline.schemas().unwrap();
        let mut schema_info = SchemaInfo::from_schema_list(schema.clone());

        let pipeline_def = crate::PipelineDefinition::Resolved(pipeline.clone());
        let pipeline_param = crate::GreptimePipelineParams::default();
        let pipeline_ctx = PipelineContext::new(
            &pipeline_def,
            &pipeline_param,
            session::context::Channel::Unknown,
        );
        let schema = pipeline.schemas().unwrap().clone();
        let result = input_value.into();

        let rows_with_suffix = pipeline
            .exec_mut(result, &pipeline_ctx, &mut schema_info)
            .unwrap()
            .into_transformed()
            .unwrap();
        let output = Rows {
            schema,
            rows: rows_with_suffix.into_iter().map(|(r, _)| r).collect(),
        };
        let schemas = output.schema;

        assert_eq!(schemas.len(), 1);
        let schema = schemas[0].clone();
        assert_eq!("test_time", schema.column_name);
        assert_eq!(ColumnDataType::TimestampNanosecond as i32, schema.datatype);
        assert_eq!(SemanticType::Timestamp as i32, schema.semantic_type);

        let row = output.rows[0].clone();
        assert_eq!(1, row.values.len());
        let value_data = row.values[0].clone().value_data;
        assert_eq!(
            Some(v1::value::ValueData::TimestampNanosecondValue(
                1400301296000000000
            )),
            value_data
        );
    }

    #[test]
    fn test_dispatcher() {
        let pipeline_yaml = r#"
---
description: Pipeline for Apache Tomcat

processors:
  - epoch:
      field: ts
      resolution: ns

dispatcher:
  field: typename
  rules:
    - value: http
      table_suffix: http_events
    - value: database
      table_suffix: db_events
      pipeline: database_pipeline

transform:
  - field: typename
    type: string
  - field: ts
    type: timestamp, ns
    index: time
"#;
        let pipeline: Pipeline = parse(&Content::Yaml(pipeline_yaml)).unwrap();
        let dispatcher = pipeline.dispatcher.expect("expect dispatcher");
        assert_eq!(dispatcher.field, "typename");

        assert_eq!(dispatcher.rules.len(), 2);

        assert_eq!(
            dispatcher.rules[0],
            crate::dispatcher::Rule {
                value: VrlValue::Bytes(Bytes::from("http")),
                table_suffix: "http_events".to_string(),
                pipeline: None
            }
        );

        assert_eq!(
            dispatcher.rules[1],
            crate::dispatcher::Rule {
                value: VrlValue::Bytes(Bytes::from("database")),
                table_suffix: "db_events".to_string(),
                pipeline: Some("database_pipeline".to_string()),
            }
        );

        let bad_yaml1 = r#"
---
description: Pipeline for Apache Tomcat

processors:
  - epoch:
      field: ts
      resolution: ns

dispatcher:
  _field: typename
  rules:
    - value: http
      table_suffix: http_events
    - value: database
      table_suffix: db_events
      pipeline: database_pipeline

transform:
  - field: typename
    type: string
  - field: ts
    type: timestamp, ns
    index: time
"#;
        let bad_yaml2 = r#"
---
description: Pipeline for Apache Tomcat

processors:
  - epoch:
      field: ts
      resolution: ns
dispatcher:
  field: typename
  rules:
    - value: http
      _table_suffix: http_events
    - value: database
      _table_suffix: db_events
      pipeline: database_pipeline

transform:
  - field: typename
    type: string
  - field: ts
    type: timestamp, ns
    index: time
"#;
        let bad_yaml3 = r#"
---
description: Pipeline for Apache Tomcat

processors:
  - epoch:
      field: ts
      resolution: ns
dispatcher:
  field: typename
  rules:
    - _value: http
      table_suffix: http_events
    - _value: database
      table_suffix: db_events
      pipeline: database_pipeline

transform:
  - field: typename
    type: string
  - field: ts
    type: timestamp, ns
    index: time
"#;

        let r: Result<Pipeline> = parse(&Content::Yaml(bad_yaml1));
        assert!(r.is_err());
        let r: Result<Pipeline> = parse(&Content::Yaml(bad_yaml2));
        assert!(r.is_err());
        let r: Result<Pipeline> = parse(&Content::Yaml(bad_yaml3));
        assert!(r.is_err());
    }

    #[test]
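    // One input object whose VRL processor returns an array should
    // expand into one row per array element.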
    fn test_one_to_many_vrl_expansion() {
        let pipeline_yaml = r#"
processors:
  - epoch:
      field: timestamp
      resolution: ms
  - vrl:
      source: |
        events = del(.events)
        base_host = del(.host)
        base_ts = del(.timestamp)
        map_values(array!(events)) -> |event| {
          {
            "host": base_host,
            "event_type": event.type,
            "event_value": event.value,
            "timestamp": base_ts
          }
        }

transform:
  - field: host
    type: string
  - field: event_type
    type: string
  - field: event_value
    type: int32
  - field: timestamp
    type: timestamp, ms
    index: time
"#;

        let pipeline: Pipeline = parse(&Content::Yaml(pipeline_yaml)).unwrap();
        let (pipeline, mut schema_info, pipeline_def, pipeline_param) = setup_pipeline!(pipeline);
        let pipeline_ctx = PipelineContext::new(
            &pipeline_def,
            &pipeline_param,
            session::context::Channel::Unknown,
        );

        let input_value: serde_json::Value = serde_json::from_str(
            r#"{
                "host": "server1",
                "timestamp": 1716668197217,
                "events": [
                    {"type": "cpu", "value": 80},
                    {"type": "memory", "value": 60},
                    {"type": "disk", "value": 45}
                ]
            }"#,
        )
        .unwrap();

        let payload = input_value.into();
        let result = pipeline
            .exec_mut(payload, &pipeline_ctx, &mut schema_info)
            .unwrap()
            .into_transformed()
            .unwrap();

        assert_eq!(result.len(), 3);

        for (row, _table_suffix) in &result {
            assert_eq!(row.values.len(), 4);
            assert_eq!(
                row.values[0].value_data,
                Some(ValueData::StringValue("server1".to_string()))
            );
            assert_eq!(
                row.values[3].value_data,
                Some(ValueData::TimestampMillisecondValue(1716668197217))
            );
        }

        let event_types: Vec<_> = result
            .iter()
            .map(|(r, _)| match &r.values[1].value_data {
                Some(ValueData::StringValue(s)) => s.clone(),
                _ => panic!("expected string"),
            })
            .collect();
        assert!(event_types.contains(&"cpu".to_string()));
        assert!(event_types.contains(&"memory".to_string()));
        assert!(event_types.contains(&"disk".to_string()));
    }

    #[test]
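    // A VRL processor that returns a single object must keep the
    // one-input/one-row behavior.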
    fn test_single_object_output_unchanged() {
        let pipeline_yaml = r#"
processors:
  - epoch:
      field: ts
      resolution: ms
  - vrl:
      source: |
        .processed = true
        .

transform:
  - field: name
    type: string
  - field: processed
    type: boolean
  - field: ts
    type: timestamp, ms
    index: time
"#;

        let pipeline: Pipeline = parse(&Content::Yaml(pipeline_yaml)).unwrap();
        let (pipeline, mut schema_info, pipeline_def, pipeline_param) = setup_pipeline!(pipeline);
        let pipeline_ctx = PipelineContext::new(
            &pipeline_def,
            &pipeline_param,
            session::context::Channel::Unknown,
        );

        let input_value: serde_json::Value = serde_json::from_str(
            r#"{
                "name": "test",
                "ts": 1716668197217
            }"#,
        )
        .unwrap();

        let payload = input_value.into();
        let result = pipeline
            .exec_mut(payload, &pipeline_ctx, &mut schema_info)
            .unwrap()
            .into_transformed()
            .unwrap();

        assert_eq!(result.len(), 1);
        assert_eq!(
            result[0].0.values[0].value_data,
            Some(ValueData::StringValue("test".to_string()))
        );
        assert_eq!(
            result[0].0.values[1].value_data,
            Some(ValueData::BoolValue(true))
        );
    }

    #[test]
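    // An empty array from the VRL processor yields zero rows rather
    // than an error.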
    fn test_empty_array_produces_zero_rows() {
        let pipeline_yaml = r#"
processors:
  - vrl:
      source: |
        .events

transform:
  - field: value
    type: int32
  - field: greptime_timestamp
    type: timestamp, ns
    index: time
"#;

        let pipeline: Pipeline = parse(&Content::Yaml(pipeline_yaml)).unwrap();
        let (pipeline, mut schema_info, pipeline_def, pipeline_param) = setup_pipeline!(pipeline);
        let pipeline_ctx = PipelineContext::new(
            &pipeline_def,
            &pipeline_param,
            session::context::Channel::Unknown,
        );

        let input_value: serde_json::Value = serde_json::from_str(r#"{"events": []}"#).unwrap();

        let payload = input_value.into();
        let result = pipeline
            .exec_mut(payload, &pipeline_ctx, &mut schema_info)
            .unwrap()
            .into_transformed()
            .unwrap();

        assert_eq!(result.len(), 0);
    }

    #[test]
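    // Array elements that are not objects are rejected with a
    // descriptive error.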
    fn test_array_element_must_be_object() {
        let pipeline_yaml = r#"
processors:
  - vrl:
      source: |
        .items

transform:
  - field: value
    type: int32
  - field: greptime_timestamp
    type: timestamp, ns
    index: time
"#;

        let pipeline: Pipeline = parse(&Content::Yaml(pipeline_yaml)).unwrap();
        let (pipeline, mut schema_info, pipeline_def, pipeline_param) = setup_pipeline!(pipeline);
        let pipeline_ctx = PipelineContext::new(
            &pipeline_def,
            &pipeline_param,
            session::context::Channel::Unknown,
        );

        let input_value: serde_json::Value =
            serde_json::from_str(r#"{"items": [1, 2, 3]}"#).unwrap();

        let payload = input_value.into();
        let result = pipeline.exec_mut(payload, &pipeline_ctx, &mut schema_info);

        assert!(result.is_err());
        let err_msg = result.unwrap_err().to_string();
        assert!(
            err_msg.contains("must be an object"),
            "Expected error about non-object element, got: {}",
            err_msg
        );
    }

    #[test]
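    // A `greptime_table_suffix` hint set by VRL should surface as the
    // row's table suffix.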
    fn test_one_to_many_with_table_suffix_hint() {
        let pipeline_yaml = r#"
processors:
  - epoch:
      field: ts
      resolution: ms
  - vrl:
      source: |
        .greptime_table_suffix = "_" + string!(.category)
        .

transform:
  - field: name
    type: string
  - field: category
    type: string
  - field: ts
    type: timestamp, ms
    index: time
"#;

        let pipeline: Pipeline = parse(&Content::Yaml(pipeline_yaml)).unwrap();
        let (pipeline, mut schema_info, pipeline_def, pipeline_param) = setup_pipeline!(pipeline);
        let pipeline_ctx = PipelineContext::new(
            &pipeline_def,
            &pipeline_param,
            session::context::Channel::Unknown,
        );

        let input_value: serde_json::Value = serde_json::from_str(
            r#"{
                "name": "test",
                "category": "metrics",
                "ts": 1716668197217
            }"#,
        )
        .unwrap();

        let payload = input_value.into();
        let result = pipeline
            .exec_mut(payload, &pipeline_ctx, &mut schema_info)
            .unwrap()
            .into_transformed()
            .unwrap();

        assert_eq!(result.len(), 1);
        assert_eq!(result[0].1, Some("_metrics".to_string()));
    }

    #[test]
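    // In one-to-many expansion, each produced row may carry its own
    // table suffix.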
    fn test_one_to_many_per_row_table_suffix() {
        let pipeline_yaml = r#"
processors:
  - epoch:
      field: timestamp
      resolution: ms
  - vrl:
      source: |
        events = del(.events)
        base_ts = del(.timestamp)

        map_values(array!(events)) -> |event| {
          suffix = "_" + string!(event.category)
          {
            "name": event.name,
            "value": event.value,
            "timestamp": base_ts,
            "greptime_table_suffix": suffix
          }
        }

transform:
  - field: name
    type: string
  - field: value
    type: int32
  - field: timestamp
    type: timestamp, ms
    index: time
"#;

        let pipeline: Pipeline = parse(&Content::Yaml(pipeline_yaml)).unwrap();
        let (pipeline, mut schema_info, pipeline_def, pipeline_param) = setup_pipeline!(pipeline);
        let pipeline_ctx = PipelineContext::new(
            &pipeline_def,
            &pipeline_param,
            session::context::Channel::Unknown,
        );

        let input_value: serde_json::Value = serde_json::from_str(
            r#"{
                "timestamp": 1716668197217,
                "events": [
                    {"name": "cpu_usage", "value": 80, "category": "cpu"},
                    {"name": "mem_usage", "value": 60, "category": "memory"},
                    {"name": "cpu_temp", "value": 45, "category": "cpu"}
                ]
            }"#,
        )
        .unwrap();

        let payload = input_value.into();
        let result = pipeline
            .exec_mut(payload, &pipeline_ctx, &mut schema_info)
            .unwrap()
            .into_transformed()
            .unwrap();

        assert_eq!(result.len(), 3);

        let table_suffixes: Vec<_> = result.iter().map(|(_, suffix)| suffix.clone()).collect();

        assert!(table_suffixes.contains(&Some("_cpu".to_string())));
        assert!(table_suffixes.contains(&Some("_memory".to_string())));

        let cpu_count = table_suffixes
            .iter()
            .filter(|s| *s == &Some("_cpu".to_string()))
            .count();
        let memory_count = table_suffixes
            .iter()
            .filter(|s| *s == &Some("_memory".to_string()))
            .count();
        assert_eq!(cpu_count, 2);
        assert_eq!(memory_count, 1);
    }

    #[test]
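    // Rows with different per-row options (here `greptime_ttl`) must
    // stay in separate `ContextOpt` groups.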
    fn test_one_to_many_hashmap_contextopt_preservation() {
        let pipeline_yaml = r#"
processors:
  - epoch:
      field: timestamp
      resolution: ms
  - vrl:
      source: |
        events = del(.events)
        base_ts = del(.timestamp)

        map_values(array!(events)) -> |event| {
          # Set different TTL values per event type
          ttl = if event.type == "critical" {
            "1h"
          } else if event.type == "warning" {
            "24h"
          } else {
            "7d"
          }

          {
            "host": del(.host),
            "event_type": event.type,
            "event_value": event.value,
            "timestamp": base_ts,
            "greptime_ttl": ttl
          }
        }

transform:
  - field: host
    type: string
  - field: event_type
    type: string
  - field: event_value
    type: int32
  - field: timestamp
    type: timestamp, ms
    index: time
"#;

        let pipeline: Pipeline = parse(&Content::Yaml(pipeline_yaml)).unwrap();
        let (pipeline, mut schema_info, pipeline_def, pipeline_param) = setup_pipeline!(pipeline);
        let pipeline_ctx = PipelineContext::new(
            &pipeline_def,
            &pipeline_param,
            session::context::Channel::Unknown,
        );

        let input_value: serde_json::Value = serde_json::from_str(
            r#"{
                "host": "server1",
                "timestamp": 1716668197217,
                "events": [
                    {"type": "critical", "value": 100},
                    {"type": "warning", "value": 50},
                    {"type": "info", "value": 25}
                ]
            }"#,
        )
        .unwrap();

        let payload = input_value.into();
        let result = pipeline
            .exec_mut(payload, &pipeline_ctx, &mut schema_info)
            .unwrap();

        let rows_by_context = result.into_transformed_hashmap().unwrap();

        assert_eq!(rows_by_context.len(), 3);

        let mut context_opts = Vec::new();
        for (opt, rows) in &rows_by_context {
            assert_eq!(rows.len(), 1);
            context_opts.push(opt.clone());
        }

        assert_ne!(context_opts[0], context_opts[1]);
        assert_ne!(context_opts[1], context_opts[2]);
        assert_ne!(context_opts[0], context_opts[2]);

        for rows in rows_by_context.values() {
            for (row, _table_suffix) in rows {
                assert_eq!(row.values.len(), 4);
            }
        }
    }

    #[test]
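    // Single-object output should appear as exactly one context group
    // in the hashmap form.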
    fn test_single_object_hashmap_compatibility() {
        let pipeline_yaml = r#"
processors:
  - epoch:
      field: ts
      resolution: ms
  - vrl:
      source: |
        .processed = true
        .

transform:
  - field: name
    type: string
  - field: processed
    type: boolean
  - field: ts
    type: timestamp, ms
    index: time
"#;

        let pipeline: Pipeline = parse(&Content::Yaml(pipeline_yaml)).unwrap();
        let (pipeline, mut schema_info, pipeline_def, pipeline_param) = setup_pipeline!(pipeline);
        let pipeline_ctx = PipelineContext::new(
            &pipeline_def,
            &pipeline_param,
            session::context::Channel::Unknown,
        );

        let input_value: serde_json::Value = serde_json::from_str(
            r#"{
                "name": "test",
                "ts": 1716668197217
            }"#,
        )
        .unwrap();

        let payload = input_value.into();
        let result = pipeline
            .exec_mut(payload, &pipeline_ctx, &mut schema_info)
            .unwrap();

        let rows_by_context = result.into_transformed_hashmap().unwrap();

        assert_eq!(rows_by_context.len(), 1);

        let (_opt, rows) = rows_by_context.into_iter().next().unwrap();
        assert_eq!(rows.len(), 1);

        let (row, _table_suffix) = &rows[0];
        assert_eq!(row.values.len(), 3);
    }

    #[test]
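    // An empty array yields an empty hashmap as well.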
    fn test_empty_array_hashmap() {
        let pipeline_yaml = r#"
processors:
  - vrl:
      source: |
        .events

transform:
  - field: value
    type: int32
  - field: greptime_timestamp
    type: timestamp, ns
    index: time
"#;

        let pipeline: Pipeline = parse(&Content::Yaml(pipeline_yaml)).unwrap();
        let (pipeline, mut schema_info, pipeline_def, pipeline_param) = setup_pipeline!(pipeline);
        let pipeline_ctx = PipelineContext::new(
            &pipeline_def,
            &pipeline_param,
            session::context::Channel::Unknown,
        );

        let input_value: serde_json::Value = serde_json::from_str(r#"{"events": []}"#).unwrap();

        let payload = input_value.into();
        let result = pipeline
            .exec_mut(payload, &pipeline_ctx, &mut schema_info)
            .unwrap();

        let rows_by_context = result.into_transformed_hashmap().unwrap();

        assert_eq!(rows_by_context.len(), 0);
    }
}