pipeline/
etl.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#![allow(dead_code)]
16pub mod ctx_req;
17pub mod field;
18pub mod processor;
19pub mod transform;
20pub mod value;
21
22use std::collections::BTreeMap;
23
24use api::v1::Row;
25use common_time::timestamp::TimeUnit;
26use itertools::Itertools;
27use processor::{Processor, Processors};
28use snafu::{ensure, OptionExt, ResultExt};
29use transform::Transforms;
30use value::Value;
31use yaml_rust::{Yaml, YamlLoader};
32
33use crate::dispatcher::{Dispatcher, Rule};
34use crate::error::{
35    AutoTransformOneTimestampSnafu, Error, InputValueMustBeObjectSnafu, IntermediateKeyIndexSnafu,
36    InvalidVersionNumberSnafu, Result, YamlLoadSnafu, YamlParseSnafu,
37};
38use crate::etl::processor::ProcessorKind;
39use crate::etl::transform::transformer::greptime::values_to_row;
40use crate::tablesuffix::TableSuffixTemplate;
41use crate::{ContextOpt, GreptimeTransformer, IdentityTimeIndex, PipelineContext, SchemaInfo};
42
/// Top-level key of the optional human-readable pipeline description.
const DESCRIPTION: &str = "description";
/// Top-level key of the pipeline document version (see `PipelineDocVersion`).
const DOC_VERSION: &str = "version";
/// Top-level key of the processor list.
const PROCESSORS: &str = "processors";
/// Singular spelling of the transform-section key; consulted only when `transforms` is absent.
const TRANSFORM: &str = "transform";
/// Plural spelling of the transform-section key; preferred over `transform`.
const TRANSFORMS: &str = "transforms";
/// Top-level key of the optional dispatcher section.
const DISPATCHER: &str = "dispatcher";
/// Top-level key of the optional table-suffix template.
const TABLESUFFIX: &str = "table_suffix";
50
/// Raw pipeline-definition text, tagged with the format it is written in.
///
/// Only [`Content::Yaml`] is handled by [`parse`]; passing [`Content::Json`] currently
/// panics via `unimplemented!()`.
pub enum Content<'a> {
    /// JSON-encoded definition (not supported by `parse` yet).
    Json(&'a str),
    /// YAML-encoded definition.
    Yaml(&'a str),
}
55
56pub fn parse(input: &Content) -> Result<Pipeline> {
57    match input {
58        Content::Yaml(str) => {
59            let docs = YamlLoader::load_from_str(str).context(YamlLoadSnafu)?;
60
61            ensure!(docs.len() == 1, YamlParseSnafu);
62
63            let doc = &docs[0];
64
65            let description = doc[DESCRIPTION].as_str().map(|s| s.to_string());
66
67            let doc_version = (&doc[DOC_VERSION]).try_into()?;
68
69            let processors = if let Some(v) = doc[PROCESSORS].as_vec() {
70                v.try_into()?
71            } else {
72                Processors::default()
73            };
74
75            let transformers = if let Some(v) = doc[TRANSFORMS].as_vec().or(doc[TRANSFORM].as_vec())
76            {
77                v.try_into()?
78            } else {
79                Transforms::default()
80            };
81
82            let transformer = if transformers.is_empty() {
83                // use auto transform
84                // check processors have at least one timestamp-related processor
85                let cnt = processors
86                    .iter()
87                    .filter_map(|p| match p {
88                        ProcessorKind::Date(d) if !d.ignore_missing() => Some(
89                            d.fields
90                                .iter()
91                                .map(|f| (f.target_or_input_field(), TimeUnit::Nanosecond))
92                                .collect_vec(),
93                        ),
94                        ProcessorKind::Epoch(e) if !e.ignore_missing() => Some(
95                            e.fields
96                                .iter()
97                                .map(|f| (f.target_or_input_field(), (&e.resolution).into()))
98                                .collect_vec(),
99                        ),
100                        _ => None,
101                    })
102                    .flatten()
103                    .collect_vec();
104                ensure!(cnt.len() == 1, AutoTransformOneTimestampSnafu);
105
106                let (ts_name, timeunit) = cnt.first().unwrap();
107                TransformerMode::AutoTransform(ts_name.to_string(), *timeunit)
108            } else {
109                TransformerMode::GreptimeTransformer(GreptimeTransformer::new(
110                    transformers,
111                    &doc_version,
112                )?)
113            };
114
115            let dispatcher = if !doc[DISPATCHER].is_badvalue() {
116                Some(Dispatcher::try_from(&doc[DISPATCHER])?)
117            } else {
118                None
119            };
120
121            let tablesuffix = if !doc[TABLESUFFIX].is_badvalue() {
122                Some(TableSuffixTemplate::try_from(&doc[TABLESUFFIX])?)
123            } else {
124                None
125            };
126
127            Ok(Pipeline {
128                doc_version,
129                description,
130                processors,
131                transformer,
132                dispatcher,
133                tablesuffix,
134            })
135        }
136        Content::Json(_) => unimplemented!(),
137    }
138}
139
/// Semantics version of a pipeline document, selected by its top-level `version` key.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum PipelineDocVersion {
    /// Default version. Either every field to keep is listed explicitly in the transform
    /// section, or there is no transform section and auto-transform is used.
    #[default]
    V1,

    /// Transform and auto-transform combined: the transform section is applied first, then
    /// auto-transform fills in the remaining fields.
    ///
    /// Handy when only index fields need explicit settings and ordinary fields can be
    /// inferred.
    V2,
}
155
156impl TryFrom<&Yaml> for PipelineDocVersion {
157    type Error = Error;
158
159    fn try_from(value: &Yaml) -> Result<Self> {
160        if value.is_badvalue() || value.is_null() {
161            return Ok(PipelineDocVersion::V1);
162        }
163
164        let version = match value {
165            Yaml::String(s) => s
166                .parse::<i64>()
167                .map_err(|_| InvalidVersionNumberSnafu { version: s.clone() }.build())?,
168            Yaml::Integer(i) => *i,
169            _ => {
170                return InvalidVersionNumberSnafu {
171                    version: value.as_str().unwrap_or_default().to_string(),
172                }
173                .fail();
174            }
175        };
176
177        match version {
178            1 => Ok(PipelineDocVersion::V1),
179            2 => Ok(PipelineDocVersion::V2),
180            _ => InvalidVersionNumberSnafu {
181                version: version.to_string(),
182            }
183            .fail(),
184        }
185    }
186}
187
188#[derive(Debug)]
189pub struct Pipeline {
190    doc_version: PipelineDocVersion,
191    description: Option<String>,
192    processors: processor::Processors,
193    dispatcher: Option<Dispatcher>,
194    transformer: TransformerMode,
195    tablesuffix: Option<TableSuffixTemplate>,
196}
197
198#[derive(Debug, Clone)]
199pub enum TransformerMode {
200    GreptimeTransformer(GreptimeTransformer),
201    AutoTransform(String, TimeUnit),
202}
203
/// Destination chosen by a matching dispatcher rule: the suffix for the target table name
/// and, optionally, the name of the pipeline that should process the value next.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct DispatchedTo {
    /// Appended (after `_`) to the original table name.
    pub table_suffix: String,
    /// Name of the follow-up pipeline, if the rule specifies one.
    pub pipeline: Option<String>,
}
210
211impl From<&Rule> for DispatchedTo {
212    fn from(value: &Rule) -> Self {
213        DispatchedTo {
214            table_suffix: value.table_suffix.clone(),
215            pipeline: value.pipeline.clone(),
216        }
217    }
218}
219
220impl DispatchedTo {
221    /// Generate destination table name from input
222    pub fn dispatched_to_table_name(&self, original: &str) -> String {
223        format!("{}_{}", &original, self.table_suffix)
224    }
225}
226
227/// The result of pipeline execution
228#[derive(Debug)]
229pub enum PipelineExecOutput {
230    Transformed(TransformedOutput),
231    DispatchedTo(DispatchedTo, Value),
232}
233
234#[derive(Debug)]
235pub struct TransformedOutput {
236    pub opt: ContextOpt,
237    pub row: Row,
238    pub table_suffix: Option<String>,
239}
240
241impl PipelineExecOutput {
242    // Note: This is a test only function, do not use it in production.
243    pub fn into_transformed(self) -> Option<(Row, Option<String>)> {
244        if let Self::Transformed(TransformedOutput {
245            row, table_suffix, ..
246        }) = self
247        {
248            Some((row, table_suffix))
249        } else {
250            None
251        }
252    }
253
254    // Note: This is a test only function, do not use it in production.
255    pub fn into_dispatched(self) -> Option<DispatchedTo> {
256        if let Self::DispatchedTo(d, _) = self {
257            Some(d)
258        } else {
259            None
260        }
261    }
262}
263
264pub fn json_to_map(val: serde_json::Value) -> Result<Value> {
265    match val {
266        serde_json::Value::Object(map) => {
267            let mut intermediate_state = BTreeMap::new();
268            for (k, v) in map {
269                intermediate_state.insert(k, Value::try_from(v)?);
270            }
271            Ok(Value::Map(intermediate_state.into()))
272        }
273        _ => InputValueMustBeObjectSnafu.fail(),
274    }
275}
276
277pub fn json_array_to_map(val: Vec<serde_json::Value>) -> Result<Vec<Value>> {
278    val.into_iter().map(json_to_map).collect()
279}
280
281pub fn simd_json_to_map(val: simd_json::OwnedValue) -> Result<Value> {
282    match val {
283        simd_json::OwnedValue::Object(map) => {
284            let mut intermediate_state = BTreeMap::new();
285            for (k, v) in map.into_iter() {
286                intermediate_state.insert(k, Value::try_from(v)?);
287            }
288            Ok(Value::Map(intermediate_state.into()))
289        }
290        _ => InputValueMustBeObjectSnafu.fail(),
291    }
292}
293
294pub fn simd_json_array_to_map(val: Vec<simd_json::OwnedValue>) -> Result<Vec<Value>> {
295    val.into_iter().map(simd_json_to_map).collect()
296}
297
298impl Pipeline {
299    fn is_v1(&self) -> bool {
300        self.doc_version == PipelineDocVersion::V1
301    }
302
303    pub fn exec_mut(
304        &self,
305        mut val: Value,
306        pipeline_ctx: &PipelineContext<'_>,
307        schema_info: &mut SchemaInfo,
308    ) -> Result<PipelineExecOutput> {
309        // process
310        for processor in self.processors.iter() {
311            val = processor.exec_mut(val)?;
312        }
313
314        // dispatch, fast return if matched
315        if let Some(rule) = self.dispatcher.as_ref().and_then(|d| d.exec(&val)) {
316            return Ok(PipelineExecOutput::DispatchedTo(rule.into(), val));
317        }
318
319        // extract the options first
320        // this might be a breaking change, for table_suffix is now right after the processors
321        let mut opt = ContextOpt::from_pipeline_map_to_opt(&mut val)?;
322        let table_suffix = opt.resolve_table_suffix(self.tablesuffix.as_ref(), &val);
323
324        let row = match self.transformer() {
325            TransformerMode::GreptimeTransformer(greptime_transformer) => {
326                let values = greptime_transformer.transform_mut(&mut val, self.is_v1())?;
327                if self.is_v1() {
328                    // v1 dont combine with auto-transform
329                    // so return immediately
330                    return Ok(PipelineExecOutput::Transformed(TransformedOutput {
331                        opt,
332                        row: Row { values },
333                        table_suffix,
334                    }));
335                }
336                // continue v2 process, check ts column and set the rest fields with auto-transform
337                // if transformer presents, then ts has been set
338                values_to_row(schema_info, val, pipeline_ctx, Some(values))?
339            }
340            TransformerMode::AutoTransform(ts_name, time_unit) => {
341                // infer ts from the context
342                // we've check that only one timestamp should exist
343
344                // Create pipeline context with the found timestamp
345                let def = crate::PipelineDefinition::GreptimeIdentityPipeline(Some(
346                    IdentityTimeIndex::Epoch(ts_name.to_string(), *time_unit, false),
347                ));
348                let n_ctx =
349                    PipelineContext::new(&def, pipeline_ctx.pipeline_param, pipeline_ctx.channel);
350                values_to_row(schema_info, val, &n_ctx, None)?
351            }
352        };
353
354        Ok(PipelineExecOutput::Transformed(TransformedOutput {
355            opt,
356            row,
357            table_suffix,
358        }))
359    }
360
361    pub fn processors(&self) -> &processor::Processors {
362        &self.processors
363    }
364
365    pub fn transformer(&self) -> &TransformerMode {
366        &self.transformer
367    }
368
369    // the method is for test purpose
370    pub fn schemas(&self) -> Option<&Vec<greptime_proto::v1::ColumnSchema>> {
371        match &self.transformer {
372            TransformerMode::GreptimeTransformer(t) => Some(t.schemas()),
373            TransformerMode::AutoTransform(_, _) => None,
374        }
375    }
376}
377
378pub(crate) fn find_key_index(intermediate_keys: &[String], key: &str, kind: &str) -> Result<usize> {
379    intermediate_keys
380        .iter()
381        .position(|k| k == key)
382        .context(IntermediateKeyIndexSnafu { kind, key })
383}
384
/// Test-only helper that bundles a parsed `Pipeline` with what is needed to execute it.
/// Do not use it in production.
///
/// Expands to `(pipeline, schema_info, pipeline_def, pipeline_param)`:
/// - `pipeline` is the pipeline wrapped in an `Arc`;
/// - `schema_info` is built from the schema declared by the transform section;
/// - `pipeline_def` is a `PipelineDefinition::Resolved` pointing at the same `Arc`;
/// - `pipeline_param` is `GreptimePipelineParams::default()`.
///
/// # Panics
/// Panics for auto-transform pipelines (no transform section): they have no predeclared
/// schema, so `Pipeline::schemas` returns `None`.
///
/// Usage:
/// ```ignore
/// let (pipeline, schema_info, pipeline_def, pipeline_param) = setup_pipeline!(pipeline);
/// let pipeline_ctx = PipelineContext::new(&pipeline_def, &pipeline_param, Channel::Unknown);
/// ```
#[macro_export]
macro_rules! setup_pipeline {
    ($pipeline:expr) => {{
        use std::sync::Arc;

        use $crate::{GreptimePipelineParams, Pipeline, PipelineDefinition, SchemaInfo};

        let pipeline: Arc<Pipeline> = Arc::new($pipeline);
        let schema = pipeline.schemas().expect(
            "setup_pipeline! needs a pipeline with a transform section; auto-transform pipelines have no schema",
        );
        let schema_info = SchemaInfo::from_schema_list(schema.clone());

        let pipeline_def = PipelineDefinition::Resolved(pipeline.clone());
        let pipeline_param = GreptimePipelineParams::default();

        (pipeline, schema_info, pipeline_def, pipeline_param)
    }};
}
#[cfg(test)]
mod tests {
    use api::v1::Rows;
    use greptime_proto::v1::value::ValueData;
    use greptime_proto::v1::{self, ColumnDataType, SemanticType};

    use super::*;

    /// csv + epoch processors with an explicit transform section.
    #[test]
    fn test_pipeline_prepare() {
        let input_value_str = r#"
                    {
                        "my_field": "1,2",
                        "foo": "bar",
                        "ts": "1"
                    }
                "#;
        let input_value: serde_json::Value = serde_json::from_str(input_value_str).unwrap();

        let pipeline_yaml = r#"description: 'Pipeline for Apache Tomcat'
processors:
    - csv:
        field: my_field
        target_fields: field1, field2
    - epoch:
        field: ts
        resolution: ns
transform:
    - field: field1
      type: uint32
    - field: field2
      type: uint32
    - field: ts
      type: timestamp, ns
      index: time
    "#;

        let pipeline: Pipeline = parse(&Content::Yaml(pipeline_yaml)).unwrap();
        let (pipeline, mut schema_info, pipeline_def, pipeline_param) = setup_pipeline!(pipeline);
        let pipeline_ctx = PipelineContext::new(
            &pipeline_def,
            &pipeline_param,
            session::context::Channel::Unknown,
        );

        let payload = json_to_map(input_value).unwrap();
        let result = pipeline
            .exec_mut(payload, &pipeline_ctx, &mut schema_info)
            .unwrap()
            .into_transformed()
            .unwrap();

        assert_eq!(result.0.values[0].value_data, Some(ValueData::U32Value(1)));
        assert_eq!(result.0.values[1].value_data, Some(ValueData::U32Value(2)));
        match &result.0.values[2].value_data {
            Some(ValueData::TimestampNanosecondValue(v)) => {
                assert_ne!(v, &0);
            }
            other => panic!("expect a nanosecond timestamp value, got {other:?}"),
        }
    }

    /// dissect + date processors; checks every column's type and value.
    #[test]
    fn test_dissect_pipeline() {
        let message = r#"129.37.245.88 - meln1ks [01/Aug/2024:14:22:47 +0800] "PATCH /observability/metrics/production HTTP/1.0" 501 33085"#.to_string();
        let pipeline_str = r#"processors:
    - dissect:
        fields:
          - message
        patterns:
          - "%{ip} %{?ignored} %{username} [%{ts}] \"%{method} %{path} %{proto}\" %{status} %{bytes}"
    - date:
        fields:
          - ts
        formats:
          - "%d/%b/%Y:%H:%M:%S %z"

transform:
    - fields:
        - ip
        - username
        - method
        - path
        - proto
      type: string
    - fields:
        - status
      type: uint16
    - fields:
        - bytes
      type: uint32
    - field: ts
      type: timestamp, ns
      index: time"#;
        let pipeline: Pipeline = parse(&Content::Yaml(pipeline_str)).unwrap();
        let (pipeline, mut schema_info, pipeline_def, pipeline_param) = setup_pipeline!(pipeline);
        let pipeline_ctx = PipelineContext::new(
            &pipeline_def,
            &pipeline_param,
            session::context::Channel::Unknown,
        );
        let mut payload = BTreeMap::new();
        payload.insert("message".to_string(), Value::String(message));
        let payload = Value::Map(payload.into());

        let result = pipeline
            .exec_mut(payload, &pipeline_ctx, &mut schema_info)
            .unwrap()
            .into_transformed()
            .unwrap();

        assert_eq!(schema_info.schema.len(), result.0.values.len());
        let expected = [
            (
                ColumnDataType::String as i32,
                Some(ValueData::StringValue("129.37.245.88".into())),
            ),
            (
                ColumnDataType::String as i32,
                Some(ValueData::StringValue("meln1ks".into())),
            ),
            (
                ColumnDataType::String as i32,
                Some(ValueData::StringValue("PATCH".into())),
            ),
            (
                ColumnDataType::String as i32,
                Some(ValueData::StringValue(
                    "/observability/metrics/production".into(),
                )),
            ),
            (
                ColumnDataType::String as i32,
                Some(ValueData::StringValue("HTTP/1.0".into())),
            ),
            (
                ColumnDataType::Uint16 as i32,
                Some(ValueData::U16Value(501)),
            ),
            (
                ColumnDataType::Uint32 as i32,
                Some(ValueData::U32Value(33085)),
            ),
            (
                ColumnDataType::TimestampNanosecond as i32,
                Some(ValueData::TimestampNanosecondValue(1722493367000000000)),
            ),
        ];

        let schema = pipeline.schemas().unwrap();
        // zip stops at the shorter side, so pin the lengths first.
        assert_eq!(schema.len(), expected.len());
        for ((column, value), (datatype, value_data)) in
            schema.iter().zip(&result.0.values).zip(&expected)
        {
            assert_eq!(column.datatype, *datatype);
            assert_eq!(&value.value_data, value_data);
        }
    }

    /// Same as `test_pipeline_prepare`, with the YAML document indented.
    #[test]
    fn test_csv_pipeline() {
        let input_value_str = r#"
                    {
                        "my_field": "1,2",
                        "foo": "bar",
                        "ts": "1"
                    }
                "#;
        let input_value: serde_json::Value = serde_json::from_str(input_value_str).unwrap();

        let pipeline_yaml = r#"
    description: Pipeline for Apache Tomcat
    processors:
      - csv:
          field: my_field
          target_fields: field1, field2
      - epoch:
          field: ts
          resolution: ns
    transform:
      - field: field1
        type: uint32
      - field: field2
        type: uint32
      - field: ts
        type: timestamp, ns
        index: time
    "#;

        let pipeline: Pipeline = parse(&Content::Yaml(pipeline_yaml)).unwrap();
        let (pipeline, mut schema_info, pipeline_def, pipeline_param) = setup_pipeline!(pipeline);
        let pipeline_ctx = PipelineContext::new(
            &pipeline_def,
            &pipeline_param,
            session::context::Channel::Unknown,
        );

        let payload = json_to_map(input_value).unwrap();
        let result = pipeline
            .exec_mut(payload, &pipeline_ctx, &mut schema_info)
            .unwrap()
            .into_transformed()
            .unwrap();
        assert_eq!(result.0.values[0].value_data, Some(ValueData::U32Value(1)));
        assert_eq!(result.0.values[1].value_data, Some(ValueData::U32Value(2)));
        match &result.0.values[2].value_data {
            Some(ValueData::TimestampNanosecondValue(v)) => {
                assert_ne!(v, &0);
            }
            other => panic!("expect a nanosecond timestamp value, got {other:?}"),
        }
    }

    /// date processor with default formats producing the time index.
    #[test]
    fn test_date_pipeline() {
        let input_value_str = r#"
                {
                    "my_field": "1,2",
                    "foo": "bar",
                    "test_time": "2014-5-17T04:34:56+00:00"
                }
            "#;
        let input_value: serde_json::Value = serde_json::from_str(input_value_str).unwrap();

        let pipeline_yaml = r#"---
description: Pipeline for Apache Tomcat

processors:
    - date:
        field: test_time

transform:
    - field: test_time
      type: timestamp, ns
      index: time
    "#;

        let pipeline: Pipeline = parse(&Content::Yaml(pipeline_yaml)).unwrap();
        let (pipeline, mut schema_info, pipeline_def, pipeline_param) = setup_pipeline!(pipeline);
        let pipeline_ctx = PipelineContext::new(
            &pipeline_def,
            &pipeline_param,
            session::context::Channel::Unknown,
        );
        let schema = pipeline.schemas().unwrap().clone();
        let payload = json_to_map(input_value).unwrap();

        let row = pipeline
            .exec_mut(payload, &pipeline_ctx, &mut schema_info)
            .unwrap()
            .into_transformed()
            .unwrap();
        let output = Rows {
            schema,
            rows: vec![row.0],
        };
        let schemas = output.schema;

        assert_eq!(schemas.len(), 1);
        let schema = schemas[0].clone();
        assert_eq!("test_time", schema.column_name);
        assert_eq!(ColumnDataType::TimestampNanosecond as i32, schema.datatype);
        assert_eq!(SemanticType::Timestamp as i32, schema.semantic_type);

        let row = output.rows[0].clone();
        assert_eq!(1, row.values.len());
        let value_data = row.values[0].clone().value_data;
        assert_eq!(
            Some(v1::value::ValueData::TimestampNanosecondValue(
                1400301296000000000
            )),
            value_data
        );
    }

    /// Dispatcher parsing: valid rules are read, malformed sections are rejected.
    #[test]
    fn test_dispatcher() {
        let pipeline_yaml = r#"
---
description: Pipeline for Apache Tomcat

processors:
  - epoch:
      field: ts
      resolution: ns

dispatcher:
  field: typename
  rules:
    - value: http
      table_suffix: http_events
    - value: database
      table_suffix: db_events
      pipeline: database_pipeline

transform:
  - field: typename
    type: string
  - field: ts
    type: timestamp, ns
    index: time
"#;
        let pipeline: Pipeline = parse(&Content::Yaml(pipeline_yaml)).unwrap();
        let dispatcher = pipeline.dispatcher.expect("expect dispatcher");
        assert_eq!(dispatcher.field, "typename");

        assert_eq!(dispatcher.rules.len(), 2);

        assert_eq!(
            dispatcher.rules[0],
            crate::dispatcher::Rule {
                value: Value::String("http".to_string()),
                table_suffix: "http_events".to_string(),
                pipeline: None
            }
        );

        assert_eq!(
            dispatcher.rules[1],
            crate::dispatcher::Rule {
                value: Value::String("database".to_string()),
                table_suffix: "db_events".to_string(),
                pipeline: Some("database_pipeline".to_string()),
            }
        );

        // `field` misspelled as `_field`.
        let bad_yaml1 = r#"
---
description: Pipeline for Apache Tomcat

processors:
  - epoch:
      field: ts
      resolution: ns

dispatcher:
  _field: typename
  rules:
    - value: http
      table_suffix: http_events
    - value: database
      table_suffix: db_events
      pipeline: database_pipeline

transform:
  - field: typename
    type: string
  - field: ts
    type: timestamp, ns
    index: time
"#;
        // `table_suffix` misspelled as `_table_suffix`.
        let bad_yaml2 = r#"
---
description: Pipeline for Apache Tomcat

processors:
  - epoch:
      field: ts
      resolution: ns
dispatcher:
  field: typename
  rules:
    - value: http
      _table_suffix: http_events
    - value: database
      _table_suffix: db_events
      pipeline: database_pipeline

transform:
  - field: typename
    type: string
  - field: ts
    type: timestamp, ns
    index: time
"#;
        // `value` misspelled as `_value`.
        let bad_yaml3 = r#"
---
description: Pipeline for Apache Tomcat

processors:
  - epoch:
      field: ts
      resolution: ns
dispatcher:
  field: typename
  rules:
    - _value: http
      table_suffix: http_events
    - _value: database
      table_suffix: db_events
      pipeline: database_pipeline

transform:
  - field: typename
    type: string
  - field: ts
    type: timestamp, ns
    index: time
"#;

        for bad_yaml in [bad_yaml1, bad_yaml2, bad_yaml3] {
            let r: Result<Pipeline> = parse(&Content::Yaml(bad_yaml));
            assert!(r.is_err());
        }
    }
}