1#![allow(dead_code)]
16pub mod ctx_req;
17pub mod field;
18pub mod processor;
19pub mod transform;
20pub mod value;
21
22use api::v1::Row;
23use common_time::timestamp::TimeUnit;
24use itertools::Itertools;
25use processor::{Processor, Processors};
26use snafu::{ensure, OptionExt, ResultExt};
27use transform::Transforms;
28use vrl::core::Value as VrlValue;
29use yaml_rust::{Yaml, YamlLoader};
30
31use crate::dispatcher::{Dispatcher, Rule};
32use crate::error::{
33 AutoTransformOneTimestampSnafu, Error, IntermediateKeyIndexSnafu, InvalidVersionNumberSnafu,
34 Result, YamlLoadSnafu, YamlParseSnafu,
35};
36use crate::etl::processor::ProcessorKind;
37use crate::etl::transform::transformer::greptime::values_to_row;
38use crate::tablesuffix::TableSuffixTemplate;
39use crate::{ContextOpt, GreptimeTransformer, IdentityTimeIndex, PipelineContext, SchemaInfo};
40
// Top-level keys that `parse` looks up in the pipeline YAML document.

/// Key of the optional free-form pipeline description.
const DESCRIPTION: &str = "description";
/// Key of the optional document version (converted into `PipelineDocVersion`).
const DOC_VERSION: &str = "version";
/// Key of the processor list.
const PROCESSORS: &str = "processors";
/// Singular spelling of the transform list key; only consulted when
/// `transforms` does not hold a list.
const TRANSFORM: &str = "transform";
/// Plural spelling of the transform list key; checked before `transform`.
const TRANSFORMS: &str = "transforms";
/// Key of the optional dispatcher section.
const DISPATCHER: &str = "dispatcher";
/// Key of the optional table-suffix template.
const TABLESUFFIX: &str = "table_suffix";
48
/// Raw pipeline definition text, tagged with the format it is written in.
pub enum Content<'a> {
    /// JSON source text. `parse` currently reaches `unimplemented!()` for
    /// this variant.
    Json(&'a str),
    /// YAML source text.
    Yaml(&'a str),
}
53
54pub fn parse(input: &Content) -> Result<Pipeline> {
55 match input {
56 Content::Yaml(str) => {
57 let docs = YamlLoader::load_from_str(str).context(YamlLoadSnafu)?;
58
59 ensure!(docs.len() == 1, YamlParseSnafu);
60
61 let doc = &docs[0];
62
63 let description = doc[DESCRIPTION].as_str().map(|s| s.to_string());
64
65 let doc_version = (&doc[DOC_VERSION]).try_into()?;
66
67 let processors = if let Some(v) = doc[PROCESSORS].as_vec() {
68 v.try_into()?
69 } else {
70 Processors::default()
71 };
72
73 let transformers = if let Some(v) = doc[TRANSFORMS].as_vec().or(doc[TRANSFORM].as_vec())
74 {
75 v.try_into()?
76 } else {
77 Transforms::default()
78 };
79
80 let transformer = if transformers.is_empty() {
81 let cnt = processors
84 .iter()
85 .filter_map(|p| match p {
86 ProcessorKind::Date(d) if !d.ignore_missing() => Some(
87 d.fields
88 .iter()
89 .map(|f| (f.target_or_input_field(), TimeUnit::Nanosecond))
90 .collect_vec(),
91 ),
92 ProcessorKind::Epoch(e) if !e.ignore_missing() => Some(
93 e.fields
94 .iter()
95 .map(|f| (f.target_or_input_field(), (&e.resolution).into()))
96 .collect_vec(),
97 ),
98 _ => None,
99 })
100 .flatten()
101 .collect_vec();
102 ensure!(cnt.len() == 1, AutoTransformOneTimestampSnafu);
103
104 let (ts_name, timeunit) = cnt.first().unwrap();
105 TransformerMode::AutoTransform(ts_name.to_string(), *timeunit)
106 } else {
107 TransformerMode::GreptimeTransformer(GreptimeTransformer::new(
108 transformers,
109 &doc_version,
110 )?)
111 };
112
113 let dispatcher = if !doc[DISPATCHER].is_badvalue() {
114 Some(Dispatcher::try_from(&doc[DISPATCHER])?)
115 } else {
116 None
117 };
118
119 let tablesuffix = if !doc[TABLESUFFIX].is_badvalue() {
120 Some(TableSuffixTemplate::try_from(&doc[TABLESUFFIX])?)
121 } else {
122 None
123 };
124
125 Ok(Pipeline {
126 doc_version,
127 description,
128 processors,
129 transformer,
130 dispatcher,
131 tablesuffix,
132 })
133 }
134 Content::Json(_) => unimplemented!(),
135 }
136}
137
/// Version of the pipeline document format, taken from the `version` key.
///
/// `Pipeline::exec_mut` branches on this: for `V1` the values produced by the
/// Greptime transformer are returned directly as the row; otherwise they are
/// passed on to `values_to_row` together with the remaining input value.
#[derive(Debug, Default, Copy, Clone, PartialEq, Eq)]
pub enum PipelineDocVersion {
    /// Version `1`. Also the `Default`, and the result when the `version` key
    /// is absent or null.
    #[default]
    V1,

    /// Version `2`.
    V2,
}
153
154impl TryFrom<&Yaml> for PipelineDocVersion {
155 type Error = Error;
156
157 fn try_from(value: &Yaml) -> Result<Self> {
158 if value.is_badvalue() || value.is_null() {
159 return Ok(PipelineDocVersion::V1);
160 }
161
162 let version = match value {
163 Yaml::String(s) => s
164 .parse::<i64>()
165 .map_err(|_| InvalidVersionNumberSnafu { version: s.clone() }.build())?,
166 Yaml::Integer(i) => *i,
167 _ => {
168 return InvalidVersionNumberSnafu {
169 version: value.as_str().unwrap_or_default().to_string(),
170 }
171 .fail();
172 }
173 };
174
175 match version {
176 1 => Ok(PipelineDocVersion::V1),
177 2 => Ok(PipelineDocVersion::V2),
178 _ => InvalidVersionNumberSnafu {
179 version: version.to_string(),
180 }
181 .fail(),
182 }
183 }
184}
185
/// A parsed pipeline definition, as produced by `parse`.
#[derive(Debug)]
pub struct Pipeline {
    /// Document format version read from the `version` key.
    doc_version: PipelineDocVersion,
    /// Optional text from the `description` key.
    description: Option<String>,
    /// Processors applied, in order, to every input value.
    processors: processor::Processors,
    /// Optional dispatcher, consulted after the processors have run.
    dispatcher: Option<Dispatcher>,
    /// How the processed value is turned into a row.
    transformer: TransformerMode,
    /// Optional template used when resolving the table suffix.
    tablesuffix: Option<TableSuffixTemplate>,
}
195
/// Strategy used to convert a processed value into a row.
#[derive(Debug, Clone)]
pub enum TransformerMode {
    /// Explicit transform rules; chosen by `parse` when the transform list is
    /// not empty.
    GreptimeTransformer(GreptimeTransformer),
    /// No transform rules were given. Carries the name of the single timestamp
    /// field found among the processors and its time unit.
    AutoTransform(String, TimeUnit),
}
201
/// Destination selected by a dispatcher rule.
#[derive(Debug, Hash, PartialEq, Eq, Clone, PartialOrd, Ord)]
pub struct DispatchedTo {
    /// Suffix appended to the original table name (see
    /// `dispatched_to_table_name`).
    pub table_suffix: String,
    /// Pipeline name copied from the rule, if the rule names one.
    pub pipeline: Option<String>,
}
208
209impl From<&Rule> for DispatchedTo {
210 fn from(value: &Rule) -> Self {
211 DispatchedTo {
212 table_suffix: value.table_suffix.clone(),
213 pipeline: value.pipeline.clone(),
214 }
215 }
216}
217
218impl DispatchedTo {
219 pub fn dispatched_to_table_name(&self, original: &str) -> String {
221 format!("{}_{}", &original, self.table_suffix)
222 }
223}
224
/// Outcome of running one value through `Pipeline::exec_mut`.
#[derive(Debug)]
pub enum PipelineExecOutput {
    /// The value was transformed into a row.
    Transformed(TransformedOutput),
    /// A dispatcher rule matched; carries the destination and the processed
    /// (not yet transformed) value.
    DispatchedTo(DispatchedTo, VrlValue),
    /// A processor produced a null value, so the record was dropped.
    Filtered,
}
232
/// Payload of `PipelineExecOutput::Transformed`.
#[derive(Debug)]
pub struct TransformedOutput {
    /// Options obtained via `ContextOpt::from_pipeline_map_to_opt` from the
    /// processed value.
    pub opt: ContextOpt,
    /// The resulting row.
    pub row: Row,
    /// Table suffix resolved for this record, if any.
    pub table_suffix: Option<String>,
}
239
240impl PipelineExecOutput {
241 pub fn into_transformed(self) -> Option<(Row, Option<String>)> {
243 if let Self::Transformed(TransformedOutput {
244 row, table_suffix, ..
245 }) = self
246 {
247 Some((row, table_suffix))
248 } else {
249 None
250 }
251 }
252
253 pub fn into_dispatched(self) -> Option<DispatchedTo> {
255 if let Self::DispatchedTo(d, _) = self {
256 Some(d)
257 } else {
258 None
259 }
260 }
261}
262
263impl Pipeline {
264 fn is_v1(&self) -> bool {
265 self.doc_version == PipelineDocVersion::V1
266 }
267
268 pub fn exec_mut(
269 &self,
270 mut val: VrlValue,
271 pipeline_ctx: &PipelineContext<'_>,
272 schema_info: &mut SchemaInfo,
273 ) -> Result<PipelineExecOutput> {
274 for processor in self.processors.iter() {
276 val = processor.exec_mut(val)?;
277 if val.is_null() {
278 return Ok(PipelineExecOutput::Filtered);
280 }
281 }
282
283 if let Some(rule) = self.dispatcher.as_ref().and_then(|d| d.exec(&val)) {
285 return Ok(PipelineExecOutput::DispatchedTo(rule.into(), val));
286 }
287
288 let mut opt = ContextOpt::from_pipeline_map_to_opt(&mut val)?;
291 let table_suffix = opt.resolve_table_suffix(self.tablesuffix.as_ref(), &val);
292
293 let row = match self.transformer() {
294 TransformerMode::GreptimeTransformer(greptime_transformer) => {
295 let values = greptime_transformer.transform_mut(&mut val, self.is_v1())?;
296 if self.is_v1() {
297 return Ok(PipelineExecOutput::Transformed(TransformedOutput {
300 opt,
301 row: Row { values },
302 table_suffix,
303 }));
304 }
305 values_to_row(schema_info, val, pipeline_ctx, Some(values), false)?
308 }
309 TransformerMode::AutoTransform(ts_name, time_unit) => {
310 let def = crate::PipelineDefinition::GreptimeIdentityPipeline(Some(
315 IdentityTimeIndex::Epoch(ts_name.to_string(), *time_unit, false),
316 ));
317 let n_ctx =
318 PipelineContext::new(&def, pipeline_ctx.pipeline_param, pipeline_ctx.channel);
319 values_to_row(schema_info, val, &n_ctx, None, true)?
320 }
321 };
322
323 Ok(PipelineExecOutput::Transformed(TransformedOutput {
324 opt,
325 row,
326 table_suffix,
327 }))
328 }
329
330 pub fn processors(&self) -> &processor::Processors {
331 &self.processors
332 }
333
334 pub fn transformer(&self) -> &TransformerMode {
335 &self.transformer
336 }
337
338 pub fn schemas(&self) -> Option<&Vec<greptime_proto::v1::ColumnSchema>> {
340 match &self.transformer {
341 TransformerMode::GreptimeTransformer(t) => Some(t.schemas()),
342 TransformerMode::AutoTransform(_, _) => None,
343 }
344 }
345}
346
347pub(crate) fn find_key_index(intermediate_keys: &[String], key: &str, kind: &str) -> Result<usize> {
348 intermediate_keys
349 .iter()
350 .position(|k| k == key)
351 .context(IntermediateKeyIndexSnafu { kind, key })
352}
353
/// Wraps a `Pipeline` in an `Arc` and prepares the values needed to execute it.
///
/// Expands to a tuple `(pipeline, schema_info, pipeline_def, pipeline_param)`:
/// the shared pipeline, a `SchemaInfo` built from a clone of its schemas, a
/// `PipelineDefinition::Resolved` holding another handle to the pipeline, and
/// default `GreptimePipelineParams`.
///
/// # Panics
///
/// Panics when `Pipeline::schemas` returns `None`.
#[macro_export]
macro_rules! setup_pipeline {
    ($pipeline:expr) => {{
        use std::sync::Arc;

        use $crate::{GreptimePipelineParams, Pipeline, PipelineDefinition, SchemaInfo};

        let shared: Arc<Pipeline> = Arc::new($pipeline);
        let schema_info = SchemaInfo::from_schema_list(shared.schemas().unwrap().clone());

        let definition = PipelineDefinition::Resolved(Arc::clone(&shared));
        let params = GreptimePipelineParams::default();

        (shared, schema_info, definition, params)
    }};
}
379#[cfg(test)]
380mod tests {
381 use std::collections::BTreeMap;
382 use std::sync::Arc;
383
384 use api::v1::Rows;
385 use greptime_proto::v1::value::ValueData;
386 use greptime_proto::v1::{self, ColumnDataType, SemanticType};
387 use vrl::prelude::Bytes;
388 use vrl::value::KeyString;
389
390 use super::*;
391
    /// Parses a csv + epoch pipeline, runs one JSON record through it and
    /// checks the two `uint32` columns and the nanosecond timestamp column.
    // NOTE(review): the YAML literal's nesting indentation looks flattened in
    // this view — confirm it against the upstream file.
    #[test]
    fn test_pipeline_prepare() {
        let input_value_str = r#"
 {
 "my_field": "1,2",
 "foo": "bar",
 "ts": "1"
 }
 "#;
        let input_value: serde_json::Value = serde_json::from_str(input_value_str).unwrap();

        let pipeline_yaml = r#"description: 'Pipeline for Apache Tomcat'
processors:
 - csv:
 field: my_field
 target_fields: field1, field2
 - epoch:
 field: ts
 resolution: ns
transform:
 - field: field1
 type: uint32
 - field: field2
 type: uint32
 - field: ts
 type: timestamp, ns
 index: time
 "#;

        let pipeline: Pipeline = parse(&Content::Yaml(pipeline_yaml)).unwrap();
        let (pipeline, mut schema_info, pipeline_def, pipeline_param) = setup_pipeline!(pipeline);
        let pipeline_ctx = PipelineContext::new(
            &pipeline_def,
            &pipeline_param,
            session::context::Channel::Unknown,
        );

        let payload = input_value.into();
        let result = pipeline
            .exec_mut(payload, &pipeline_ctx, &mut schema_info)
            .unwrap()
            .into_transformed()
            .unwrap();

        assert_eq!(result.0.values[0].value_data, Some(ValueData::U32Value(1)));
        assert_eq!(result.0.values[1].value_data, Some(ValueData::U32Value(2)));
        match &result.0.values[2].value_data {
            Some(ValueData::TimestampNanosecondValue(v)) => {
                assert_ne!(v, &0);
            }
            // NOTE(review): this arm fires for anything that is not a
            // nanosecond timestamp; the message wording is misleading.
            _ => panic!("expect null value"),
        }
    }
445
    /// Runs an access-log line through a dissect + date pipeline and compares
    /// every output column's data type and value with the expected table.
    // NOTE(review): the YAML literal's nesting indentation looks flattened in
    // this view — confirm it against the upstream file.
    #[test]
    fn test_dissect_pipeline() {
        let message = r#"129.37.245.88 - meln1ks [01/Aug/2024:14:22:47 +0800] "PATCH /observability/metrics/production HTTP/1.0" 501 33085"#.to_string();
        let pipeline_str = r#"processors:
 - dissect:
 fields:
 - message
 patterns:
 - "%{ip} %{?ignored} %{username} [%{ts}] \"%{method} %{path} %{proto}\" %{status} %{bytes}"
 - date:
 fields:
 - ts
 formats:
 - "%d/%b/%Y:%H:%M:%S %z"

transform:
 - fields:
 - ip
 - username
 - method
 - path
 - proto
 type: string
 - fields:
 - status
 type: uint16
 - fields:
 - bytes
 type: uint32
 - field: ts
 type: timestamp, ns
 index: time"#;
        let pipeline: Pipeline = parse(&Content::Yaml(pipeline_str)).unwrap();
        let pipeline = Arc::new(pipeline);
        let schema = pipeline.schemas().unwrap();
        let mut schema_info = SchemaInfo::from_schema_list(schema.clone());

        let pipeline_def = crate::PipelineDefinition::Resolved(pipeline.clone());
        let pipeline_param = crate::GreptimePipelineParams::default();
        let pipeline_ctx = PipelineContext::new(
            &pipeline_def,
            &pipeline_param,
            session::context::Channel::Unknown,
        );
        // The payload is an object holding only the raw `message` bytes.
        let payload = VrlValue::Object(BTreeMap::from([(
            KeyString::from("message"),
            VrlValue::Bytes(Bytes::from(message)),
        )]));

        let result = pipeline
            .exec_mut(payload, &pipeline_ctx, &mut schema_info)
            .unwrap()
            .into_transformed()
            .unwrap();

        assert_eq!(schema_info.schema.len(), result.0.values.len());
        // Expected (column data type, value) pairs, in schema order.
        let test = vec![
            (
                ColumnDataType::String as i32,
                Some(ValueData::StringValue("129.37.245.88".into())),
            ),
            (
                ColumnDataType::String as i32,
                Some(ValueData::StringValue("meln1ks".into())),
            ),
            (
                ColumnDataType::String as i32,
                Some(ValueData::StringValue("PATCH".into())),
            ),
            (
                ColumnDataType::String as i32,
                Some(ValueData::StringValue(
                    "/observability/metrics/production".into(),
                )),
            ),
            (
                ColumnDataType::String as i32,
                Some(ValueData::StringValue("HTTP/1.0".into())),
            ),
            (
                ColumnDataType::Uint16 as i32,
                Some(ValueData::U16Value(501)),
            ),
            (
                ColumnDataType::Uint32 as i32,
                Some(ValueData::U32Value(33085)),
            ),
            (
                ColumnDataType::TimestampNanosecond as i32,
                Some(ValueData::TimestampNanosecondValue(1722493367000000000)),
            ),
        ];
        let schema = pipeline.schemas().unwrap();
        for i in 0..schema.len() {
            let schema = &schema[i];
            let value = &result.0.values[i];
            assert_eq!(schema.datatype, test[i].0);
            assert_eq!(value.value_data, test[i].1);
        }
    }
547
    /// Same scenario as `test_pipeline_prepare`, with the YAML definition
    /// written as an indented block that starts on its own line.
    // NOTE(review): the YAML literal's nesting indentation looks flattened in
    // this view — confirm it against the upstream file.
    #[test]
    fn test_csv_pipeline() {
        let input_value_str = r#"
 {
 "my_field": "1,2",
 "foo": "bar",
 "ts": "1"
 }
 "#;
        let input_value: serde_json::Value = serde_json::from_str(input_value_str).unwrap();

        let pipeline_yaml = r#"
 description: Pipeline for Apache Tomcat
 processors:
 - csv:
 field: my_field
 target_fields: field1, field2
 - epoch:
 field: ts
 resolution: ns
 transform:
 - field: field1
 type: uint32
 - field: field2
 type: uint32
 - field: ts
 type: timestamp, ns
 index: time
 "#;

        let pipeline: Pipeline = parse(&Content::Yaml(pipeline_yaml)).unwrap();
        let (pipeline, mut schema_info, pipeline_def, pipeline_param) = setup_pipeline!(pipeline);
        let pipeline_ctx = PipelineContext::new(
            &pipeline_def,
            &pipeline_param,
            session::context::Channel::Unknown,
        );

        let payload = input_value.into();
        let result = pipeline
            .exec_mut(payload, &pipeline_ctx, &mut schema_info)
            .unwrap()
            .into_transformed()
            .unwrap();
        assert_eq!(result.0.values[0].value_data, Some(ValueData::U32Value(1)));
        assert_eq!(result.0.values[1].value_data, Some(ValueData::U32Value(2)));
        match &result.0.values[2].value_data {
            Some(ValueData::TimestampNanosecondValue(v)) => {
                assert_ne!(v, &0);
            }
            // NOTE(review): this arm fires for anything that is not a
            // nanosecond timestamp; the message wording is misleading.
            _ => panic!("expect null value"),
        }
    }
601
    /// Parses a pipeline with a single `date` processor and checks that the
    /// output has exactly one nanosecond timestamp column named `test_time`
    /// holding the expected epoch value.
    // NOTE(review): the YAML literal's nesting indentation looks flattened in
    // this view — confirm it against the upstream file.
    #[test]
    fn test_date_pipeline() {
        let input_value_str = r#"
 {
 "my_field": "1,2",
 "foo": "bar",
 "test_time": "2014-5-17T04:34:56+00:00"
 }
 "#;
        let input_value: serde_json::Value = serde_json::from_str(input_value_str).unwrap();

        let pipeline_yaml = r#"---
description: Pipeline for Apache Tomcat

processors:
 - date:
 field: test_time

transform:
 - field: test_time
 type: timestamp, ns
 index: time
 "#;

        let pipeline: Pipeline = parse(&Content::Yaml(pipeline_yaml)).unwrap();
        let pipeline = Arc::new(pipeline);
        let schema = pipeline.schemas().unwrap();
        let mut schema_info = SchemaInfo::from_schema_list(schema.clone());

        let pipeline_def = crate::PipelineDefinition::Resolved(pipeline.clone());
        let pipeline_param = crate::GreptimePipelineParams::default();
        let pipeline_ctx = PipelineContext::new(
            &pipeline_def,
            &pipeline_param,
            session::context::Channel::Unknown,
        );
        let schema = pipeline.schemas().unwrap().clone();
        let result = input_value.into();

        let row = pipeline
            .exec_mut(result, &pipeline_ctx, &mut schema_info)
            .unwrap()
            .into_transformed()
            .unwrap();
        let output = Rows {
            schema,
            rows: vec![row.0],
        };
        let schemas = output.schema;

        // Schema: a single timestamp column.
        assert_eq!(schemas.len(), 1);
        let schema = schemas[0].clone();
        assert_eq!("test_time", schema.column_name);
        assert_eq!(ColumnDataType::TimestampNanosecond as i32, schema.datatype);
        assert_eq!(SemanticType::Timestamp as i32, schema.semantic_type);

        // Row: the parsed timestamp, in nanoseconds.
        let row = output.rows[0].clone();
        assert_eq!(1, row.values.len());
        let value_data = row.values[0].clone().value_data;
        assert_eq!(
            Some(v1::value::ValueData::TimestampNanosecondValue(
                1400301296000000000
            )),
            value_data
        );
    }
668
    /// Checks that a well-formed `dispatcher` section is parsed into the
    /// expected field and rules, and that three malformed variants (misspelled
    /// `field`, `table_suffix` and `value` keys) are rejected by `parse`.
    // NOTE(review): the YAML literals' nesting indentation looks flattened in
    // this view — confirm it against the upstream file.
    #[test]
    fn test_dispatcher() {
        let pipeline_yaml = r#"
---
description: Pipeline for Apache Tomcat

processors:
 - epoch:
 field: ts
 resolution: ns

dispatcher:
 field: typename
 rules:
 - value: http
 table_suffix: http_events
 - value: database
 table_suffix: db_events
 pipeline: database_pipeline

transform:
 - field: typename
 type: string
 - field: ts
 type: timestamp, ns
 index: time
"#;
        let pipeline: Pipeline = parse(&Content::Yaml(pipeline_yaml)).unwrap();
        let dispatcher = pipeline.dispatcher.expect("expect dispatcher");
        assert_eq!(dispatcher.field, "typename");

        assert_eq!(dispatcher.rules.len(), 2);

        assert_eq!(
            dispatcher.rules[0],
            crate::dispatcher::Rule {
                value: VrlValue::Bytes(Bytes::from("http")),
                table_suffix: "http_events".to_string(),
                pipeline: None
            }
        );

        assert_eq!(
            dispatcher.rules[1],
            crate::dispatcher::Rule {
                value: VrlValue::Bytes(Bytes::from("database")),
                table_suffix: "db_events".to_string(),
                pipeline: Some("database_pipeline".to_string()),
            }
        );

        // Dispatcher key `field` misspelled as `_field`.
        let bad_yaml1 = r#"
---
description: Pipeline for Apache Tomcat

processors:
 - epoch:
 field: ts
 resolution: ns

dispatcher:
 _field: typename
 rules:
 - value: http
 table_suffix: http_events
 - value: database
 table_suffix: db_events
 pipeline: database_pipeline

transform:
 - field: typename
 type: string
 - field: ts
 type: timestamp, ns
 index: time
"#;
        // Rule key `table_suffix` misspelled as `_table_suffix`.
        let bad_yaml2 = r#"
---
description: Pipeline for Apache Tomcat

processors:
 - epoch:
 field: ts
 resolution: ns
dispatcher:
 field: typename
 rules:
 - value: http
 _table_suffix: http_events
 - value: database
 _table_suffix: db_events
 pipeline: database_pipeline

transform:
 - field: typename
 type: string
 - field: ts
 type: timestamp, ns
 index: time
"#;
        // Rule key `value` misspelled as `_value`.
        let bad_yaml3 = r#"
---
description: Pipeline for Apache Tomcat

processors:
 - epoch:
 field: ts
 resolution: ns
dispatcher:
 field: typename
 rules:
 - _value: http
 table_suffix: http_events
 - _value: database
 table_suffix: db_events
 pipeline: database_pipeline

transform:
 - field: typename
 type: string
 - field: ts
 type: timestamp, ns
 index: time
"#;

        let r: Result<Pipeline> = parse(&Content::Yaml(bad_yaml1));
        assert!(r.is_err());
        let r: Result<Pipeline> = parse(&Content::Yaml(bad_yaml2));
        assert!(r.is_err());
        let r: Result<Pipeline> = parse(&Content::Yaml(bad_yaml3));
        assert!(r.is_err());
    }
801}