1#![allow(dead_code)]
16pub mod ctx_req;
17pub mod field;
18pub mod processor;
19pub mod transform;
20pub mod value;
21
22use std::collections::HashMap;
23
24use api::v1::Row;
25use common_time::timestamp::TimeUnit;
26use itertools::Itertools;
27use processor::{Processor, Processors};
28use snafu::{OptionExt, ResultExt, ensure};
29use transform::Transforms;
30use vrl::core::Value as VrlValue;
31use yaml_rust::{Yaml, YamlLoader};
32
33use crate::dispatcher::{Dispatcher, Rule};
34use crate::error::{
35 ArrayElementMustBeObjectSnafu, AutoTransformOneTimestampSnafu, Error,
36 IntermediateKeyIndexSnafu, InvalidVersionNumberSnafu, Result, TransformArrayElementSnafu,
37 YamlLoadSnafu, YamlParseSnafu,
38};
39use crate::etl::processor::ProcessorKind;
40use crate::etl::transform::transformer::greptime::{RowWithTableSuffix, values_to_rows};
41use crate::tablesuffix::TableSuffixTemplate;
42use crate::{
43 ContextOpt, GreptimeTransformer, IdentityTimeIndex, PipelineContext, SchemaInfo,
44 unwrap_or_continue_if_err,
45};
46
// Top-level keys read from a pipeline definition document by `parse`.

/// Optional description string of the pipeline.
const DESCRIPTION: &str = "description";
/// Document schema version; absent or `null` means V1.
const DOC_VERSION: &str = "version";
/// Ordered list of processors.
const PROCESSORS: &str = "processors";
/// Transform section, singular spelling.
const TRANSFORM: &str = "transform";
/// Transform section, plural spelling (looked up before the singular one).
const TRANSFORMS: &str = "transforms";
/// Optional dispatcher configuration.
const DISPATCHER: &str = "dispatcher";
/// Optional table-suffix template.
const TABLESUFFIX: &str = "table_suffix";
54
/// Raw text of a pipeline definition, tagged with its serialization format.
///
/// The variants only borrow the text, so the enum is cheap to copy and print.
#[derive(Debug, Clone, Copy)]
pub enum Content<'a> {
    /// JSON-encoded pipeline definition.
    Json(&'a str),
    /// YAML-encoded pipeline definition.
    Yaml(&'a str),
}
59
60pub fn parse(input: &Content) -> Result<Pipeline> {
61 match input {
62 Content::Yaml(str) => {
63 let docs = YamlLoader::load_from_str(str).context(YamlLoadSnafu)?;
64
65 ensure!(docs.len() == 1, YamlParseSnafu);
66
67 let doc = &docs[0];
68
69 let description = doc[DESCRIPTION].as_str().map(|s| s.to_string());
70
71 let doc_version = (&doc[DOC_VERSION]).try_into()?;
72
73 let processors = if let Some(v) = doc[PROCESSORS].as_vec() {
74 v.try_into()?
75 } else {
76 Processors::default()
77 };
78
79 let transformers = if let Some(v) = doc[TRANSFORMS].as_vec().or(doc[TRANSFORM].as_vec())
80 {
81 v.try_into()?
82 } else {
83 Transforms::default()
84 };
85
86 let transformer = if transformers.is_empty() {
87 let cnt = processors
90 .iter()
91 .filter_map(|p| match p {
92 ProcessorKind::Date(d) if !d.ignore_missing() => Some(
93 d.fields
94 .iter()
95 .map(|f| (f.target_or_input_field(), TimeUnit::Nanosecond))
96 .collect_vec(),
97 ),
98 ProcessorKind::Epoch(e) if !e.ignore_missing() => Some(
99 e.fields
100 .iter()
101 .map(|f| (f.target_or_input_field(), (&e.resolution).into()))
102 .collect_vec(),
103 ),
104 _ => None,
105 })
106 .flatten()
107 .collect_vec();
108 ensure!(cnt.len() == 1, AutoTransformOneTimestampSnafu);
109
110 let (ts_name, timeunit) = cnt.first().unwrap();
111 TransformerMode::AutoTransform(ts_name.to_string(), *timeunit)
112 } else {
113 TransformerMode::GreptimeTransformer(GreptimeTransformer::new(
114 transformers,
115 &doc_version,
116 )?)
117 };
118
119 let dispatcher = if !doc[DISPATCHER].is_badvalue() {
120 Some(Dispatcher::try_from(&doc[DISPATCHER])?)
121 } else {
122 None
123 };
124
125 let tablesuffix = if !doc[TABLESUFFIX].is_badvalue() {
126 Some(TableSuffixTemplate::try_from(&doc[TABLESUFFIX])?)
127 } else {
128 None
129 };
130
131 Ok(Pipeline {
132 doc_version,
133 description,
134 processors,
135 transformer,
136 dispatcher,
137 tablesuffix,
138 })
139 }
140 Content::Json(_) => unimplemented!(),
141 }
142}
143
/// Schema version of a pipeline document, taken from its top-level `version` key.
///
/// A document without a version (or with `version: null`) is treated as
/// [`PipelineDocVersion::V1`], which is also the `Default`.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum PipelineDocVersion {
    /// Version 1 — the default when no version is given.
    #[default]
    V1,

    /// Version 2.
    V2,
}
159
160impl TryFrom<&Yaml> for PipelineDocVersion {
161 type Error = Error;
162
163 fn try_from(value: &Yaml) -> Result<Self> {
164 if value.is_badvalue() || value.is_null() {
165 return Ok(PipelineDocVersion::V1);
166 }
167
168 let version = match value {
169 Yaml::String(s) => s
170 .parse::<i64>()
171 .map_err(|_| InvalidVersionNumberSnafu { version: s.clone() }.build())?,
172 Yaml::Integer(i) => *i,
173 _ => {
174 return InvalidVersionNumberSnafu {
175 version: value.as_str().unwrap_or_default().to_string(),
176 }
177 .fail();
178 }
179 };
180
181 match version {
182 1 => Ok(PipelineDocVersion::V1),
183 2 => Ok(PipelineDocVersion::V2),
184 _ => InvalidVersionNumberSnafu {
185 version: version.to_string(),
186 }
187 .fail(),
188 }
189 }
190}
191
192#[derive(Debug)]
193pub struct Pipeline {
194 doc_version: PipelineDocVersion,
195 description: Option<String>,
196 processors: processor::Processors,
197 dispatcher: Option<Dispatcher>,
198 transformer: TransformerMode,
199 tablesuffix: Option<TableSuffixTemplate>,
200}
201
202#[derive(Debug, Clone)]
203pub enum TransformerMode {
204 GreptimeTransformer(GreptimeTransformer),
205 AutoTransform(String, TimeUnit),
206}
207
/// Destination selected by a matching dispatcher rule.
///
/// Field order is significant: the derived `Ord` compares `table_suffix` first.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct DispatchedTo {
    /// Suffix appended to the original table name.
    pub table_suffix: String,
    /// Name of the pipeline configured on the matching rule, if any.
    pub pipeline: Option<String>,
}
214
215impl From<&Rule> for DispatchedTo {
216 fn from(value: &Rule) -> Self {
217 DispatchedTo {
218 table_suffix: value.table_suffix.clone(),
219 pipeline: value.pipeline.clone(),
220 }
221 }
222}
223
224impl DispatchedTo {
225 pub fn dispatched_to_table_name(&self, original: &str) -> String {
227 [original, &self.table_suffix].concat()
228 }
229}
230
231#[derive(Debug)]
233pub enum PipelineExecOutput {
234 Transformed(TransformedOutput),
235 DispatchedTo(DispatchedTo, VrlValue),
236 Filtered,
237}
238
239#[derive(Debug)]
246pub struct TransformedOutput {
247 pub rows_by_context: HashMap<ContextOpt, Vec<RowWithTableSuffix>>,
249}
250
251impl PipelineExecOutput {
252 pub fn into_transformed(self) -> Option<Vec<RowWithTableSuffix>> {
254 if let Self::Transformed(TransformedOutput { rows_by_context }) = self {
255 Some(rows_by_context.into_values().flatten().collect())
257 } else {
258 None
259 }
260 }
261
262 pub fn into_transformed_hashmap(self) -> Option<HashMap<ContextOpt, Vec<RowWithTableSuffix>>> {
264 if let Self::Transformed(TransformedOutput { rows_by_context }) = self {
265 Some(rows_by_context)
266 } else {
267 None
268 }
269 }
270
271 pub fn into_legacy_format(self) -> Option<(ContextOpt, Vec<RowWithTableSuffix>)> {
274 if let Self::Transformed(TransformedOutput { rows_by_context }) = self {
275 if rows_by_context.len() == 1 {
276 let (opt, rows) = rows_by_context.into_iter().next().unwrap();
277 Some((opt, rows))
278 } else {
279 let all_rows: Vec<RowWithTableSuffix> =
281 rows_by_context.into_values().flatten().collect();
282 Some((ContextOpt::default(), all_rows))
283 }
284 } else {
285 None
286 }
287 }
288
289 pub fn into_dispatched(self) -> Option<DispatchedTo> {
291 if let Self::DispatchedTo(d, _) = self {
292 Some(d)
293 } else {
294 None
295 }
296 }
297}
298
299impl Pipeline {
300 fn is_v1(&self) -> bool {
301 self.doc_version == PipelineDocVersion::V1
302 }
303
304 pub fn exec_mut(
305 &self,
306 mut val: VrlValue,
307 pipeline_ctx: &PipelineContext<'_>,
308 schema_info: &mut SchemaInfo,
309 ) -> Result<PipelineExecOutput> {
310 for processor in self.processors.iter() {
312 val = processor.exec_mut(val)?;
313 if val.is_null() {
314 return Ok(PipelineExecOutput::Filtered);
316 }
317 }
318
319 if let Some(rule) = self.dispatcher.as_ref().and_then(|d| d.exec(&val)) {
321 return Ok(PipelineExecOutput::DispatchedTo(rule.into(), val));
322 }
323
324 let mut val = if val.is_array() {
325 val
326 } else {
327 VrlValue::Array(vec![val])
328 };
329
330 let rows_by_context = match self.transformer() {
331 TransformerMode::GreptimeTransformer(greptime_transformer) => {
332 transform_array_elements_by_ctx(
333 val.as_array_mut().unwrap(),
335 greptime_transformer,
336 self.is_v1(),
337 schema_info,
338 pipeline_ctx,
339 self.tablesuffix.as_ref(),
340 )?
341 }
342 TransformerMode::AutoTransform(ts_name, time_unit) => {
343 let def = crate::PipelineDefinition::GreptimeIdentityPipeline(Some(
344 IdentityTimeIndex::Epoch(ts_name.clone(), *time_unit, false),
345 ));
346 let n_ctx =
347 PipelineContext::new(&def, pipeline_ctx.pipeline_param, pipeline_ctx.channel);
348 values_to_rows(
349 schema_info,
350 val,
351 &n_ctx,
352 None,
353 true,
354 self.tablesuffix.as_ref(),
355 )?
356 }
357 };
358
359 Ok(PipelineExecOutput::Transformed(TransformedOutput {
360 rows_by_context,
361 }))
362 }
363
364 pub fn processors(&self) -> &processor::Processors {
365 &self.processors
366 }
367
368 pub fn transformer(&self) -> &TransformerMode {
369 &self.transformer
370 }
371
372 pub fn schemas(&self) -> Option<&Vec<greptime_proto::v1::ColumnSchema>> {
374 match &self.transformer {
375 TransformerMode::GreptimeTransformer(t) => Some(t.schemas()),
376 TransformerMode::AutoTransform(_, _) => None,
377 }
378 }
379
380 pub fn is_variant_table_name(&self) -> bool {
381 self.dispatcher.is_some() || self.tablesuffix.is_some()
384 }
385}
386
387fn transform_array_elements_by_ctx(
390 arr: &mut [VrlValue],
391 transformer: &GreptimeTransformer,
392 is_v1: bool,
393 schema_info: &mut SchemaInfo,
394 pipeline_ctx: &PipelineContext<'_>,
395 tablesuffix_template: Option<&TableSuffixTemplate>,
396) -> Result<HashMap<ContextOpt, Vec<RowWithTableSuffix>>> {
397 let skip_error = pipeline_ctx.pipeline_param.skip_error();
398 let mut rows_by_context = HashMap::new();
399
400 for (index, element) in arr.iter_mut().enumerate() {
401 if !element.is_object() {
402 unwrap_or_continue_if_err!(
403 ArrayElementMustBeObjectSnafu {
404 index,
405 actual_type: element.kind_str().to_string(),
406 }
407 .fail(),
408 skip_error
409 );
410 }
411
412 let values =
413 unwrap_or_continue_if_err!(transformer.transform_mut(element, is_v1), skip_error);
414 if is_v1 {
415 let mut opt = unwrap_or_continue_if_err!(
417 ContextOpt::from_pipeline_map_to_opt(element),
418 skip_error
419 );
420 let table_suffix = opt.resolve_table_suffix(tablesuffix_template, element);
421 rows_by_context
422 .entry(opt)
423 .or_insert_with(Vec::new)
424 .push((Row { values }, table_suffix));
425 } else {
426 let element_rows_map = values_to_rows(
428 schema_info,
429 element.clone(),
430 pipeline_ctx,
431 Some(values),
432 false,
433 tablesuffix_template,
434 )
435 .map_err(Box::new)
436 .context(TransformArrayElementSnafu { index })?;
437 for (k, v) in element_rows_map {
438 rows_by_context.entry(k).or_default().extend(v);
439 }
440 }
441 }
442
443 Ok(rows_by_context)
444}
445
446pub(crate) fn find_key_index(intermediate_keys: &[String], key: &str, kind: &str) -> Result<usize> {
447 intermediate_keys
448 .iter()
449 .position(|k| k == key)
450 .context(IntermediateKeyIndexSnafu { kind, key })
451}
452
/// Wraps a parsed [`Pipeline`] in an `Arc` and derives the values needed to build a
/// `PipelineContext`. This module's tests use it.
///
/// Expands to the tuple `(pipeline, schema_info, pipeline_def, pipeline_param)`:
/// - `schema_info` is built from the pipeline's declared schemas;
/// - `pipeline_param` is `GreptimePipelineParams::default()`.
///
/// Panics when the pipeline declares no schemas, i.e. when `Pipeline::schemas()` returns
/// `None` (auto-transform mode).
#[macro_export]
macro_rules! setup_pipeline {
    ($pipeline:expr) => {{
        use std::sync::Arc;

        use $crate::{GreptimePipelineParams, Pipeline, PipelineDefinition, SchemaInfo};

        let pipeline: Arc<Pipeline> = Arc::new($pipeline);
        let schema_info = SchemaInfo::from_schema_list(pipeline.schemas().unwrap().clone());
        let pipeline_def = PipelineDefinition::Resolved(Arc::clone(&pipeline));
        let pipeline_param = GreptimePipelineParams::default();

        (pipeline, schema_info, pipeline_def, pipeline_param)
    }};
}
478
479#[cfg(test)]
480mod tests {
481 use std::collections::BTreeMap;
482 use std::sync::Arc;
483
484 use api::v1::Rows;
485 use datatypes::schema::{FulltextOptions, SkippingIndexOptions};
486 use greptime_proto::v1::value::ValueData;
487 use greptime_proto::v1::{self, ColumnDataType, SemanticType};
488 use vrl::prelude::Bytes;
489 use vrl::value::KeyString;
490
491 use super::*;
492
493 #[test]
494 fn test_pipeline_prepare() {
495 let input_value_str = r#"
496 {
497 "my_field": "1,2",
498 "foo": "bar",
499 "ts": "1"
500 }
501 "#;
502 let input_value: serde_json::Value = serde_json::from_str(input_value_str).unwrap();
503
504 let pipeline_yaml = r#"description: 'Pipeline for Apache Tomcat'
505processors:
506 - csv:
507 field: my_field
508 target_fields: field1, field2
509 - epoch:
510 field: ts
511 resolution: ns
512transform:
513 - field: field1
514 type: uint32
515 - field: field2
516 type: uint32
517 - field: ts
518 type: timestamp, ns
519 index: time
520 "#;
521
522 let pipeline: Pipeline = parse(&Content::Yaml(pipeline_yaml)).unwrap();
523 let (pipeline, mut schema_info, pipeline_def, pipeline_param) = setup_pipeline!(pipeline);
524 let pipeline_ctx = PipelineContext::new(
525 &pipeline_def,
526 &pipeline_param,
527 session::context::Channel::Unknown,
528 );
529
530 let payload = input_value.into();
531 let mut result = pipeline
532 .exec_mut(payload, &pipeline_ctx, &mut schema_info)
533 .unwrap()
534 .into_transformed()
535 .unwrap();
536
537 let (row, _table_suffix) = result.swap_remove(0);
538 assert_eq!(row.values[0].value_data, Some(ValueData::U32Value(1)));
539 assert_eq!(row.values[1].value_data, Some(ValueData::U32Value(2)));
540 match &row.values[2].value_data {
541 Some(ValueData::TimestampNanosecondValue(v)) => {
542 assert_ne!(v, &0);
543 }
544 _ => panic!("expect null value"),
545 }
546 }
547
548 #[test]
549 fn test_dissect_pipeline() {
550 let message = r#"129.37.245.88 - meln1ks [01/Aug/2024:14:22:47 +0800] "PATCH /observability/metrics/production HTTP/1.0" 501 33085"#.to_string();
551 let pipeline_str = r#"processors:
552 - dissect:
553 fields:
554 - message
555 patterns:
556 - "%{ip} %{?ignored} %{username} [%{ts}] \"%{method} %{path} %{proto}\" %{status} %{bytes}"
557 - date:
558 fields:
559 - ts
560 formats:
561 - "%d/%b/%Y:%H:%M:%S %z"
562
563transform:
564 - fields:
565 - ip
566 - username
567 - method
568 - path
569 - proto
570 type: string
571 - fields:
572 - status
573 type: uint16
574 - fields:
575 - bytes
576 type: uint32
577 - field: ts
578 type: timestamp, ns
579 index: time"#;
580 let pipeline: Pipeline = parse(&Content::Yaml(pipeline_str)).unwrap();
581 let pipeline = Arc::new(pipeline);
582 let schema = pipeline.schemas().unwrap();
583 let mut schema_info = SchemaInfo::from_schema_list(schema.clone());
584
585 let pipeline_def = crate::PipelineDefinition::Resolved(pipeline.clone());
586 let pipeline_param = crate::GreptimePipelineParams::default();
587 let pipeline_ctx = PipelineContext::new(
588 &pipeline_def,
589 &pipeline_param,
590 session::context::Channel::Unknown,
591 );
592 let payload = VrlValue::Object(BTreeMap::from([(
593 KeyString::from("message"),
594 VrlValue::Bytes(Bytes::from(message)),
595 )]));
596
597 let result = pipeline
598 .exec_mut(payload, &pipeline_ctx, &mut schema_info)
599 .unwrap()
600 .into_transformed()
601 .unwrap();
602
603 assert_eq!(schema_info.schema.len(), result[0].0.values.len());
604 let test = [
605 (
606 ColumnDataType::String as i32,
607 Some(ValueData::StringValue("129.37.245.88".into())),
608 ),
609 (
610 ColumnDataType::String as i32,
611 Some(ValueData::StringValue("meln1ks".into())),
612 ),
613 (
614 ColumnDataType::String as i32,
615 Some(ValueData::StringValue("PATCH".into())),
616 ),
617 (
618 ColumnDataType::String as i32,
619 Some(ValueData::StringValue(
620 "/observability/metrics/production".into(),
621 )),
622 ),
623 (
624 ColumnDataType::String as i32,
625 Some(ValueData::StringValue("HTTP/1.0".into())),
626 ),
627 (
628 ColumnDataType::Uint16 as i32,
629 Some(ValueData::U16Value(501)),
630 ),
631 (
632 ColumnDataType::Uint32 as i32,
633 Some(ValueData::U32Value(33085)),
634 ),
635 (
636 ColumnDataType::TimestampNanosecond as i32,
637 Some(ValueData::TimestampNanosecondValue(1722493367000000000)),
638 ),
639 ];
640 let schema = pipeline.schemas().unwrap();
642 for i in 0..schema.len() {
643 let schema = &schema[i];
644 let value = &result[0].0.values[i];
645 assert_eq!(schema.datatype, test[i].0);
646 assert_eq!(value.value_data, test[i].1);
647 }
648 }
649
650 #[test]
651 fn test_csv_pipeline() {
652 let input_value_str = r#"
653 {
654 "my_field": "1,2",
655 "foo": "bar",
656 "ts": "1"
657 }
658 "#;
659 let input_value: serde_json::Value = serde_json::from_str(input_value_str).unwrap();
660
661 let pipeline_yaml = r#"
662 description: Pipeline for Apache Tomcat
663 processors:
664 - csv:
665 field: my_field
666 target_fields: field1, field2
667 - epoch:
668 field: ts
669 resolution: ns
670 transform:
671 - field: field1
672 type: uint32
673 - field: field2
674 type: uint32
675 - field: ts
676 type: timestamp, ns
677 index: time
678 "#;
679
680 let pipeline: Pipeline = parse(&Content::Yaml(pipeline_yaml)).unwrap();
681 let (pipeline, mut schema_info, pipeline_def, pipeline_param) = setup_pipeline!(pipeline);
682 let pipeline_ctx = PipelineContext::new(
683 &pipeline_def,
684 &pipeline_param,
685 session::context::Channel::Unknown,
686 );
687
688 let payload = input_value.into();
689 let result = pipeline
690 .exec_mut(payload, &pipeline_ctx, &mut schema_info)
691 .unwrap()
692 .into_transformed()
693 .unwrap();
694 assert_eq!(
695 result[0].0.values[0].value_data,
696 Some(ValueData::U32Value(1))
697 );
698 assert_eq!(
699 result[0].0.values[1].value_data,
700 Some(ValueData::U32Value(2))
701 );
702 match &result[0].0.values[2].value_data {
703 Some(ValueData::TimestampNanosecondValue(v)) => {
704 assert_ne!(v, &0);
705 }
706 _ => panic!("expect null value"),
707 }
708 }
709
710 #[test]
711 fn test_date_pipeline() {
712 let input_value_str = r#"
713 {
714 "my_field": "1,2",
715 "foo": "bar",
716 "test_time": "2014-5-17T04:34:56+00:00"
717 }
718 "#;
719 let input_value: serde_json::Value = serde_json::from_str(input_value_str).unwrap();
720
721 let pipeline_yaml = r#"---
722description: Pipeline for Apache Tomcat
723
724processors:
725 - date:
726 field: test_time
727
728transform:
729 - field: test_time
730 type: timestamp, ns
731 index: time
732 "#;
733
734 let pipeline: Pipeline = parse(&Content::Yaml(pipeline_yaml)).unwrap();
735 let pipeline = Arc::new(pipeline);
736 let schema = pipeline.schemas().unwrap();
737 let mut schema_info = SchemaInfo::from_schema_list(schema.clone());
738
739 let pipeline_def = crate::PipelineDefinition::Resolved(pipeline.clone());
740 let pipeline_param = crate::GreptimePipelineParams::default();
741 let pipeline_ctx = PipelineContext::new(
742 &pipeline_def,
743 &pipeline_param,
744 session::context::Channel::Unknown,
745 );
746 let schema = pipeline.schemas().unwrap().clone();
747 let result = input_value.into();
748
749 let rows_with_suffix = pipeline
750 .exec_mut(result, &pipeline_ctx, &mut schema_info)
751 .unwrap()
752 .into_transformed()
753 .unwrap();
754 let output = Rows {
755 schema,
756 rows: rows_with_suffix.into_iter().map(|(r, _)| r).collect(),
757 };
758 let schemas = output.schema;
759
760 assert_eq!(schemas.len(), 1);
761 let schema = schemas[0].clone();
762 assert_eq!("test_time", schema.column_name);
763 assert_eq!(ColumnDataType::TimestampNanosecond as i32, schema.datatype);
764 assert_eq!(SemanticType::Timestamp as i32, schema.semantic_type);
765
766 let row = output.rows[0].clone();
767 assert_eq!(1, row.values.len());
768 let value_data = row.values[0].clone().value_data;
769 assert_eq!(
770 Some(v1::value::ValueData::TimestampNanosecondValue(
771 1400301296000000000
772 )),
773 value_data
774 );
775 }
776
777 #[test]
778 fn test_dispatcher() {
779 let pipeline_yaml = r#"
780---
781description: Pipeline for Apache Tomcat
782
783processors:
784 - epoch:
785 field: ts
786 resolution: ns
787
788dispatcher:
789 field: typename
790 rules:
791 - value: http
792 table_suffix: http_events
793 - value: database
794 table_suffix: db_events
795 pipeline: database_pipeline
796
797transform:
798 - field: typename
799 type: string
800 - field: ts
801 type: timestamp, ns
802 index: time
803"#;
804 let pipeline: Pipeline = parse(&Content::Yaml(pipeline_yaml)).unwrap();
805 let dispatcher = pipeline.dispatcher.expect("expect dispatcher");
806 assert_eq!(dispatcher.field, "typename");
807
808 assert_eq!(dispatcher.rules.len(), 2);
809
810 assert_eq!(
811 dispatcher.rules[0],
812 crate::dispatcher::Rule {
813 value: VrlValue::Bytes(Bytes::from("http")),
814 table_suffix: "http_events".to_string(),
815 pipeline: None
816 }
817 );
818
819 assert_eq!(
820 dispatcher.rules[1],
821 crate::dispatcher::Rule {
822 value: VrlValue::Bytes(Bytes::from("database")),
823 table_suffix: "db_events".to_string(),
824 pipeline: Some("database_pipeline".to_string()),
825 }
826 );
827
828 let bad_yaml1 = r#"
829---
830description: Pipeline for Apache Tomcat
831
832processors:
833 - epoch:
834 field: ts
835 resolution: ns
836
837dispatcher:
838 _field: typename
839 rules:
840 - value: http
841 table_suffix: http_events
842 - value: database
843 table_suffix: db_events
844 pipeline: database_pipeline
845
846transform:
847 - field: typename
848 type: string
849 - field: ts
850 type: timestamp, ns
851 index: time
852"#;
853 let bad_yaml2 = r#"
854---
855description: Pipeline for Apache Tomcat
856
857processors:
858 - epoch:
859 field: ts
860 resolution: ns
861dispatcher:
862 field: typename
863 rules:
864 - value: http
865 _table_suffix: http_events
866 - value: database
867 _table_suffix: db_events
868 pipeline: database_pipeline
869
870transform:
871 - field: typename
872 type: string
873 - field: ts
874 type: timestamp, ns
875 index: time
876"#;
877 let bad_yaml3 = r#"
878---
879description: Pipeline for Apache Tomcat
880
881processors:
882 - epoch:
883 field: ts
884 resolution: ns
885dispatcher:
886 field: typename
887 rules:
888 - _value: http
889 table_suffix: http_events
890 - _value: database
891 table_suffix: db_events
892 pipeline: database_pipeline
893
894transform:
895 - field: typename
896 type: string
897 - field: ts
898 type: timestamp, ns
899 index: time
900"#;
901
902 let r: Result<Pipeline> = parse(&Content::Yaml(bad_yaml1));
903 assert!(r.is_err());
904 let r: Result<Pipeline> = parse(&Content::Yaml(bad_yaml2));
905 assert!(r.is_err());
906 let r: Result<Pipeline> = parse(&Content::Yaml(bad_yaml3));
907 assert!(r.is_err());
908 }
909
910 #[test]
913 fn test_one_to_many_vrl_expansion() {
914 let pipeline_yaml = r#"
915processors:
916 - epoch:
917 field: timestamp
918 resolution: ms
919 - vrl:
920 source: |
921 events = del(.events)
922 base_host = del(.host)
923 base_ts = del(.timestamp)
924 map_values(array!(events)) -> |event| {
925 {
926 "host": base_host,
927 "event_type": event.type,
928 "event_value": event.value,
929 "timestamp": base_ts
930 }
931 }
932
933transform:
934 - field: host
935 type: string
936 - field: event_type
937 type: string
938 - field: event_value
939 type: int32
940 - field: timestamp
941 type: timestamp, ms
942 index: time
943"#;
944
945 let pipeline: Pipeline = parse(&Content::Yaml(pipeline_yaml)).unwrap();
946 let (pipeline, mut schema_info, pipeline_def, pipeline_param) = setup_pipeline!(pipeline);
947 let pipeline_ctx = PipelineContext::new(
948 &pipeline_def,
949 &pipeline_param,
950 session::context::Channel::Unknown,
951 );
952
953 let input_value: serde_json::Value = serde_json::from_str(
955 r#"{
956 "host": "server1",
957 "timestamp": 1716668197217,
958 "events": [
959 {"type": "cpu", "value": 80},
960 {"type": "memory", "value": 60},
961 {"type": "disk", "value": 45}
962 ]
963 }"#,
964 )
965 .unwrap();
966
967 let payload = input_value.into();
968 let result = pipeline
969 .exec_mut(payload, &pipeline_ctx, &mut schema_info)
970 .unwrap()
971 .into_transformed()
972 .unwrap();
973
974 assert_eq!(result.len(), 3);
976
977 for (row, _table_suffix) in &result {
979 assert_eq!(row.values.len(), 4); assert_eq!(
982 row.values[0].value_data,
983 Some(ValueData::StringValue("server1".to_string()))
984 );
985 assert_eq!(
987 row.values[3].value_data,
988 Some(ValueData::TimestampMillisecondValue(1716668197217))
989 );
990 }
991
992 let event_types: Vec<_> = result
994 .iter()
995 .map(|(r, _)| match &r.values[1].value_data {
996 Some(ValueData::StringValue(s)) => s.clone(),
997 _ => panic!("expected string"),
998 })
999 .collect();
1000 assert!(event_types.contains(&"cpu".to_string()));
1001 assert!(event_types.contains(&"memory".to_string()));
1002 assert!(event_types.contains(&"disk".to_string()));
1003 }
1004
1005 #[test]
1007 fn test_single_object_output_unchanged() {
1008 let pipeline_yaml = r#"
1009processors:
1010 - epoch:
1011 field: ts
1012 resolution: ms
1013 - vrl:
1014 source: |
1015 .processed = true
1016 .
1017
1018transform:
1019 - field: name
1020 type: string
1021 - field: processed
1022 type: boolean
1023 - field: ts
1024 type: timestamp, ms
1025 index: time
1026"#;
1027
1028 let pipeline: Pipeline = parse(&Content::Yaml(pipeline_yaml)).unwrap();
1029 let (pipeline, mut schema_info, pipeline_def, pipeline_param) = setup_pipeline!(pipeline);
1030 let pipeline_ctx = PipelineContext::new(
1031 &pipeline_def,
1032 &pipeline_param,
1033 session::context::Channel::Unknown,
1034 );
1035
1036 let input_value: serde_json::Value = serde_json::from_str(
1037 r#"{
1038 "name": "test",
1039 "ts": 1716668197217
1040 }"#,
1041 )
1042 .unwrap();
1043
1044 let payload = input_value.into();
1045 let result = pipeline
1046 .exec_mut(payload, &pipeline_ctx, &mut schema_info)
1047 .unwrap()
1048 .into_transformed()
1049 .unwrap();
1050
1051 assert_eq!(result.len(), 1);
1053 assert_eq!(
1054 result[0].0.values[0].value_data,
1055 Some(ValueData::StringValue("test".to_string()))
1056 );
1057 assert_eq!(
1058 result[0].0.values[1].value_data,
1059 Some(ValueData::BoolValue(true))
1060 );
1061 }
1062
1063 #[test]
1065 fn test_empty_array_produces_zero_rows() {
1066 let pipeline_yaml = r#"
1067processors:
1068 - vrl:
1069 source: |
1070 .events
1071
1072transform:
1073 - field: value
1074 type: int32
1075 - field: greptime_timestamp
1076 type: timestamp, ns
1077 index: time
1078"#;
1079
1080 let pipeline: Pipeline = parse(&Content::Yaml(pipeline_yaml)).unwrap();
1081 let (pipeline, mut schema_info, pipeline_def, pipeline_param) = setup_pipeline!(pipeline);
1082 let pipeline_ctx = PipelineContext::new(
1083 &pipeline_def,
1084 &pipeline_param,
1085 session::context::Channel::Unknown,
1086 );
1087
1088 let input_value: serde_json::Value = serde_json::from_str(r#"{"events": []}"#).unwrap();
1089
1090 let payload = input_value.into();
1091 let result = pipeline
1092 .exec_mut(payload, &pipeline_ctx, &mut schema_info)
1093 .unwrap()
1094 .into_transformed()
1095 .unwrap();
1096
1097 assert_eq!(result.len(), 0);
1099 }
1100
1101 #[test]
1103 fn test_array_element_must_be_object() {
1104 let pipeline_yaml = r#"
1105processors:
1106 - vrl:
1107 source: |
1108 .items
1109
1110transform:
1111 - field: value
1112 type: int32
1113 - field: greptime_timestamp
1114 type: timestamp, ns
1115 index: time
1116"#;
1117
1118 let pipeline: Pipeline = parse(&Content::Yaml(pipeline_yaml)).unwrap();
1119 let (pipeline, mut schema_info, pipeline_def, pipeline_param) = setup_pipeline!(pipeline);
1120 let pipeline_ctx = PipelineContext::new(
1121 &pipeline_def,
1122 &pipeline_param,
1123 session::context::Channel::Unknown,
1124 );
1125
1126 let input_value: serde_json::Value =
1128 serde_json::from_str(r#"{"items": [1, 2, 3]}"#).unwrap();
1129
1130 let payload = input_value.into();
1131 let result = pipeline.exec_mut(payload, &pipeline_ctx, &mut schema_info);
1132
1133 assert!(result.is_err());
1134 let err_msg = result.unwrap_err().to_string();
1135 assert!(
1136 err_msg.contains("must be an object"),
1137 "Expected error about non-object element, got: {}",
1138 err_msg
1139 );
1140 }
1141
1142 #[test]
1144 fn test_one_to_many_with_table_suffix_hint() {
1145 let pipeline_yaml = r#"
1146processors:
1147 - epoch:
1148 field: ts
1149 resolution: ms
1150 - vrl:
1151 source: |
1152 .greptime_table_suffix = "_" + string!(.category)
1153 .
1154
1155transform:
1156 - field: name
1157 type: string
1158 - field: category
1159 type: string
1160 - field: ts
1161 type: timestamp, ms
1162 index: time
1163"#;
1164
1165 let pipeline: Pipeline = parse(&Content::Yaml(pipeline_yaml)).unwrap();
1166 let (pipeline, mut schema_info, pipeline_def, pipeline_param) = setup_pipeline!(pipeline);
1167 let pipeline_ctx = PipelineContext::new(
1168 &pipeline_def,
1169 &pipeline_param,
1170 session::context::Channel::Unknown,
1171 );
1172
1173 let input_value: serde_json::Value = serde_json::from_str(
1174 r#"{
1175 "name": "test",
1176 "category": "metrics",
1177 "ts": 1716668197217
1178 }"#,
1179 )
1180 .unwrap();
1181
1182 let payload = input_value.into();
1183 let result = pipeline
1184 .exec_mut(payload, &pipeline_ctx, &mut schema_info)
1185 .unwrap()
1186 .into_transformed()
1187 .unwrap();
1188
1189 assert_eq!(result.len(), 1);
1191 assert_eq!(result[0].1, Some("_metrics".to_string()));
1192 }
1193
1194 #[test]
1196 fn test_one_to_many_per_row_table_suffix() {
1197 let pipeline_yaml = r#"
1198processors:
1199 - epoch:
1200 field: timestamp
1201 resolution: ms
1202 - vrl:
1203 source: |
1204 events = del(.events)
1205 base_ts = del(.timestamp)
1206
1207 map_values(array!(events)) -> |event| {
1208 suffix = "_" + string!(event.category)
1209 {
1210 "name": event.name,
1211 "value": event.value,
1212 "timestamp": base_ts,
1213 "greptime_table_suffix": suffix
1214 }
1215 }
1216
1217transform:
1218 - field: name
1219 type: string
1220 - field: value
1221 type: int32
1222 - field: timestamp
1223 type: timestamp, ms
1224 index: time
1225"#;
1226
1227 let pipeline: Pipeline = parse(&Content::Yaml(pipeline_yaml)).unwrap();
1228 let (pipeline, mut schema_info, pipeline_def, pipeline_param) = setup_pipeline!(pipeline);
1229 let pipeline_ctx = PipelineContext::new(
1230 &pipeline_def,
1231 &pipeline_param,
1232 session::context::Channel::Unknown,
1233 );
1234
1235 let input_value: serde_json::Value = serde_json::from_str(
1237 r#"{
1238 "timestamp": 1716668197217,
1239 "events": [
1240 {"name": "cpu_usage", "value": 80, "category": "cpu"},
1241 {"name": "mem_usage", "value": 60, "category": "memory"},
1242 {"name": "cpu_temp", "value": 45, "category": "cpu"}
1243 ]
1244 }"#,
1245 )
1246 .unwrap();
1247
1248 let payload = input_value.into();
1249 let result = pipeline
1250 .exec_mut(payload, &pipeline_ctx, &mut schema_info)
1251 .unwrap()
1252 .into_transformed()
1253 .unwrap();
1254
1255 assert_eq!(result.len(), 3);
1257
1258 let table_suffixes: Vec<_> = result.iter().map(|(_, suffix)| suffix.clone()).collect();
1260
1261 assert!(table_suffixes.contains(&Some("_cpu".to_string())));
1263 assert!(table_suffixes.contains(&Some("_memory".to_string())));
1264
1265 let cpu_count = table_suffixes
1267 .iter()
1268 .filter(|s| *s == &Some("_cpu".to_string()))
1269 .count();
1270 let memory_count = table_suffixes
1271 .iter()
1272 .filter(|s| *s == &Some("_memory".to_string()))
1273 .count();
1274 assert_eq!(cpu_count, 2);
1275 assert_eq!(memory_count, 1);
1276 }
1277
1278 #[test]
1280 fn test_one_to_many_hashmap_contextopt_preservation() {
1281 let pipeline_yaml = r#"
1282processors:
1283 - epoch:
1284 field: timestamp
1285 resolution: ms
1286 - vrl:
1287 source: |
1288 events = del(.events)
1289 base_ts = del(.timestamp)
1290
1291 map_values(array!(events)) -> |event| {
1292 # Set different TTL values per event type
1293 ttl = if event.type == "critical" {
1294 "1h"
1295 } else if event.type == "warning" {
1296 "24h"
1297 } else {
1298 "7d"
1299 }
1300
1301 {
1302 "host": del(.host),
1303 "event_type": event.type,
1304 "event_value": event.value,
1305 "timestamp": base_ts,
1306 "greptime_ttl": ttl
1307 }
1308 }
1309
1310transform:
1311 - field: host
1312 type: string
1313 - field: event_type
1314 type: string
1315 - field: event_value
1316 type: int32
1317 - field: timestamp
1318 type: timestamp, ms
1319 index: time
1320"#;
1321
1322 let pipeline: Pipeline = parse(&Content::Yaml(pipeline_yaml)).unwrap();
1323 let (pipeline, mut schema_info, pipeline_def, pipeline_param) = setup_pipeline!(pipeline);
1324 let pipeline_ctx = PipelineContext::new(
1325 &pipeline_def,
1326 &pipeline_param,
1327 session::context::Channel::Unknown,
1328 );
1329
1330 let input_value: serde_json::Value = serde_json::from_str(
1332 r#"{
1333 "host": "server1",
1334 "timestamp": 1716668197217,
1335 "events": [
1336 {"type": "critical", "value": 100},
1337 {"type": "warning", "value": 50},
1338 {"type": "info", "value": 25}
1339 ]
1340 }"#,
1341 )
1342 .unwrap();
1343
1344 let payload = input_value.into();
1345 let result = pipeline
1346 .exec_mut(payload, &pipeline_ctx, &mut schema_info)
1347 .unwrap();
1348
1349 let rows_by_context = result.into_transformed_hashmap().unwrap();
1351
1352 assert_eq!(rows_by_context.len(), 3);
1354
1355 let mut context_opts = Vec::new();
1357 for (opt, rows) in &rows_by_context {
1358 assert_eq!(rows.len(), 1); context_opts.push(opt.clone());
1360 }
1361
1362 assert_ne!(context_opts[0], context_opts[1]);
1364 assert_ne!(context_opts[1], context_opts[2]);
1365 assert_ne!(context_opts[0], context_opts[2]);
1366
1367 for rows in rows_by_context.values() {
1369 for (row, _table_suffix) in rows {
1370 assert_eq!(row.values.len(), 4); }
1372 }
1373 }
1374
1375 #[test]
1377 fn test_single_object_hashmap_compatibility() {
1378 let pipeline_yaml = r#"
1379processors:
1380 - epoch:
1381 field: ts
1382 resolution: ms
1383 - vrl:
1384 source: |
1385 .processed = true
1386 .
1387
1388transform:
1389 - field: name
1390 type: string
1391 - field: processed
1392 type: boolean
1393 - field: ts
1394 type: timestamp, ms
1395 index: time
1396"#;
1397
1398 let pipeline: Pipeline = parse(&Content::Yaml(pipeline_yaml)).unwrap();
1399 let (pipeline, mut schema_info, pipeline_def, pipeline_param) = setup_pipeline!(pipeline);
1400 let pipeline_ctx = PipelineContext::new(
1401 &pipeline_def,
1402 &pipeline_param,
1403 session::context::Channel::Unknown,
1404 );
1405
1406 let input_value: serde_json::Value = serde_json::from_str(
1407 r#"{
1408 "name": "test",
1409 "ts": 1716668197217
1410 }"#,
1411 )
1412 .unwrap();
1413
1414 let payload = input_value.into();
1415 let result = pipeline
1416 .exec_mut(payload, &pipeline_ctx, &mut schema_info)
1417 .unwrap();
1418
1419 let rows_by_context = result.into_transformed_hashmap().unwrap();
1421
1422 assert_eq!(rows_by_context.len(), 1);
1424
1425 let (_opt, rows) = rows_by_context.into_iter().next().unwrap();
1426 assert_eq!(rows.len(), 1);
1427
1428 let (row, _table_suffix) = &rows[0];
1430 assert_eq!(row.values.len(), 3); }
1432
1433 #[test]
1435 fn test_empty_array_hashmap() {
1436 let pipeline_yaml = r#"
1437processors:
1438 - vrl:
1439 source: |
1440 .events
1441
1442transform:
1443 - field: value
1444 type: int32
1445 - field: greptime_timestamp
1446 type: timestamp, ns
1447 index: time
1448"#;
1449
1450 let pipeline: Pipeline = parse(&Content::Yaml(pipeline_yaml)).unwrap();
1451 let (pipeline, mut schema_info, pipeline_def, pipeline_param) = setup_pipeline!(pipeline);
1452 let pipeline_ctx = PipelineContext::new(
1453 &pipeline_def,
1454 &pipeline_param,
1455 session::context::Channel::Unknown,
1456 );
1457
1458 let input_value: serde_json::Value = serde_json::from_str(r#"{"events": []}"#).unwrap();
1459
1460 let payload = input_value.into();
1461 let result = pipeline
1462 .exec_mut(payload, &pipeline_ctx, &mut schema_info)
1463 .unwrap();
1464
1465 let rows_by_context = result.into_transformed_hashmap().unwrap();
1467
1468 assert_eq!(rows_by_context.len(), 0);
1470 }
1471
1472 #[test]
1473 fn test_pipeline_detailed_index_options_roundtrip() {
1474 let pipeline_yaml = r#"
1475transform:
1476 - field: message
1477 type: string
1478 index:
1479 type: fulltext
1480 options:
1481 analyzer: Chinese
1482 case_sensitive: true
1483 backend: tantivy
1484 - field: trace_id
1485 type: int64
1486 index:
1487 type: skipping
1488 options:
1489 granularity: 2048
1490 false_positive_rate: 0.02
1491 type: BLOOM
1492 - field: ts
1493 type: timestamp, ns
1494 index: time
1495"#;
1496
1497 let pipeline: Pipeline = parse(&Content::Yaml(pipeline_yaml)).unwrap();
1498 let schema = pipeline.schemas().unwrap().clone();
1499
1500 let message = schema
1501 .iter()
1502 .find(|column| column.column_name == "message")
1503 .unwrap();
1504 let trace_id = schema
1505 .iter()
1506 .find(|column| column.column_name == "trace_id")
1507 .unwrap();
1508 let message_options = message.options.clone();
1509 let trace_id_options = trace_id.options.clone();
1510
1511 let fulltext: FulltextOptions = serde_json::from_str(
1512 message
1513 .options
1514 .as_ref()
1515 .unwrap()
1516 .options
1517 .get("fulltext")
1518 .unwrap(),
1519 )
1520 .unwrap();
1521 assert!(fulltext.enable);
1522 assert_eq!(fulltext.analyzer.to_string(), "Chinese");
1523 assert!(fulltext.case_sensitive);
1524 assert_eq!(fulltext.backend.to_string(), "tantivy");
1525
1526 let skipping: SkippingIndexOptions = serde_json::from_str(
1527 trace_id
1528 .options
1529 .as_ref()
1530 .unwrap()
1531 .options
1532 .get("skipping_index")
1533 .unwrap(),
1534 )
1535 .unwrap();
1536 assert_eq!(skipping.granularity, 2048);
1537 assert_eq!(skipping.false_positive_rate(), 0.02);
1538 assert_eq!(skipping.index_type.to_string(), "BLOOM");
1539
1540 let roundtrip_schema = SchemaInfo::from_schema_list(schema)
1541 .column_schemas()
1542 .unwrap();
1543 let roundtrip_message = roundtrip_schema
1544 .iter()
1545 .find(|column| column.column_name == "message")
1546 .unwrap();
1547 let roundtrip_trace_id = roundtrip_schema
1548 .iter()
1549 .find(|column| column.column_name == "trace_id")
1550 .unwrap();
1551
1552 assert_eq!(message_options, roundtrip_message.options);
1553 assert_eq!(trace_id_options, roundtrip_trace_id.options);
1554 }
1555}