pipeline/
etl.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#![allow(dead_code)]
16pub mod ctx_req;
17pub mod field;
18pub mod processor;
19pub mod transform;
20pub mod value;
21
22use api::v1::Row;
23use common_time::timestamp::TimeUnit;
24use itertools::Itertools;
25use processor::{Processor, Processors};
26use snafu::{ensure, OptionExt, ResultExt};
27use transform::Transforms;
28use vrl::core::Value as VrlValue;
29use yaml_rust::{Yaml, YamlLoader};
30
31use crate::dispatcher::{Dispatcher, Rule};
32use crate::error::{
33    AutoTransformOneTimestampSnafu, Error, IntermediateKeyIndexSnafu, InvalidVersionNumberSnafu,
34    Result, YamlLoadSnafu, YamlParseSnafu,
35};
36use crate::etl::processor::ProcessorKind;
37use crate::etl::transform::transformer::greptime::values_to_row;
38use crate::tablesuffix::TableSuffixTemplate;
39use crate::{ContextOpt, GreptimeTransformer, IdentityTimeIndex, PipelineContext, SchemaInfo};
40
// Top-level keys of the pipeline YAML document, as read by `parse`.

/// Optional free-text description of the pipeline.
const DESCRIPTION: &str = "description";
/// Pipeline document version; converted into a `PipelineDocVersion`.
const DOC_VERSION: &str = "version";
/// List of processors applied to each input value.
const PROCESSORS: &str = "processors";
/// Singular spelling of the transform section; consulted when `transforms` is absent.
const TRANSFORM: &str = "transform";
/// Plural spelling of the transform section; consulted first.
const TRANSFORMS: &str = "transforms";
/// Optional dispatcher section.
const DISPATCHER: &str = "dispatcher";
/// Optional table suffix template section.
const TABLESUFFIX: &str = "table_suffix";
48
/// Raw pipeline definition text, tagged with the format it is written in.
///
/// Both variants only borrow the text, so a `Content` is cheap to copy.
/// Note that `parse` currently handles [`Content::Yaml`] only.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Content<'a> {
    /// Pipeline definition written as JSON.
    Json(&'a str),
    /// Pipeline definition written as YAML.
    Yaml(&'a str),
}
53
54pub fn parse(input: &Content) -> Result<Pipeline> {
55    match input {
56        Content::Yaml(str) => {
57            let docs = YamlLoader::load_from_str(str).context(YamlLoadSnafu)?;
58
59            ensure!(docs.len() == 1, YamlParseSnafu);
60
61            let doc = &docs[0];
62
63            let description = doc[DESCRIPTION].as_str().map(|s| s.to_string());
64
65            let doc_version = (&doc[DOC_VERSION]).try_into()?;
66
67            let processors = if let Some(v) = doc[PROCESSORS].as_vec() {
68                v.try_into()?
69            } else {
70                Processors::default()
71            };
72
73            let transformers = if let Some(v) = doc[TRANSFORMS].as_vec().or(doc[TRANSFORM].as_vec())
74            {
75                v.try_into()?
76            } else {
77                Transforms::default()
78            };
79
80            let transformer = if transformers.is_empty() {
81                // use auto transform
82                // check processors have at least one timestamp-related processor
83                let cnt = processors
84                    .iter()
85                    .filter_map(|p| match p {
86                        ProcessorKind::Date(d) if !d.ignore_missing() => Some(
87                            d.fields
88                                .iter()
89                                .map(|f| (f.target_or_input_field(), TimeUnit::Nanosecond))
90                                .collect_vec(),
91                        ),
92                        ProcessorKind::Epoch(e) if !e.ignore_missing() => Some(
93                            e.fields
94                                .iter()
95                                .map(|f| (f.target_or_input_field(), (&e.resolution).into()))
96                                .collect_vec(),
97                        ),
98                        _ => None,
99                    })
100                    .flatten()
101                    .collect_vec();
102                ensure!(cnt.len() == 1, AutoTransformOneTimestampSnafu);
103
104                let (ts_name, timeunit) = cnt.first().unwrap();
105                TransformerMode::AutoTransform(ts_name.to_string(), *timeunit)
106            } else {
107                TransformerMode::GreptimeTransformer(GreptimeTransformer::new(
108                    transformers,
109                    &doc_version,
110                )?)
111            };
112
113            let dispatcher = if !doc[DISPATCHER].is_badvalue() {
114                Some(Dispatcher::try_from(&doc[DISPATCHER])?)
115            } else {
116                None
117            };
118
119            let tablesuffix = if !doc[TABLESUFFIX].is_badvalue() {
120                Some(TableSuffixTemplate::try_from(&doc[TABLESUFFIX])?)
121            } else {
122                None
123            };
124
125            Ok(Pipeline {
126                doc_version,
127                description,
128                processors,
129                transformer,
130                dispatcher,
131                tablesuffix,
132            })
133        }
134        Content::Json(_) => unimplemented!(),
135    }
136}
137
/// Version of the pipeline document format.
///
/// The version decides how the transform section and auto-transform
/// interact. `V1` is the default.
#[derive(Debug, Default, Copy, Clone, PartialEq, Eq)]
pub enum PipelineDocVersion {
    /// 1. All fields meant to be preserved have to be explicitly set in the transform section.
    /// 2. Or, if no transform is set, auto-transform is used.
    #[default]
    V1,

    /// A combination of transform and auto-transform.
    /// The value first goes through the transform section,
    /// then auto-transform is used to set the remaining fields.
    ///
    /// This is useful if you only want to set the index field,
    /// and let the normal fields be auto-inferred.
    V2,
}
153
154impl TryFrom<&Yaml> for PipelineDocVersion {
155    type Error = Error;
156
157    fn try_from(value: &Yaml) -> Result<Self> {
158        if value.is_badvalue() || value.is_null() {
159            return Ok(PipelineDocVersion::V1);
160        }
161
162        let version = match value {
163            Yaml::String(s) => s
164                .parse::<i64>()
165                .map_err(|_| InvalidVersionNumberSnafu { version: s.clone() }.build())?,
166            Yaml::Integer(i) => *i,
167            _ => {
168                return InvalidVersionNumberSnafu {
169                    version: value.as_str().unwrap_or_default().to_string(),
170                }
171                .fail();
172            }
173        };
174
175        match version {
176            1 => Ok(PipelineDocVersion::V1),
177            2 => Ok(PipelineDocVersion::V2),
178            _ => InvalidVersionNumberSnafu {
179                version: version.to_string(),
180            }
181            .fail(),
182        }
183    }
184}
185
/// A parsed pipeline definition, as produced by `parse`.
#[derive(Debug)]
pub struct Pipeline {
    /// Document version read from the `version` key.
    doc_version: PipelineDocVersion,
    /// Optional description read from the `description` key.
    description: Option<String>,
    /// Processors applied, in order, to every input value.
    processors: processor::Processors,
    /// Optional dispatcher; when one of its rules matches, execution returns early.
    dispatcher: Option<Dispatcher>,
    /// How processed values are turned into rows.
    transformer: TransformerMode,
    /// Optional table suffix template read from the `table_suffix` key.
    tablesuffix: Option<TableSuffixTemplate>,
}
195
/// How a pipeline converts a processed value into a row.
#[derive(Debug, Clone)]
pub enum TransformerMode {
    /// Explicit transform rules were given in the pipeline definition.
    GreptimeTransformer(GreptimeTransformer),
    /// No transform rules were given; holds the name of the single timestamp
    /// field found among the processors and its time unit.
    AutoTransform(String, TimeUnit),
}
201
/// The destination a pipeline execution was dispatched to, with context information.
#[derive(Debug, Hash, PartialEq, Eq, Clone, PartialOrd, Ord)]
pub struct DispatchedTo {
    /// Suffix appended to the original table name to form the destination table name.
    pub table_suffix: String,
    /// Pipeline named by the matched dispatcher rule, if the rule specifies one.
    pub pipeline: Option<String>,
}
208
209impl From<&Rule> for DispatchedTo {
210    fn from(value: &Rule) -> Self {
211        DispatchedTo {
212            table_suffix: value.table_suffix.clone(),
213            pipeline: value.pipeline.clone(),
214        }
215    }
216}
217
218impl DispatchedTo {
219    /// Generate destination table name from input
220    pub fn dispatched_to_table_name(&self, original: &str) -> String {
221        format!("{}_{}", &original, self.table_suffix)
222    }
223}
224
/// The result of pipeline execution
#[derive(Debug)]
pub enum PipelineExecOutput {
    /// The value went through the transformer and was turned into a row.
    Transformed(TransformedOutput),
    /// A dispatcher rule matched; carries the destination and the value as it
    /// was after the processors ran.
    DispatchedTo(DispatchedTo, VrlValue),
    /// A processor produced a null value, so the input was dropped.
    Filtered,
}
232
/// Payload of [`PipelineExecOutput::Transformed`].
#[derive(Debug)]
pub struct TransformedOutput {
    /// Options extracted from the processed value before transformation.
    pub opt: ContextOpt,
    /// The resulting row.
    pub row: Row,
    /// Table suffix resolved for this row, if any.
    pub table_suffix: Option<String>,
}
239
240impl PipelineExecOutput {
241    // Note: This is a test only function, do not use it in production.
242    pub fn into_transformed(self) -> Option<(Row, Option<String>)> {
243        if let Self::Transformed(TransformedOutput {
244            row, table_suffix, ..
245        }) = self
246        {
247            Some((row, table_suffix))
248        } else {
249            None
250        }
251    }
252
253    // Note: This is a test only function, do not use it in production.
254    pub fn into_dispatched(self) -> Option<DispatchedTo> {
255        if let Self::DispatchedTo(d, _) = self {
256            Some(d)
257        } else {
258            None
259        }
260    }
261}
262
263impl Pipeline {
264    fn is_v1(&self) -> bool {
265        self.doc_version == PipelineDocVersion::V1
266    }
267
268    pub fn exec_mut(
269        &self,
270        mut val: VrlValue,
271        pipeline_ctx: &PipelineContext<'_>,
272        schema_info: &mut SchemaInfo,
273    ) -> Result<PipelineExecOutput> {
274        // process
275        for processor in self.processors.iter() {
276            val = processor.exec_mut(val)?;
277            if val.is_null() {
278                // line is filtered
279                return Ok(PipelineExecOutput::Filtered);
280            }
281        }
282
283        // dispatch, fast return if matched
284        if let Some(rule) = self.dispatcher.as_ref().and_then(|d| d.exec(&val)) {
285            return Ok(PipelineExecOutput::DispatchedTo(rule.into(), val));
286        }
287
288        // extract the options first
289        // this might be a breaking change, for table_suffix is now right after the processors
290        let mut opt = ContextOpt::from_pipeline_map_to_opt(&mut val)?;
291        let table_suffix = opt.resolve_table_suffix(self.tablesuffix.as_ref(), &val);
292
293        let row = match self.transformer() {
294            TransformerMode::GreptimeTransformer(greptime_transformer) => {
295                let values = greptime_transformer.transform_mut(&mut val, self.is_v1())?;
296                if self.is_v1() {
297                    // v1 dont combine with auto-transform
298                    // so return immediately
299                    return Ok(PipelineExecOutput::Transformed(TransformedOutput {
300                        opt,
301                        row: Row { values },
302                        table_suffix,
303                    }));
304                }
305                // continue v2 process, and set the rest fields with auto-transform
306                // if transformer presents, then ts has been set
307                values_to_row(schema_info, val, pipeline_ctx, Some(values), false)?
308            }
309            TransformerMode::AutoTransform(ts_name, time_unit) => {
310                // infer ts from the context
311                // we've check that only one timestamp should exist
312
313                // Create pipeline context with the found timestamp
314                let def = crate::PipelineDefinition::GreptimeIdentityPipeline(Some(
315                    IdentityTimeIndex::Epoch(ts_name.to_string(), *time_unit, false),
316                ));
317                let n_ctx =
318                    PipelineContext::new(&def, pipeline_ctx.pipeline_param, pipeline_ctx.channel);
319                values_to_row(schema_info, val, &n_ctx, None, true)?
320            }
321        };
322
323        Ok(PipelineExecOutput::Transformed(TransformedOutput {
324            opt,
325            row,
326            table_suffix,
327        }))
328    }
329
330    pub fn processors(&self) -> &processor::Processors {
331        &self.processors
332    }
333
334    pub fn transformer(&self) -> &TransformerMode {
335        &self.transformer
336    }
337
338    // the method is for test purpose
339    pub fn schemas(&self) -> Option<&Vec<greptime_proto::v1::ColumnSchema>> {
340        match &self.transformer {
341            TransformerMode::GreptimeTransformer(t) => Some(t.schemas()),
342            TransformerMode::AutoTransform(_, _) => None,
343        }
344    }
345}
346
347pub(crate) fn find_key_index(intermediate_keys: &[String], key: &str, kind: &str) -> Result<usize> {
348    intermediate_keys
349        .iter()
350        .position(|k| k == key)
351        .context(IntermediateKeyIndexSnafu { kind, key })
352}
353
/// This macro is test only, do not use it in production.
///
/// Wraps the given pipeline in an `Arc` and evaluates to the tuple
/// `(pipeline, schema_info, pipeline_def, pipeline_param)`, where
/// `schema_info` is built from the pipeline's schemas, `pipeline_def` is the
/// resolved definition of that pipeline, and `pipeline_param` holds the
/// default parameters.
///
/// The schema_info cannot be used in auto-transform ts-infer mode because the
/// ts schema is missing: `schemas()` returns `None` for such a pipeline, so
/// the `unwrap()` in this macro panics.
///
/// Usage:
/// ```rust
/// let (pipeline, schema_info, pipeline_def, pipeline_param) = setup_pipeline!(pipeline);
/// let pipeline_ctx = PipelineContext::new(&pipeline_def, &pipeline_param, Channel::Unknown);
/// ```
#[macro_export]
macro_rules! setup_pipeline {
    ($pipeline:expr) => {{
        use std::sync::Arc;

        use $crate::{GreptimePipelineParams, Pipeline, PipelineDefinition, SchemaInfo};

        let pipeline: Arc<Pipeline> = Arc::new($pipeline);
        // Panics when the pipeline has no explicit transformer.
        let schema = pipeline.schemas().unwrap();
        let schema_info = SchemaInfo::from_schema_list(schema.clone());

        let pipeline_def = PipelineDefinition::Resolved(pipeline.clone());
        let pipeline_param = GreptimePipelineParams::default();

        (pipeline, schema_info, pipeline_def, pipeline_param)
    }};
}
379#[cfg(test)]
380mod tests {
381    use std::collections::BTreeMap;
382    use std::sync::Arc;
383
384    use api::v1::Rows;
385    use greptime_proto::v1::value::ValueData;
386    use greptime_proto::v1::{self, ColumnDataType, SemanticType};
387    use vrl::prelude::Bytes;
388    use vrl::value::KeyString;
389
390    use super::*;
391
392    #[test]
393    fn test_pipeline_prepare() {
394        let input_value_str = r#"
395                    {
396                        "my_field": "1,2",
397                        "foo": "bar",
398                        "ts": "1"
399                    }
400                "#;
401        let input_value: serde_json::Value = serde_json::from_str(input_value_str).unwrap();
402
403        let pipeline_yaml = r#"description: 'Pipeline for Apache Tomcat'
404processors:
405    - csv:
406        field: my_field
407        target_fields: field1, field2
408    - epoch:
409        field: ts
410        resolution: ns
411transform:
412    - field: field1
413      type: uint32
414    - field: field2
415      type: uint32
416    - field: ts
417      type: timestamp, ns
418      index: time
419    "#;
420
421        let pipeline: Pipeline = parse(&Content::Yaml(pipeline_yaml)).unwrap();
422        let (pipeline, mut schema_info, pipeline_def, pipeline_param) = setup_pipeline!(pipeline);
423        let pipeline_ctx = PipelineContext::new(
424            &pipeline_def,
425            &pipeline_param,
426            session::context::Channel::Unknown,
427        );
428
429        let payload = input_value.into();
430        let result = pipeline
431            .exec_mut(payload, &pipeline_ctx, &mut schema_info)
432            .unwrap()
433            .into_transformed()
434            .unwrap();
435
436        assert_eq!(result.0.values[0].value_data, Some(ValueData::U32Value(1)));
437        assert_eq!(result.0.values[1].value_data, Some(ValueData::U32Value(2)));
438        match &result.0.values[2].value_data {
439            Some(ValueData::TimestampNanosecondValue(v)) => {
440                assert_ne!(v, &0);
441            }
442            _ => panic!("expect null value"),
443        }
444    }
445
446    #[test]
447    fn test_dissect_pipeline() {
448        let message = r#"129.37.245.88 - meln1ks [01/Aug/2024:14:22:47 +0800] "PATCH /observability/metrics/production HTTP/1.0" 501 33085"#.to_string();
449        let pipeline_str = r#"processors:
450    - dissect:
451        fields:
452          - message
453        patterns:
454          - "%{ip} %{?ignored} %{username} [%{ts}] \"%{method} %{path} %{proto}\" %{status} %{bytes}"
455    - date:
456        fields:
457          - ts
458        formats:
459          - "%d/%b/%Y:%H:%M:%S %z"
460
461transform:
462    - fields:
463        - ip
464        - username
465        - method
466        - path
467        - proto
468      type: string
469    - fields:
470        - status
471      type: uint16
472    - fields:
473        - bytes
474      type: uint32
475    - field: ts
476      type: timestamp, ns
477      index: time"#;
478        let pipeline: Pipeline = parse(&Content::Yaml(pipeline_str)).unwrap();
479        let pipeline = Arc::new(pipeline);
480        let schema = pipeline.schemas().unwrap();
481        let mut schema_info = SchemaInfo::from_schema_list(schema.clone());
482
483        let pipeline_def = crate::PipelineDefinition::Resolved(pipeline.clone());
484        let pipeline_param = crate::GreptimePipelineParams::default();
485        let pipeline_ctx = PipelineContext::new(
486            &pipeline_def,
487            &pipeline_param,
488            session::context::Channel::Unknown,
489        );
490        let payload = VrlValue::Object(BTreeMap::from([(
491            KeyString::from("message"),
492            VrlValue::Bytes(Bytes::from(message)),
493        )]));
494
495        let result = pipeline
496            .exec_mut(payload, &pipeline_ctx, &mut schema_info)
497            .unwrap()
498            .into_transformed()
499            .unwrap();
500
501        assert_eq!(schema_info.schema.len(), result.0.values.len());
502        let test = vec![
503            (
504                ColumnDataType::String as i32,
505                Some(ValueData::StringValue("129.37.245.88".into())),
506            ),
507            (
508                ColumnDataType::String as i32,
509                Some(ValueData::StringValue("meln1ks".into())),
510            ),
511            (
512                ColumnDataType::String as i32,
513                Some(ValueData::StringValue("PATCH".into())),
514            ),
515            (
516                ColumnDataType::String as i32,
517                Some(ValueData::StringValue(
518                    "/observability/metrics/production".into(),
519                )),
520            ),
521            (
522                ColumnDataType::String as i32,
523                Some(ValueData::StringValue("HTTP/1.0".into())),
524            ),
525            (
526                ColumnDataType::Uint16 as i32,
527                Some(ValueData::U16Value(501)),
528            ),
529            (
530                ColumnDataType::Uint32 as i32,
531                Some(ValueData::U32Value(33085)),
532            ),
533            (
534                ColumnDataType::TimestampNanosecond as i32,
535                Some(ValueData::TimestampNanosecondValue(1722493367000000000)),
536            ),
537        ];
538        // manually set schema
539        let schema = pipeline.schemas().unwrap();
540        for i in 0..schema.len() {
541            let schema = &schema[i];
542            let value = &result.0.values[i];
543            assert_eq!(schema.datatype, test[i].0);
544            assert_eq!(value.value_data, test[i].1);
545        }
546    }
547
548    #[test]
549    fn test_csv_pipeline() {
550        let input_value_str = r#"
551                    {
552                        "my_field": "1,2",
553                        "foo": "bar",
554                        "ts": "1"
555                    }
556                "#;
557        let input_value: serde_json::Value = serde_json::from_str(input_value_str).unwrap();
558
559        let pipeline_yaml = r#"
560    description: Pipeline for Apache Tomcat
561    processors:
562      - csv:
563          field: my_field
564          target_fields: field1, field2
565      - epoch:
566          field: ts
567          resolution: ns
568    transform:
569      - field: field1
570        type: uint32
571      - field: field2
572        type: uint32
573      - field: ts
574        type: timestamp, ns
575        index: time
576    "#;
577
578        let pipeline: Pipeline = parse(&Content::Yaml(pipeline_yaml)).unwrap();
579        let (pipeline, mut schema_info, pipeline_def, pipeline_param) = setup_pipeline!(pipeline);
580        let pipeline_ctx = PipelineContext::new(
581            &pipeline_def,
582            &pipeline_param,
583            session::context::Channel::Unknown,
584        );
585
586        let payload = input_value.into();
587        let result = pipeline
588            .exec_mut(payload, &pipeline_ctx, &mut schema_info)
589            .unwrap()
590            .into_transformed()
591            .unwrap();
592        assert_eq!(result.0.values[0].value_data, Some(ValueData::U32Value(1)));
593        assert_eq!(result.0.values[1].value_data, Some(ValueData::U32Value(2)));
594        match &result.0.values[2].value_data {
595            Some(ValueData::TimestampNanosecondValue(v)) => {
596                assert_ne!(v, &0);
597            }
598            _ => panic!("expect null value"),
599        }
600    }
601
602    #[test]
603    fn test_date_pipeline() {
604        let input_value_str = r#"
605                {
606                    "my_field": "1,2",
607                    "foo": "bar",
608                    "test_time": "2014-5-17T04:34:56+00:00"
609                }
610            "#;
611        let input_value: serde_json::Value = serde_json::from_str(input_value_str).unwrap();
612
613        let pipeline_yaml = r#"---
614description: Pipeline for Apache Tomcat
615
616processors:
617    - date:
618        field: test_time
619
620transform:
621    - field: test_time
622      type: timestamp, ns
623      index: time
624    "#;
625
626        let pipeline: Pipeline = parse(&Content::Yaml(pipeline_yaml)).unwrap();
627        let pipeline = Arc::new(pipeline);
628        let schema = pipeline.schemas().unwrap();
629        let mut schema_info = SchemaInfo::from_schema_list(schema.clone());
630
631        let pipeline_def = crate::PipelineDefinition::Resolved(pipeline.clone());
632        let pipeline_param = crate::GreptimePipelineParams::default();
633        let pipeline_ctx = PipelineContext::new(
634            &pipeline_def,
635            &pipeline_param,
636            session::context::Channel::Unknown,
637        );
638        let schema = pipeline.schemas().unwrap().clone();
639        let result = input_value.into();
640
641        let row = pipeline
642            .exec_mut(result, &pipeline_ctx, &mut schema_info)
643            .unwrap()
644            .into_transformed()
645            .unwrap();
646        let output = Rows {
647            schema,
648            rows: vec![row.0],
649        };
650        let schemas = output.schema;
651
652        assert_eq!(schemas.len(), 1);
653        let schema = schemas[0].clone();
654        assert_eq!("test_time", schema.column_name);
655        assert_eq!(ColumnDataType::TimestampNanosecond as i32, schema.datatype);
656        assert_eq!(SemanticType::Timestamp as i32, schema.semantic_type);
657
658        let row = output.rows[0].clone();
659        assert_eq!(1, row.values.len());
660        let value_data = row.values[0].clone().value_data;
661        assert_eq!(
662            Some(v1::value::ValueData::TimestampNanosecondValue(
663                1400301296000000000
664            )),
665            value_data
666        );
667    }
668
669    #[test]
670    fn test_dispatcher() {
671        let pipeline_yaml = r#"
672---
673description: Pipeline for Apache Tomcat
674
675processors:
676  - epoch:
677      field: ts
678      resolution: ns
679
680dispatcher:
681  field: typename
682  rules:
683    - value: http
684      table_suffix: http_events
685    - value: database
686      table_suffix: db_events
687      pipeline: database_pipeline
688
689transform:
690  - field: typename
691    type: string
692  - field: ts
693    type: timestamp, ns
694    index: time
695"#;
696        let pipeline: Pipeline = parse(&Content::Yaml(pipeline_yaml)).unwrap();
697        let dispatcher = pipeline.dispatcher.expect("expect dispatcher");
698        assert_eq!(dispatcher.field, "typename");
699
700        assert_eq!(dispatcher.rules.len(), 2);
701
702        assert_eq!(
703            dispatcher.rules[0],
704            crate::dispatcher::Rule {
705                value: VrlValue::Bytes(Bytes::from("http")),
706                table_suffix: "http_events".to_string(),
707                pipeline: None
708            }
709        );
710
711        assert_eq!(
712            dispatcher.rules[1],
713            crate::dispatcher::Rule {
714                value: VrlValue::Bytes(Bytes::from("database")),
715                table_suffix: "db_events".to_string(),
716                pipeline: Some("database_pipeline".to_string()),
717            }
718        );
719
720        let bad_yaml1 = r#"
721---
722description: Pipeline for Apache Tomcat
723
724processors:
725  - epoch:
726      field: ts
727      resolution: ns
728
729dispatcher:
730  _field: typename
731  rules:
732    - value: http
733      table_suffix: http_events
734    - value: database
735      table_suffix: db_events
736      pipeline: database_pipeline
737
738transform:
739  - field: typename
740    type: string
741  - field: ts
742    type: timestamp, ns
743    index: time
744"#;
745        let bad_yaml2 = r#"
746---
747description: Pipeline for Apache Tomcat
748
749processors:
750  - epoch:
751      field: ts
752      resolution: ns
753dispatcher:
754  field: typename
755  rules:
756    - value: http
757      _table_suffix: http_events
758    - value: database
759      _table_suffix: db_events
760      pipeline: database_pipeline
761
762transform:
763  - field: typename
764    type: string
765  - field: ts
766    type: timestamp, ns
767    index: time
768"#;
769        let bad_yaml3 = r#"
770---
771description: Pipeline for Apache Tomcat
772
773processors:
774  - epoch:
775      field: ts
776      resolution: ns
777dispatcher:
778  field: typename
779  rules:
780    - _value: http
781      table_suffix: http_events
782    - _value: database
783      table_suffix: db_events
784      pipeline: database_pipeline
785
786transform:
787  - field: typename
788    type: string
789  - field: ts
790    type: timestamp, ns
791    index: time
792"#;
793
794        let r: Result<Pipeline> = parse(&Content::Yaml(bad_yaml1));
795        assert!(r.is_err());
796        let r: Result<Pipeline> = parse(&Content::Yaml(bad_yaml2));
797        assert!(r.is_err());
798        let r: Result<Pipeline> = parse(&Content::Yaml(bad_yaml3));
799        assert!(r.is_err());
800    }
801}