1#![allow(dead_code)]
16pub mod ctx_req;
17pub mod field;
18pub mod processor;
19pub mod transform;
20pub mod value;
21
22use api::v1::Row;
23use common_time::timestamp::TimeUnit;
24use itertools::Itertools;
25use processor::{Processor, Processors};
26use snafu::{OptionExt, ResultExt, ensure};
27use transform::Transforms;
28use vrl::core::Value as VrlValue;
29use yaml_rust::{Yaml, YamlLoader};
30
31use crate::dispatcher::{Dispatcher, Rule};
32use crate::error::{
33 AutoTransformOneTimestampSnafu, Error, IntermediateKeyIndexSnafu, InvalidVersionNumberSnafu,
34 Result, YamlLoadSnafu, YamlParseSnafu,
35};
36use crate::etl::processor::ProcessorKind;
37use crate::etl::transform::transformer::greptime::values_to_row;
38use crate::tablesuffix::TableSuffixTemplate;
39use crate::{ContextOpt, GreptimeTransformer, IdentityTimeIndex, PipelineContext, SchemaInfo};
40
// Top-level keys that `parse` looks up in the pipeline YAML document.

/// Key of the optional free-text pipeline description.
const DESCRIPTION: &str = "description";
/// Key of the document version; converted into a `PipelineDocVersion`.
const DOC_VERSION: &str = "version";
/// Key of the processor list.
const PROCESSORS: &str = "processors";
/// Singular spelling of the transform-list key; consulted when `transforms` yields no list.
const TRANSFORM: &str = "transform";
/// Plural spelling of the transform-list key; tried first.
const TRANSFORMS: &str = "transforms";
/// Key of the dispatcher section.
const DISPATCHER: &str = "dispatcher";
/// Key of the table-suffix template.
const TABLESUFFIX: &str = "table_suffix";
48
/// Borrowed pipeline definition text, tagged with the format it is written in.
pub enum Content<'a> {
    /// JSON text. `parse` has no implementation for this variant and hits `unimplemented!()`.
    Json(&'a str),
    /// YAML text; the only format `parse` handles.
    Yaml(&'a str),
}
53
54pub fn parse(input: &Content) -> Result<Pipeline> {
55 match input {
56 Content::Yaml(str) => {
57 let docs = YamlLoader::load_from_str(str).context(YamlLoadSnafu)?;
58
59 ensure!(docs.len() == 1, YamlParseSnafu);
60
61 let doc = &docs[0];
62
63 let description = doc[DESCRIPTION].as_str().map(|s| s.to_string());
64
65 let doc_version = (&doc[DOC_VERSION]).try_into()?;
66
67 let processors = if let Some(v) = doc[PROCESSORS].as_vec() {
68 v.try_into()?
69 } else {
70 Processors::default()
71 };
72
73 let transformers = if let Some(v) = doc[TRANSFORMS].as_vec().or(doc[TRANSFORM].as_vec())
74 {
75 v.try_into()?
76 } else {
77 Transforms::default()
78 };
79
80 let transformer = if transformers.is_empty() {
81 let cnt = processors
84 .iter()
85 .filter_map(|p| match p {
86 ProcessorKind::Date(d) if !d.ignore_missing() => Some(
87 d.fields
88 .iter()
89 .map(|f| (f.target_or_input_field(), TimeUnit::Nanosecond))
90 .collect_vec(),
91 ),
92 ProcessorKind::Epoch(e) if !e.ignore_missing() => Some(
93 e.fields
94 .iter()
95 .map(|f| (f.target_or_input_field(), (&e.resolution).into()))
96 .collect_vec(),
97 ),
98 _ => None,
99 })
100 .flatten()
101 .collect_vec();
102 ensure!(cnt.len() == 1, AutoTransformOneTimestampSnafu);
103
104 let (ts_name, timeunit) = cnt.first().unwrap();
105 TransformerMode::AutoTransform(ts_name.to_string(), *timeunit)
106 } else {
107 TransformerMode::GreptimeTransformer(GreptimeTransformer::new(
108 transformers,
109 &doc_version,
110 )?)
111 };
112
113 let dispatcher = if !doc[DISPATCHER].is_badvalue() {
114 Some(Dispatcher::try_from(&doc[DISPATCHER])?)
115 } else {
116 None
117 };
118
119 let tablesuffix = if !doc[TABLESUFFIX].is_badvalue() {
120 Some(TableSuffixTemplate::try_from(&doc[TABLESUFFIX])?)
121 } else {
122 None
123 };
124
125 Ok(Pipeline {
126 doc_version,
127 description,
128 processors,
129 transformer,
130 dispatcher,
131 tablesuffix,
132 })
133 }
134 Content::Json(_) => unimplemented!(),
135 }
136}
137
/// Version of the pipeline document, read from the `version` key.
#[derive(Debug, Default, Copy, Clone, PartialEq, Eq)]
pub enum PipelineDocVersion {
    /// Version 1. This is the default, and is also what a missing or null
    /// `version` key resolves to.
    #[default]
    V1,

    /// Version 2. In `Pipeline::exec_mut` a non-V1 pipeline with an explicit
    /// transformer additionally passes the remaining value through `values_to_row`.
    V2,
}
153
154impl TryFrom<&Yaml> for PipelineDocVersion {
155 type Error = Error;
156
157 fn try_from(value: &Yaml) -> Result<Self> {
158 if value.is_badvalue() || value.is_null() {
159 return Ok(PipelineDocVersion::V1);
160 }
161
162 let version = match value {
163 Yaml::String(s) => s
164 .parse::<i64>()
165 .map_err(|_| InvalidVersionNumberSnafu { version: s.clone() }.build())?,
166 Yaml::Integer(i) => *i,
167 _ => {
168 return InvalidVersionNumberSnafu {
169 version: value.as_str().unwrap_or_default().to_string(),
170 }
171 .fail();
172 }
173 };
174
175 match version {
176 1 => Ok(PipelineDocVersion::V1),
177 2 => Ok(PipelineDocVersion::V2),
178 _ => InvalidVersionNumberSnafu {
179 version: version.to_string(),
180 }
181 .fail(),
182 }
183 }
184}
185
/// A parsed pipeline definition, as assembled by `parse`.
#[derive(Debug)]
pub struct Pipeline {
    /// Document version read from the `version` key.
    doc_version: PipelineDocVersion,
    /// Text of the `description` key, when it is a string.
    description: Option<String>,
    /// Processors applied, in order, to every input value.
    processors: processor::Processors,
    /// Optional dispatcher; consulted after the processors have run.
    dispatcher: Option<Dispatcher>,
    /// How a processed value is turned into a row.
    transformer: TransformerMode,
    /// Optional template built from the `table_suffix` key.
    tablesuffix: Option<TableSuffixTemplate>,
}
195
/// Strategy used to turn a processed value into a row.
#[derive(Debug, Clone)]
pub enum TransformerMode {
    /// Use the transformer built from the document's transform list.
    GreptimeTransformer(GreptimeTransformer),
    /// Chosen by `parse` when the transform list is empty. Holds the name and
    /// time unit of the single timestamp field taken from a date or epoch processor.
    AutoTransform(String, TimeUnit),
}
201
/// Destination described by a dispatcher `Rule`.
#[derive(Debug, Hash, PartialEq, Eq, Clone, PartialOrd, Ord)]
pub struct DispatchedTo {
    /// Suffix appended to the original table name by `dispatched_to_table_name`.
    pub table_suffix: String,
    /// Optional pipeline name copied from the rule.
    /// NOTE(review): presumably the pipeline that should handle the dispatched
    /// value next — confirm against the caller.
    pub pipeline: Option<String>,
}
208
209impl From<&Rule> for DispatchedTo {
210 fn from(value: &Rule) -> Self {
211 DispatchedTo {
212 table_suffix: value.table_suffix.clone(),
213 pipeline: value.pipeline.clone(),
214 }
215 }
216}
217
218impl DispatchedTo {
219 pub fn dispatched_to_table_name(&self, original: &str) -> String {
221 [original, &self.table_suffix].concat()
222 }
223}
224
/// Outcome of running a pipeline on a single input value (see `Pipeline::exec_mut`).
#[derive(Debug)]
pub enum PipelineExecOutput {
    /// The value was turned into a row.
    Transformed(TransformedOutput),
    /// A dispatcher rule matched; carries the destination and the processed value.
    DispatchedTo(DispatchedTo, VrlValue),
    /// A processor produced a null value, so the input was dropped.
    Filtered,
}
232
/// Payload of `PipelineExecOutput::Transformed`.
#[derive(Debug)]
pub struct TransformedOutput {
    /// Options extracted from the processed value by `ContextOpt::from_pipeline_map_to_opt`.
    pub opt: ContextOpt,
    /// The resulting row.
    pub row: Row,
    /// Table suffix resolved for this row, if any.
    pub table_suffix: Option<String>,
}
239
240impl PipelineExecOutput {
241 pub fn into_transformed(self) -> Option<(Row, Option<String>)> {
243 if let Self::Transformed(TransformedOutput {
244 row, table_suffix, ..
245 }) = self
246 {
247 Some((row, table_suffix))
248 } else {
249 None
250 }
251 }
252
253 pub fn into_dispatched(self) -> Option<DispatchedTo> {
255 if let Self::DispatchedTo(d, _) = self {
256 Some(d)
257 } else {
258 None
259 }
260 }
261}
262
263impl Pipeline {
264 fn is_v1(&self) -> bool {
265 self.doc_version == PipelineDocVersion::V1
266 }
267
268 pub fn exec_mut(
269 &self,
270 mut val: VrlValue,
271 pipeline_ctx: &PipelineContext<'_>,
272 schema_info: &mut SchemaInfo,
273 ) -> Result<PipelineExecOutput> {
274 for processor in self.processors.iter() {
276 val = processor.exec_mut(val)?;
277 if val.is_null() {
278 return Ok(PipelineExecOutput::Filtered);
280 }
281 }
282
283 if let Some(rule) = self.dispatcher.as_ref().and_then(|d| d.exec(&val)) {
285 return Ok(PipelineExecOutput::DispatchedTo(rule.into(), val));
286 }
287
288 let mut opt = ContextOpt::from_pipeline_map_to_opt(&mut val)?;
291 let table_suffix = opt.resolve_table_suffix(self.tablesuffix.as_ref(), &val);
292
293 let row = match self.transformer() {
294 TransformerMode::GreptimeTransformer(greptime_transformer) => {
295 let values = greptime_transformer.transform_mut(&mut val, self.is_v1())?;
296 if self.is_v1() {
297 return Ok(PipelineExecOutput::Transformed(TransformedOutput {
300 opt,
301 row: Row { values },
302 table_suffix,
303 }));
304 }
305 values_to_row(schema_info, val, pipeline_ctx, Some(values), false)?
308 }
309 TransformerMode::AutoTransform(ts_name, time_unit) => {
310 let def = crate::PipelineDefinition::GreptimeIdentityPipeline(Some(
315 IdentityTimeIndex::Epoch(ts_name.to_string(), *time_unit, false),
316 ));
317 let n_ctx =
318 PipelineContext::new(&def, pipeline_ctx.pipeline_param, pipeline_ctx.channel);
319 values_to_row(schema_info, val, &n_ctx, None, true)?
320 }
321 };
322
323 Ok(PipelineExecOutput::Transformed(TransformedOutput {
324 opt,
325 row,
326 table_suffix,
327 }))
328 }
329
330 pub fn processors(&self) -> &processor::Processors {
331 &self.processors
332 }
333
334 pub fn transformer(&self) -> &TransformerMode {
335 &self.transformer
336 }
337
338 pub fn schemas(&self) -> Option<&Vec<greptime_proto::v1::ColumnSchema>> {
340 match &self.transformer {
341 TransformerMode::GreptimeTransformer(t) => Some(t.schemas()),
342 TransformerMode::AutoTransform(_, _) => None,
343 }
344 }
345
346 pub fn is_variant_table_name(&self) -> bool {
347 self.dispatcher.is_some() || self.tablesuffix.is_some()
350 }
351}
352
353pub(crate) fn find_key_index(intermediate_keys: &[String], key: &str, kind: &str) -> Result<usize> {
354 intermediate_keys
355 .iter()
356 .position(|k| k == key)
357 .context(IntermediateKeyIndexSnafu { kind, key })
358}
359
/// Wraps a pipeline in an `Arc` and builds the companion values needed to run it.
///
/// Takes an expression that evaluates to a `Pipeline` and expands to a tuple
/// `(pipeline, schema_info, pipeline_def, pipeline_param)`:
///
/// * `pipeline` — the pipeline wrapped in an `Arc`;
/// * `schema_info` — a `SchemaInfo` built from a clone of the pipeline's schemas;
/// * `pipeline_def` — `PipelineDefinition::Resolved` holding a clone of the `Arc`;
/// * `pipeline_param` — default `GreptimePipelineParams`.
///
/// # Panics
///
/// Panics when `Pipeline::schemas` returns `None`, which is the case for a
/// pipeline in auto-transform mode.
#[macro_export]
macro_rules! setup_pipeline {
    ($pipeline:expr) => {{
        use std::sync::Arc;

        use $crate::{GreptimePipelineParams, Pipeline, PipelineDefinition, SchemaInfo};

        let pipeline: Arc<Pipeline> = Arc::new($pipeline);
        // Only pipelines with an explicit transformer expose schemas.
        let schema = pipeline.schemas().unwrap();
        let schema_info = SchemaInfo::from_schema_list(schema.clone());

        let pipeline_def = PipelineDefinition::Resolved(pipeline.clone());
        let pipeline_param = GreptimePipelineParams::default();

        (pipeline, schema_info, pipeline_def, pipeline_param)
    }};
}
#[cfg(test)]
mod tests {
    // Tests for `parse` and `Pipeline::exec_mut`.
    //
    // NOTE(review): the nested indentation inside the embedded YAML/JSON
    // fixtures appears flattened in this copy of the file. YAML nesting is
    // indentation-sensitive, so confirm the fixtures against the canonical
    // source before relying on them. The literals are left exactly as found.

    use std::collections::BTreeMap;
    use std::sync::Arc;

    use api::v1::Rows;
    use greptime_proto::v1::value::ValueData;
    use greptime_proto::v1::{self, ColumnDataType, SemanticType};
    use vrl::prelude::Bytes;
    use vrl::value::KeyString;

    use super::*;

    // Parses a csv + epoch pipeline given as unindented top-level YAML, runs it
    // on a JSON payload and checks the three transformed values.
    #[test]
    fn test_pipeline_prepare() {
        let input_value_str = r#"
 {
 "my_field": "1,2",
 "foo": "bar",
 "ts": "1"
 }
 "#;
        let input_value: serde_json::Value = serde_json::from_str(input_value_str).unwrap();

        let pipeline_yaml = r#"description: 'Pipeline for Apache Tomcat'
processors:
 - csv:
 field: my_field
 target_fields: field1, field2
 - epoch:
 field: ts
 resolution: ns
transform:
 - field: field1
 type: uint32
 - field: field2
 type: uint32
 - field: ts
 type: timestamp, ns
 index: time
 "#;

        let pipeline: Pipeline = parse(&Content::Yaml(pipeline_yaml)).unwrap();
        let (pipeline, mut schema_info, pipeline_def, pipeline_param) = setup_pipeline!(pipeline);
        let pipeline_ctx = PipelineContext::new(
            &pipeline_def,
            &pipeline_param,
            session::context::Channel::Unknown,
        );

        let payload = input_value.into();
        let result = pipeline
            .exec_mut(payload, &pipeline_ctx, &mut schema_info)
            .unwrap()
            .into_transformed()
            .unwrap();

        assert_eq!(result.0.values[0].value_data, Some(ValueData::U32Value(1)));
        assert_eq!(result.0.values[1].value_data, Some(ValueData::U32Value(2)));
        // The third column must be a non-zero nanosecond timestamp.
        match &result.0.values[2].value_data {
            Some(ValueData::TimestampNanosecondValue(v)) => {
                assert_ne!(v, &0);
            }
            _ => panic!("expect null value"),
        }
    }

    // Runs a dissect + date pipeline on an access-log line and checks every
    // output column's datatype and value against an expected table.
    #[test]
    fn test_dissect_pipeline() {
        let message = r#"129.37.245.88 - meln1ks [01/Aug/2024:14:22:47 +0800] "PATCH /observability/metrics/production HTTP/1.0" 501 33085"#.to_string();
        let pipeline_str = r#"processors:
 - dissect:
 fields:
 - message
 patterns:
 - "%{ip} %{?ignored} %{username} [%{ts}] \"%{method} %{path} %{proto}\" %{status} %{bytes}"
 - date:
 fields:
 - ts
 formats:
 - "%d/%b/%Y:%H:%M:%S %z"

transform:
 - fields:
 - ip
 - username
 - method
 - path
 - proto
 type: string
 - fields:
 - status
 type: uint16
 - fields:
 - bytes
 type: uint32
 - field: ts
 type: timestamp, ns
 index: time"#;
        let pipeline: Pipeline = parse(&Content::Yaml(pipeline_str)).unwrap();
        let pipeline = Arc::new(pipeline);
        let schema = pipeline.schemas().unwrap();
        let mut schema_info = SchemaInfo::from_schema_list(schema.clone());

        let pipeline_def = crate::PipelineDefinition::Resolved(pipeline.clone());
        let pipeline_param = crate::GreptimePipelineParams::default();
        let pipeline_ctx = PipelineContext::new(
            &pipeline_def,
            &pipeline_param,
            session::context::Channel::Unknown,
        );
        let payload = VrlValue::Object(BTreeMap::from([(
            KeyString::from("message"),
            VrlValue::Bytes(Bytes::from(message)),
        )]));

        let result = pipeline
            .exec_mut(payload, &pipeline_ctx, &mut schema_info)
            .unwrap()
            .into_transformed()
            .unwrap();

        assert_eq!(schema_info.schema.len(), result.0.values.len());
        // Expected (datatype, value) per column, in schema order.
        let test = vec![
            (
                ColumnDataType::String as i32,
                Some(ValueData::StringValue("129.37.245.88".into())),
            ),
            (
                ColumnDataType::String as i32,
                Some(ValueData::StringValue("meln1ks".into())),
            ),
            (
                ColumnDataType::String as i32,
                Some(ValueData::StringValue("PATCH".into())),
            ),
            (
                ColumnDataType::String as i32,
                Some(ValueData::StringValue(
                    "/observability/metrics/production".into(),
                )),
            ),
            (
                ColumnDataType::String as i32,
                Some(ValueData::StringValue("HTTP/1.0".into())),
            ),
            (
                ColumnDataType::Uint16 as i32,
                Some(ValueData::U16Value(501)),
            ),
            (
                ColumnDataType::Uint32 as i32,
                Some(ValueData::U32Value(33085)),
            ),
            (
                ColumnDataType::TimestampNanosecond as i32,
                Some(ValueData::TimestampNanosecondValue(1722493367000000000)),
            ),
        ];
        let schema = pipeline.schemas().unwrap();
        for i in 0..schema.len() {
            let schema = &schema[i];
            let value = &result.0.values[i];
            assert_eq!(schema.datatype, test[i].0);
            assert_eq!(value.value_data, test[i].1);
        }
    }

    // Same pipeline and payload as `test_pipeline_prepare`, but with the YAML
    // document written inside an indented raw string.
    #[test]
    fn test_csv_pipeline() {
        let input_value_str = r#"
 {
 "my_field": "1,2",
 "foo": "bar",
 "ts": "1"
 }
 "#;
        let input_value: serde_json::Value = serde_json::from_str(input_value_str).unwrap();

        let pipeline_yaml = r#"
 description: Pipeline for Apache Tomcat
 processors:
 - csv:
 field: my_field
 target_fields: field1, field2
 - epoch:
 field: ts
 resolution: ns
 transform:
 - field: field1
 type: uint32
 - field: field2
 type: uint32
 - field: ts
 type: timestamp, ns
 index: time
 "#;

        let pipeline: Pipeline = parse(&Content::Yaml(pipeline_yaml)).unwrap();
        let (pipeline, mut schema_info, pipeline_def, pipeline_param) = setup_pipeline!(pipeline);
        let pipeline_ctx = PipelineContext::new(
            &pipeline_def,
            &pipeline_param,
            session::context::Channel::Unknown,
        );

        let payload = input_value.into();
        let result = pipeline
            .exec_mut(payload, &pipeline_ctx, &mut schema_info)
            .unwrap()
            .into_transformed()
            .unwrap();
        assert_eq!(result.0.values[0].value_data, Some(ValueData::U32Value(1)));
        assert_eq!(result.0.values[1].value_data, Some(ValueData::U32Value(2)));
        // The third column must be a non-zero nanosecond timestamp.
        match &result.0.values[2].value_data {
            Some(ValueData::TimestampNanosecondValue(v)) => {
                assert_ne!(v, &0);
            }
            _ => panic!("expect null value"),
        }
    }

    // Runs a date-only pipeline and checks both the single output column's
    // schema and the parsed nanosecond timestamp.
    #[test]
    fn test_date_pipeline() {
        let input_value_str = r#"
 {
 "my_field": "1,2",
 "foo": "bar",
 "test_time": "2014-5-17T04:34:56+00:00"
 }
 "#;
        let input_value: serde_json::Value = serde_json::from_str(input_value_str).unwrap();

        let pipeline_yaml = r#"---
description: Pipeline for Apache Tomcat

processors:
 - date:
 field: test_time

transform:
 - field: test_time
 type: timestamp, ns
 index: time
 "#;

        let pipeline: Pipeline = parse(&Content::Yaml(pipeline_yaml)).unwrap();
        let pipeline = Arc::new(pipeline);
        let schema = pipeline.schemas().unwrap();
        let mut schema_info = SchemaInfo::from_schema_list(schema.clone());

        let pipeline_def = crate::PipelineDefinition::Resolved(pipeline.clone());
        let pipeline_param = crate::GreptimePipelineParams::default();
        let pipeline_ctx = PipelineContext::new(
            &pipeline_def,
            &pipeline_param,
            session::context::Channel::Unknown,
        );
        let schema = pipeline.schemas().unwrap().clone();
        let result = input_value.into();

        let row = pipeline
            .exec_mut(result, &pipeline_ctx, &mut schema_info)
            .unwrap()
            .into_transformed()
            .unwrap();
        let output = Rows {
            schema,
            rows: vec![row.0],
        };
        let schemas = output.schema;

        assert_eq!(schemas.len(), 1);
        let schema = schemas[0].clone();
        assert_eq!("test_time", schema.column_name);
        assert_eq!(ColumnDataType::TimestampNanosecond as i32, schema.datatype);
        assert_eq!(SemanticType::Timestamp as i32, schema.semantic_type);

        let row = output.rows[0].clone();
        assert_eq!(1, row.values.len());
        let value_data = row.values[0].clone().value_data;
        assert_eq!(
            Some(v1::value::ValueData::TimestampNanosecondValue(
                1400301296000000000
            )),
            value_data
        );
    }

    // Checks that a well-formed dispatcher section is parsed into its field and
    // rules, and that three malformed variants are rejected by `parse`.
    #[test]
    fn test_dispatcher() {
        let pipeline_yaml = r#"
---
description: Pipeline for Apache Tomcat

processors:
 - epoch:
 field: ts
 resolution: ns

dispatcher:
 field: typename
 rules:
 - value: http
 table_suffix: http_events
 - value: database
 table_suffix: db_events
 pipeline: database_pipeline

transform:
 - field: typename
 type: string
 - field: ts
 type: timestamp, ns
 index: time
"#;
        let pipeline: Pipeline = parse(&Content::Yaml(pipeline_yaml)).unwrap();
        let dispatcher = pipeline.dispatcher.expect("expect dispatcher");
        assert_eq!(dispatcher.field, "typename");

        assert_eq!(dispatcher.rules.len(), 2);

        assert_eq!(
            dispatcher.rules[0],
            crate::dispatcher::Rule {
                value: VrlValue::Bytes(Bytes::from("http")),
                table_suffix: "http_events".to_string(),
                pipeline: None
            }
        );

        assert_eq!(
            dispatcher.rules[1],
            crate::dispatcher::Rule {
                value: VrlValue::Bytes(Bytes::from("database")),
                table_suffix: "db_events".to_string(),
                pipeline: Some("database_pipeline".to_string()),
            }
        );

        // Dispatcher key misspelled as `_field`.
        let bad_yaml1 = r#"
---
description: Pipeline for Apache Tomcat

processors:
 - epoch:
 field: ts
 resolution: ns

dispatcher:
 _field: typename
 rules:
 - value: http
 table_suffix: http_events
 - value: database
 table_suffix: db_events
 pipeline: database_pipeline

transform:
 - field: typename
 type: string
 - field: ts
 type: timestamp, ns
 index: time
"#;
        // Rule key misspelled as `_table_suffix`.
        let bad_yaml2 = r#"
---
description: Pipeline for Apache Tomcat

processors:
 - epoch:
 field: ts
 resolution: ns
dispatcher:
 field: typename
 rules:
 - value: http
 _table_suffix: http_events
 - value: database
 _table_suffix: db_events
 pipeline: database_pipeline

transform:
 - field: typename
 type: string
 - field: ts
 type: timestamp, ns
 index: time
"#;
        // Rule key misspelled as `_value`.
        let bad_yaml3 = r#"
---
description: Pipeline for Apache Tomcat

processors:
 - epoch:
 field: ts
 resolution: ns
dispatcher:
 field: typename
 rules:
 - _value: http
 table_suffix: http_events
 - _value: database
 table_suffix: db_events
 pipeline: database_pipeline

transform:
 - field: typename
 type: string
 - field: ts
 type: timestamp, ns
 index: time
"#;

        let r: Result<Pipeline> = parse(&Content::Yaml(bad_yaml1));
        assert!(r.is_err());
        let r: Result<Pipeline> = parse(&Content::Yaml(bad_yaml2));
        assert!(r.is_err());
        let r: Result<Pipeline> = parse(&Content::Yaml(bad_yaml3));
        assert!(r.is_err());
    }
}