pipeline/
etl.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#![allow(dead_code)]
16pub mod ctx_req;
17pub mod field;
18pub mod processor;
19pub mod transform;
20pub mod value;
21
22use api::v1::Row;
23use common_time::timestamp::TimeUnit;
24use itertools::Itertools;
25use processor::{Processor, Processors};
26use snafu::{OptionExt, ResultExt, ensure};
27use transform::Transforms;
28use vrl::core::Value as VrlValue;
29use yaml_rust::{Yaml, YamlLoader};
30
31use crate::dispatcher::{Dispatcher, Rule};
32use crate::error::{
33    AutoTransformOneTimestampSnafu, Error, IntermediateKeyIndexSnafu, InvalidVersionNumberSnafu,
34    Result, YamlLoadSnafu, YamlParseSnafu,
35};
36use crate::etl::processor::ProcessorKind;
37use crate::etl::transform::transformer::greptime::values_to_row;
38use crate::tablesuffix::TableSuffixTemplate;
39use crate::{ContextOpt, GreptimeTransformer, IdentityTimeIndex, PipelineContext, SchemaInfo};
40
// Top-level keys recognized in a pipeline document.

/// Optional description string of the pipeline.
const DESCRIPTION: &str = "description";
/// Document format version, parsed into a [`PipelineDocVersion`].
const DOC_VERSION: &str = "version";
/// Optional list of processors applied to each input value.
const PROCESSORS: &str = "processors";
/// Transform section (singular spelling); used only when `TRANSFORMS` is absent.
const TRANSFORM: &str = "transform";
/// Transform section (plural spelling); takes precedence over `TRANSFORM`.
const TRANSFORMS: &str = "transforms";
/// Optional dispatcher section.
const DISPATCHER: &str = "dispatcher";
/// Optional table-suffix template section.
const TABLESUFFIX: &str = "table_suffix";
48
/// Raw text of a pipeline definition, tagged with the format it is written in.
pub enum Content<'a> {
    /// Pipeline definition written as JSON.
    Json(&'a str),
    /// Pipeline definition written as YAML.
    Yaml(&'a str),
}
53
54pub fn parse(input: &Content) -> Result<Pipeline> {
55    match input {
56        Content::Yaml(str) => {
57            let docs = YamlLoader::load_from_str(str).context(YamlLoadSnafu)?;
58
59            ensure!(docs.len() == 1, YamlParseSnafu);
60
61            let doc = &docs[0];
62
63            let description = doc[DESCRIPTION].as_str().map(|s| s.to_string());
64
65            let doc_version = (&doc[DOC_VERSION]).try_into()?;
66
67            let processors = if let Some(v) = doc[PROCESSORS].as_vec() {
68                v.try_into()?
69            } else {
70                Processors::default()
71            };
72
73            let transformers = if let Some(v) = doc[TRANSFORMS].as_vec().or(doc[TRANSFORM].as_vec())
74            {
75                v.try_into()?
76            } else {
77                Transforms::default()
78            };
79
80            let transformer = if transformers.is_empty() {
81                // use auto transform
82                // check processors have at least one timestamp-related processor
83                let cnt = processors
84                    .iter()
85                    .filter_map(|p| match p {
86                        ProcessorKind::Date(d) if !d.ignore_missing() => Some(
87                            d.fields
88                                .iter()
89                                .map(|f| (f.target_or_input_field(), TimeUnit::Nanosecond))
90                                .collect_vec(),
91                        ),
92                        ProcessorKind::Epoch(e) if !e.ignore_missing() => Some(
93                            e.fields
94                                .iter()
95                                .map(|f| (f.target_or_input_field(), (&e.resolution).into()))
96                                .collect_vec(),
97                        ),
98                        _ => None,
99                    })
100                    .flatten()
101                    .collect_vec();
102                ensure!(cnt.len() == 1, AutoTransformOneTimestampSnafu);
103
104                let (ts_name, timeunit) = cnt.first().unwrap();
105                TransformerMode::AutoTransform(ts_name.to_string(), *timeunit)
106            } else {
107                TransformerMode::GreptimeTransformer(GreptimeTransformer::new(
108                    transformers,
109                    &doc_version,
110                )?)
111            };
112
113            let dispatcher = if !doc[DISPATCHER].is_badvalue() {
114                Some(Dispatcher::try_from(&doc[DISPATCHER])?)
115            } else {
116                None
117            };
118
119            let tablesuffix = if !doc[TABLESUFFIX].is_badvalue() {
120                Some(TableSuffixTemplate::try_from(&doc[TABLESUFFIX])?)
121            } else {
122                None
123            };
124
125            Ok(Pipeline {
126                doc_version,
127                description,
128                processors,
129                transformer,
130                dispatcher,
131                tablesuffix,
132            })
133        }
134        Content::Json(_) => unimplemented!(),
135    }
136}
137
/// Format version of a pipeline document, read from its top-level `version` key.
///
/// A missing or `null` version is treated as [`PipelineDocVersion::V1`].
#[derive(Debug, Default, Copy, Clone, PartialEq, Eq)]
pub enum PipelineDocVersion {
    /// Either of:
    /// 1. All fields meant to be preserved must be explicitly set in the transform section.
    /// 2. No transform is set, in which case the auto-transform is used.
    #[default]
    V1,

    /// A combination of transform and auto-transform.
    /// Values first go through the transform section,
    /// then auto-transform sets the remaining fields.
    ///
    /// This is useful if you only want to set the index field,
    /// and let the normal fields be auto-inferred.
    V2,
}
153
154impl TryFrom<&Yaml> for PipelineDocVersion {
155    type Error = Error;
156
157    fn try_from(value: &Yaml) -> Result<Self> {
158        if value.is_badvalue() || value.is_null() {
159            return Ok(PipelineDocVersion::V1);
160        }
161
162        let version = match value {
163            Yaml::String(s) => s
164                .parse::<i64>()
165                .map_err(|_| InvalidVersionNumberSnafu { version: s.clone() }.build())?,
166            Yaml::Integer(i) => *i,
167            _ => {
168                return InvalidVersionNumberSnafu {
169                    version: value.as_str().unwrap_or_default().to_string(),
170                }
171                .fail();
172            }
173        };
174
175        match version {
176            1 => Ok(PipelineDocVersion::V1),
177            2 => Ok(PipelineDocVersion::V2),
178            _ => InvalidVersionNumberSnafu {
179                version: version.to_string(),
180            }
181            .fail(),
182        }
183    }
184}
185
/// A parsed pipeline.
///
/// On execution, the processors run first. Then the optional dispatcher is consulted,
/// then the table suffix is resolved, and finally the transformer produces the output row.
#[derive(Debug)]
pub struct Pipeline {
    /// Document format version; V2 combines the explicit transform with auto-transform.
    doc_version: PipelineDocVersion,
    /// Optional description taken from the document.
    description: Option<String>,
    /// Processors applied, in order, to each input value.
    processors: processor::Processors,
    /// Optional rule-based routing to another table suffix / pipeline.
    dispatcher: Option<Dispatcher>,
    /// How processed values are turned into a row.
    transformer: TransformerMode,
    /// Optional template used to resolve the destination table suffix.
    tablesuffix: Option<TableSuffixTemplate>,
}
195
/// How processed values are converted into a row.
#[derive(Debug, Clone)]
pub enum TransformerMode {
    /// The pipeline document has a non-empty transform section.
    GreptimeTransformer(GreptimeTransformer),
    /// No transform section: columns are inferred automatically. The named field, with
    /// the given time unit, is used as the epoch time index.
    AutoTransform(String, TimeUnit),
}
201
/// Where the pipeline execution is dispatched to, with context information.
#[derive(Debug, Hash, PartialEq, Eq, Clone, PartialOrd, Ord)]
pub struct DispatchedTo {
    /// Suffix appended to the original table name to form the destination table name.
    pub table_suffix: String,
    /// Pipeline named by the matching dispatcher rule, if the rule specifies one.
    pub pipeline: Option<String>,
}
208
209impl From<&Rule> for DispatchedTo {
210    fn from(value: &Rule) -> Self {
211        DispatchedTo {
212            table_suffix: value.table_suffix.clone(),
213            pipeline: value.pipeline.clone(),
214        }
215    }
216}
217
218impl DispatchedTo {
219    /// Generate destination table name from input
220    pub fn dispatched_to_table_name(&self, original: &str) -> String {
221        [original, &self.table_suffix].concat()
222    }
223}
224
/// The result of pipeline execution.
#[derive(Debug)]
pub enum PipelineExecOutput {
    /// The value was converted into a row, together with the extracted options.
    Transformed(TransformedOutput),
    /// A dispatcher rule matched. Carries the destination and the value as it stood
    /// after the processors ran.
    DispatchedTo(DispatchedTo, VrlValue),
    /// A processor produced a null value, so the input is dropped.
    Filtered,
}
232
/// A transformed row plus the context extracted from the processed value.
#[derive(Debug)]
pub struct TransformedOutput {
    /// Options extracted from the processed value before transformation.
    pub opt: ContextOpt,
    /// The output row.
    pub row: Row,
    /// Table suffix resolved for this row, if any.
    pub table_suffix: Option<String>,
}
239
240impl PipelineExecOutput {
241    // Note: This is a test only function, do not use it in production.
242    pub fn into_transformed(self) -> Option<(Row, Option<String>)> {
243        if let Self::Transformed(TransformedOutput {
244            row, table_suffix, ..
245        }) = self
246        {
247            Some((row, table_suffix))
248        } else {
249            None
250        }
251    }
252
253    // Note: This is a test only function, do not use it in production.
254    pub fn into_dispatched(self) -> Option<DispatchedTo> {
255        if let Self::DispatchedTo(d, _) = self {
256            Some(d)
257        } else {
258            None
259        }
260    }
261}
262
263impl Pipeline {
264    fn is_v1(&self) -> bool {
265        self.doc_version == PipelineDocVersion::V1
266    }
267
268    pub fn exec_mut(
269        &self,
270        mut val: VrlValue,
271        pipeline_ctx: &PipelineContext<'_>,
272        schema_info: &mut SchemaInfo,
273    ) -> Result<PipelineExecOutput> {
274        // process
275        for processor in self.processors.iter() {
276            val = processor.exec_mut(val)?;
277            if val.is_null() {
278                // line is filtered
279                return Ok(PipelineExecOutput::Filtered);
280            }
281        }
282
283        // dispatch, fast return if matched
284        if let Some(rule) = self.dispatcher.as_ref().and_then(|d| d.exec(&val)) {
285            return Ok(PipelineExecOutput::DispatchedTo(rule.into(), val));
286        }
287
288        // extract the options first
289        // this might be a breaking change, for table_suffix is now right after the processors
290        let mut opt = ContextOpt::from_pipeline_map_to_opt(&mut val)?;
291        let table_suffix = opt.resolve_table_suffix(self.tablesuffix.as_ref(), &val);
292
293        let row = match self.transformer() {
294            TransformerMode::GreptimeTransformer(greptime_transformer) => {
295                let values = greptime_transformer.transform_mut(&mut val, self.is_v1())?;
296                if self.is_v1() {
297                    // v1 dont combine with auto-transform
298                    // so return immediately
299                    return Ok(PipelineExecOutput::Transformed(TransformedOutput {
300                        opt,
301                        row: Row { values },
302                        table_suffix,
303                    }));
304                }
305                // continue v2 process, and set the rest fields with auto-transform
306                // if transformer presents, then ts has been set
307                values_to_row(schema_info, val, pipeline_ctx, Some(values), false)?
308            }
309            TransformerMode::AutoTransform(ts_name, time_unit) => {
310                // infer ts from the context
311                // we've check that only one timestamp should exist
312
313                // Create pipeline context with the found timestamp
314                let def = crate::PipelineDefinition::GreptimeIdentityPipeline(Some(
315                    IdentityTimeIndex::Epoch(ts_name.to_string(), *time_unit, false),
316                ));
317                let n_ctx =
318                    PipelineContext::new(&def, pipeline_ctx.pipeline_param, pipeline_ctx.channel);
319                values_to_row(schema_info, val, &n_ctx, None, true)?
320            }
321        };
322
323        Ok(PipelineExecOutput::Transformed(TransformedOutput {
324            opt,
325            row,
326            table_suffix,
327        }))
328    }
329
330    pub fn processors(&self) -> &processor::Processors {
331        &self.processors
332    }
333
334    pub fn transformer(&self) -> &TransformerMode {
335        &self.transformer
336    }
337
338    // the method is for test purpose
339    pub fn schemas(&self) -> Option<&Vec<greptime_proto::v1::ColumnSchema>> {
340        match &self.transformer {
341            TransformerMode::GreptimeTransformer(t) => Some(t.schemas()),
342            TransformerMode::AutoTransform(_, _) => None,
343        }
344    }
345
346    pub fn is_variant_table_name(&self) -> bool {
347        // even if the pipeline doesn't have dispatcher or table_suffix,
348        // it can still be a variant because of VRL processor and hint
349        self.dispatcher.is_some() || self.tablesuffix.is_some()
350    }
351}
352
353pub(crate) fn find_key_index(intermediate_keys: &[String], key: &str, kind: &str) -> Result<usize> {
354    intermediate_keys
355        .iter()
356        .position(|k| k == key)
357        .context(IntermediateKeyIndexSnafu { kind, key })
358}
359
/// Builds the boilerplate needed to execute a pipeline in tests.
///
/// This macro is test only, do not use it in production.
/// The schema_info cannot be built in auto-transform (ts-infer) mode, because that
/// mode has no explicit ts schema; the macro panics for such pipelines.
///
/// Expands to `(Arc<Pipeline>, SchemaInfo, PipelineDefinition, GreptimePipelineParams)`.
///
/// Usage (marked `ignore` because the snippet is not self-contained):
/// ```ignore
/// let (pipeline, schema_info, pipeline_def, pipeline_param) = setup_pipeline!(pipeline);
/// let pipeline_ctx = PipelineContext::new(&pipeline_def, &pipeline_param, Channel::Unknown);
/// ```
#[macro_export]
macro_rules! setup_pipeline {
    ($pipeline:expr) => {{
        use ::std::sync::Arc;

        use $crate::{GreptimePipelineParams, Pipeline, PipelineDefinition, SchemaInfo};

        let pipeline: Arc<Pipeline> = Arc::new($pipeline);
        // `schemas()` is `None` in auto-transform mode, which this macro does not support.
        let schema = pipeline
            .schemas()
            .expect("setup_pipeline! requires a pipeline with a transform section");
        let schema_info = SchemaInfo::from_schema_list(schema.clone());

        let pipeline_def = PipelineDefinition::Resolved(Arc::clone(&pipeline));
        let pipeline_param = GreptimePipelineParams::default();

        (pipeline, schema_info, pipeline_def, pipeline_param)
    }};
}
#[cfg(test)]
mod tests {
    use std::collections::BTreeMap;
    use std::sync::Arc;

    use greptime_proto::v1::value::ValueData;
    use greptime_proto::v1::{ColumnDataType, SemanticType};
    use session::context::Channel;
    use vrl::prelude::Bytes;
    use vrl::value::KeyString;

    use super::*;

    /// Parses `yaml`, prepares it with `setup_pipeline!` and runs it on `payload`.
    ///
    /// Returns the pipeline, the schema info after execution and the transformed row.
    /// Panics if parsing fails or the input is not transformed.
    fn run_transform(yaml: &str, payload: VrlValue) -> (Arc<Pipeline>, SchemaInfo, Row) {
        let pipeline: Pipeline = parse(&Content::Yaml(yaml)).unwrap();
        let (pipeline, mut schema_info, pipeline_def, pipeline_param) = setup_pipeline!(pipeline);
        let pipeline_ctx = PipelineContext::new(&pipeline_def, &pipeline_param, Channel::Unknown);
        let (row, _table_suffix) = pipeline
            .exec_mut(payload, &pipeline_ctx, &mut schema_info)
            .unwrap()
            .into_transformed()
            .unwrap();
        (pipeline, schema_info, row)
    }

    /// Asserts that `value` is a non-zero nanosecond timestamp.
    fn assert_nonzero_ns_timestamp(value: &Option<ValueData>) {
        match value {
            Some(ValueData::TimestampNanosecondValue(ts)) => assert_ne!(*ts, 0),
            other => panic!("expect a nanosecond timestamp value, got {other:?}"),
        }
    }

    #[test]
    fn test_pipeline_prepare() {
        let input_value_str = r#"
                    {
                        "my_field": "1,2",
                        "foo": "bar",
                        "ts": "1"
                    }
                "#;
        let input_value: serde_json::Value = serde_json::from_str(input_value_str).unwrap();

        let pipeline_yaml = r#"description: 'Pipeline for Apache Tomcat'
processors:
    - csv:
        field: my_field
        target_fields: field1, field2
    - epoch:
        field: ts
        resolution: ns
transform:
    - field: field1
      type: uint32
    - field: field2
      type: uint32
    - field: ts
      type: timestamp, ns
      index: time
    "#;

        let (_, _, row) = run_transform(pipeline_yaml, input_value.into());

        assert_eq!(row.values[0].value_data, Some(ValueData::U32Value(1)));
        assert_eq!(row.values[1].value_data, Some(ValueData::U32Value(2)));
        assert_nonzero_ns_timestamp(&row.values[2].value_data);
    }

    #[test]
    fn test_dissect_pipeline() {
        let message = r#"129.37.245.88 - meln1ks [01/Aug/2024:14:22:47 +0800] "PATCH /observability/metrics/production HTTP/1.0" 501 33085"#.to_string();
        let pipeline_str = r#"processors:
    - dissect:
        fields:
          - message
        patterns:
          - "%{ip} %{?ignored} %{username} [%{ts}] \"%{method} %{path} %{proto}\" %{status} %{bytes}"
    - date:
        fields:
          - ts
        formats:
          - "%d/%b/%Y:%H:%M:%S %z"

transform:
    - fields:
        - ip
        - username
        - method
        - path
        - proto
      type: string
    - fields:
        - status
      type: uint16
    - fields:
        - bytes
      type: uint32
    - field: ts
      type: timestamp, ns
      index: time"#;
        let payload = VrlValue::Object(BTreeMap::from([(
            KeyString::from("message"),
            VrlValue::Bytes(Bytes::from(message)),
        )]));

        let (pipeline, schema_info, row) = run_transform(pipeline_str, payload);
        assert_eq!(schema_info.schema.len(), row.values.len());

        let expected = [
            (
                ColumnDataType::String as i32,
                Some(ValueData::StringValue("129.37.245.88".into())),
            ),
            (
                ColumnDataType::String as i32,
                Some(ValueData::StringValue("meln1ks".into())),
            ),
            (
                ColumnDataType::String as i32,
                Some(ValueData::StringValue("PATCH".into())),
            ),
            (
                ColumnDataType::String as i32,
                Some(ValueData::StringValue(
                    "/observability/metrics/production".into(),
                )),
            ),
            (
                ColumnDataType::String as i32,
                Some(ValueData::StringValue("HTTP/1.0".into())),
            ),
            (
                ColumnDataType::Uint16 as i32,
                Some(ValueData::U16Value(501)),
            ),
            (
                ColumnDataType::Uint32 as i32,
                Some(ValueData::U32Value(33085)),
            ),
            (
                ColumnDataType::TimestampNanosecond as i32,
                Some(ValueData::TimestampNanosecondValue(1722493367000000000)),
            ),
        ];

        // compare the transform schema and the produced values column by column
        let schema = pipeline.schemas().unwrap();
        assert_eq!(schema.len(), expected.len());
        for ((column, value), (datatype, value_data)) in
            schema.iter().zip(&row.values).zip(&expected)
        {
            assert_eq!(column.datatype, *datatype);
            assert_eq!(&value.value_data, value_data);
        }
    }

    #[test]
    fn test_csv_pipeline() {
        let input_value_str = r#"
                    {
                        "my_field": "1,2",
                        "foo": "bar",
                        "ts": "1"
                    }
                "#;
        let input_value: serde_json::Value = serde_json::from_str(input_value_str).unwrap();

        let pipeline_yaml = r#"
    description: Pipeline for Apache Tomcat
    processors:
      - csv:
          field: my_field
          target_fields: field1, field2
      - epoch:
          field: ts
          resolution: ns
    transform:
      - field: field1
        type: uint32
      - field: field2
        type: uint32
      - field: ts
        type: timestamp, ns
        index: time
    "#;

        let (_, _, row) = run_transform(pipeline_yaml, input_value.into());

        assert_eq!(row.values[0].value_data, Some(ValueData::U32Value(1)));
        assert_eq!(row.values[1].value_data, Some(ValueData::U32Value(2)));
        assert_nonzero_ns_timestamp(&row.values[2].value_data);
    }

    #[test]
    fn test_date_pipeline() {
        let input_value_str = r#"
                {
                    "my_field": "1,2",
                    "foo": "bar",
                    "test_time": "2014-5-17T04:34:56+00:00"
                }
            "#;
        let input_value: serde_json::Value = serde_json::from_str(input_value_str).unwrap();

        let pipeline_yaml = r#"---
description: Pipeline for Apache Tomcat

processors:
    - date:
        field: test_time

transform:
    - field: test_time
      type: timestamp, ns
      index: time
    "#;

        let (pipeline, _, row) = run_transform(pipeline_yaml, input_value.into());

        let schema = pipeline.schemas().unwrap();
        assert_eq!(schema.len(), 1);
        let column = &schema[0];
        assert_eq!("test_time", column.column_name);
        assert_eq!(ColumnDataType::TimestampNanosecond as i32, column.datatype);
        assert_eq!(SemanticType::Timestamp as i32, column.semantic_type);

        assert_eq!(1, row.values.len());
        assert_eq!(
            Some(ValueData::TimestampNanosecondValue(1400301296000000000)),
            row.values[0].value_data
        );
    }

    #[test]
    fn test_dispatcher() {
        let pipeline_yaml = r#"
---
description: Pipeline for Apache Tomcat

processors:
  - epoch:
      field: ts
      resolution: ns

dispatcher:
  field: typename
  rules:
    - value: http
      table_suffix: http_events
    - value: database
      table_suffix: db_events
      pipeline: database_pipeline

transform:
  - field: typename
    type: string
  - field: ts
    type: timestamp, ns
    index: time
"#;
        let pipeline: Pipeline = parse(&Content::Yaml(pipeline_yaml)).unwrap();
        let dispatcher = pipeline.dispatcher.expect("expect dispatcher");
        assert_eq!(dispatcher.field, "typename");

        assert_eq!(dispatcher.rules.len(), 2);

        assert_eq!(
            dispatcher.rules[0],
            crate::dispatcher::Rule {
                value: VrlValue::Bytes(Bytes::from("http")),
                table_suffix: "http_events".to_string(),
                pipeline: None
            }
        );

        assert_eq!(
            dispatcher.rules[1],
            crate::dispatcher::Rule {
                value: VrlValue::Bytes(Bytes::from("database")),
                table_suffix: "db_events".to_string(),
                pipeline: Some("database_pipeline".to_string()),
            }
        );

        // Each broken variant misspells one required dispatcher key:
        // the dispatcher `field`, the rules' `table_suffix`, and the rules' `value`.
        let bad_yamls = [
            pipeline_yaml.replace("  field: typename\n  rules:", "  _field: typename\n  rules:"),
            pipeline_yaml.replace("table_suffix:", "_table_suffix:"),
            pipeline_yaml.replace("- value:", "- _value:"),
        ];
        for bad_yaml in &bad_yamls {
            // guard against a replacement silently not matching
            assert_ne!(bad_yaml.as_str(), pipeline_yaml);
            let r: Result<Pipeline> = parse(&Content::Yaml(bad_yaml.as_str()));
            assert!(r.is_err(), "expected parse failure for:\n{bad_yaml}");
        }
    }
}