1#![allow(dead_code)]
16pub mod ctx_req;
17pub mod field;
18pub mod processor;
19pub mod transform;
20pub mod value;
21
22use std::collections::BTreeMap;
23
24use api::v1::Row;
25use common_time::timestamp::TimeUnit;
26use itertools::Itertools;
27use processor::{Processor, Processors};
28use snafu::{ensure, OptionExt, ResultExt};
29use transform::Transforms;
30use value::Value;
31use yaml_rust::{Yaml, YamlLoader};
32
33use crate::dispatcher::{Dispatcher, Rule};
34use crate::error::{
35 AutoTransformOneTimestampSnafu, Error, InputValueMustBeObjectSnafu, IntermediateKeyIndexSnafu,
36 InvalidVersionNumberSnafu, Result, YamlLoadSnafu, YamlParseSnafu,
37};
38use crate::etl::processor::ProcessorKind;
39use crate::etl::transform::transformer::greptime::values_to_row;
40use crate::tablesuffix::TableSuffixTemplate;
41use crate::{ContextOpt, GreptimeTransformer, IdentityTimeIndex, PipelineContext, SchemaInfo};
42
// Top-level keys of a pipeline YAML document, read by `parse`.

/// Optional free-form description of the pipeline.
const DESCRIPTION: &str = "description";
/// Document version, converted with `PipelineDocVersion::try_from`; a missing
/// or null value means version 1.
const DOC_VERSION: &str = "version";
/// List of processors applied, in order, to every input value.
const PROCESSORS: &str = "processors";
/// Alias of `transforms`; only consulted when `transforms` is not a list.
const TRANSFORM: &str = "transform";
/// List of transform rules; checked before the `transform` alias.
const TRANSFORMS: &str = "transforms";
/// Optional dispatcher section whose rules can route a processed value elsewhere.
const DISPATCHER: &str = "dispatcher";
/// Optional table-suffix template section.
const TABLESUFFIX: &str = "table_suffix";
50
/// Raw source text of a pipeline definition, tagged with its format.
///
/// Only `Yaml` is handled by `parse`; `Json` currently hits `unimplemented!()`.
pub enum Content<'a> {
    /// A JSON-encoded pipeline definition (not supported by `parse` yet).
    Json(&'a str),
    /// A YAML-encoded pipeline definition; `parse` requires exactly one document.
    Yaml(&'a str),
}
55
56pub fn parse(input: &Content) -> Result<Pipeline> {
57 match input {
58 Content::Yaml(str) => {
59 let docs = YamlLoader::load_from_str(str).context(YamlLoadSnafu)?;
60
61 ensure!(docs.len() == 1, YamlParseSnafu);
62
63 let doc = &docs[0];
64
65 let description = doc[DESCRIPTION].as_str().map(|s| s.to_string());
66
67 let doc_version = (&doc[DOC_VERSION]).try_into()?;
68
69 let processors = if let Some(v) = doc[PROCESSORS].as_vec() {
70 v.try_into()?
71 } else {
72 Processors::default()
73 };
74
75 let transformers = if let Some(v) = doc[TRANSFORMS].as_vec().or(doc[TRANSFORM].as_vec())
76 {
77 v.try_into()?
78 } else {
79 Transforms::default()
80 };
81
82 let transformer = if transformers.is_empty() {
83 let cnt = processors
86 .iter()
87 .filter_map(|p| match p {
88 ProcessorKind::Date(d) if !d.ignore_missing() => Some(
89 d.fields
90 .iter()
91 .map(|f| (f.target_or_input_field(), TimeUnit::Nanosecond))
92 .collect_vec(),
93 ),
94 ProcessorKind::Epoch(e) if !e.ignore_missing() => Some(
95 e.fields
96 .iter()
97 .map(|f| (f.target_or_input_field(), (&e.resolution).into()))
98 .collect_vec(),
99 ),
100 _ => None,
101 })
102 .flatten()
103 .collect_vec();
104 ensure!(cnt.len() == 1, AutoTransformOneTimestampSnafu);
105
106 let (ts_name, timeunit) = cnt.first().unwrap();
107 TransformerMode::AutoTransform(ts_name.to_string(), *timeunit)
108 } else {
109 TransformerMode::GreptimeTransformer(GreptimeTransformer::new(
110 transformers,
111 &doc_version,
112 )?)
113 };
114
115 let dispatcher = if !doc[DISPATCHER].is_badvalue() {
116 Some(Dispatcher::try_from(&doc[DISPATCHER])?)
117 } else {
118 None
119 };
120
121 let tablesuffix = if !doc[TABLESUFFIX].is_badvalue() {
122 Some(TableSuffixTemplate::try_from(&doc[TABLESUFFIX])?)
123 } else {
124 None
125 };
126
127 Ok(Pipeline {
128 doc_version,
129 description,
130 processors,
131 transformer,
132 dispatcher,
133 tablesuffix,
134 })
135 }
136 Content::Json(_) => unimplemented!(),
137 }
138}
139
/// Version of a pipeline definition document, read from its `version` key.
///
/// Converted by `TryFrom<&Yaml>`: a missing or null key yields `V1`; the
/// integers `1`/`2` (or strings parsing to them) select the respective version.
#[derive(Debug, Default, Copy, Clone, PartialEq, Eq)]
pub enum PipelineDocVersion {
    /// Version 1, the default. In `Pipeline::exec_mut` the output row consists
    /// only of the values returned by the transformer.
    #[default]
    V1,

    /// Version 2. In `Pipeline::exec_mut` the transformed values and the
    /// remaining pipeline value are both passed to `values_to_row`; presumably
    /// so fields not covered by the transform are kept too — TODO confirm.
    V2,
}
155
156impl TryFrom<&Yaml> for PipelineDocVersion {
157 type Error = Error;
158
159 fn try_from(value: &Yaml) -> Result<Self> {
160 if value.is_badvalue() || value.is_null() {
161 return Ok(PipelineDocVersion::V1);
162 }
163
164 let version = match value {
165 Yaml::String(s) => s
166 .parse::<i64>()
167 .map_err(|_| InvalidVersionNumberSnafu { version: s.clone() }.build())?,
168 Yaml::Integer(i) => *i,
169 _ => {
170 return InvalidVersionNumberSnafu {
171 version: value.as_str().unwrap_or_default().to_string(),
172 }
173 .fail();
174 }
175 };
176
177 match version {
178 1 => Ok(PipelineDocVersion::V1),
179 2 => Ok(PipelineDocVersion::V2),
180 _ => InvalidVersionNumberSnafu {
181 version: version.to_string(),
182 }
183 .fail(),
184 }
185 }
186}
187
/// A parsed pipeline definition, produced by `parse`.
#[derive(Debug)]
pub struct Pipeline {
    /// Document version; selects V1 or V2 row building in `exec_mut`.
    doc_version: PipelineDocVersion,
    /// Optional `description` from the document.
    description: Option<String>,
    /// Processors run, in order, on each input value.
    processors: processor::Processors,
    /// Optional dispatcher; when one of its rules matches, `exec_mut` returns
    /// the processed value without transforming it.
    dispatcher: Option<Dispatcher>,
    /// How processed values are turned into rows.
    transformer: TransformerMode,
    /// Optional template used to resolve a table suffix for each output.
    tablesuffix: Option<TableSuffixTemplate>,
}
197
/// How a [`Pipeline`] converts a processed value into a row.
#[derive(Debug, Clone)]
pub enum TransformerMode {
    /// Explicit transform rules from the document's `transform`/`transforms` section.
    GreptimeTransformer(GreptimeTransformer),
    /// Used when the document has no transform rules. `exec_mut` converts the
    /// value as the identity pipeline does, with the named field (in the given
    /// time unit) as an `IdentityTimeIndex::Epoch` time index. The field is
    /// inferred from the `date`/`epoch` processors during `parse`.
    AutoTransform(String, TimeUnit),
}
203
/// Destination selected by a matching dispatcher rule.
#[derive(Debug, Hash, PartialEq, Eq, Clone, PartialOrd, Ord)]
pub struct DispatchedTo {
    /// Suffix joined with `_` onto the original table name
    /// (see `dispatched_to_table_name`).
    pub table_suffix: String,
    /// Pipeline name copied from the matching rule, if it names one; presumably
    /// the pipeline the value is handed to next — confirm with callers.
    pub pipeline: Option<String>,
}
210
211impl From<&Rule> for DispatchedTo {
212 fn from(value: &Rule) -> Self {
213 DispatchedTo {
214 table_suffix: value.table_suffix.clone(),
215 pipeline: value.pipeline.clone(),
216 }
217 }
218}
219
220impl DispatchedTo {
221 pub fn dispatched_to_table_name(&self, original: &str) -> String {
223 format!("{}_{}", &original, self.table_suffix)
224 }
225}
226
/// Outcome of running [`Pipeline::exec_mut`] on one value.
#[derive(Debug)]
pub enum PipelineExecOutput {
    /// The value was transformed into a row.
    Transformed(TransformedOutput),
    /// A dispatcher rule matched. Carries the destination and the value as it
    /// was after the processors ran; no transformation was applied.
    DispatchedTo(DispatchedTo, Value),
}
233
/// A row produced by [`Pipeline::exec_mut`] together with its routing metadata.
#[derive(Debug)]
pub struct TransformedOutput {
    /// Context options extracted from the processed value by
    /// `ContextOpt::from_pipeline_map_to_opt`.
    pub opt: ContextOpt,
    /// The transformed row.
    pub row: Row,
    /// Table suffix resolved by `ContextOpt::resolve_table_suffix` from the
    /// pipeline's table-suffix template and the processed value, if any.
    pub table_suffix: Option<String>,
}
240
241impl PipelineExecOutput {
242 pub fn into_transformed(self) -> Option<(Row, Option<String>)> {
244 if let Self::Transformed(TransformedOutput {
245 row, table_suffix, ..
246 }) = self
247 {
248 Some((row, table_suffix))
249 } else {
250 None
251 }
252 }
253
254 pub fn into_dispatched(self) -> Option<DispatchedTo> {
256 if let Self::DispatchedTo(d, _) = self {
257 Some(d)
258 } else {
259 None
260 }
261 }
262}
263
264pub fn json_to_map(val: serde_json::Value) -> Result<Value> {
265 match val {
266 serde_json::Value::Object(map) => {
267 let mut intermediate_state = BTreeMap::new();
268 for (k, v) in map {
269 intermediate_state.insert(k, Value::try_from(v)?);
270 }
271 Ok(Value::Map(intermediate_state.into()))
272 }
273 _ => InputValueMustBeObjectSnafu.fail(),
274 }
275}
276
277pub fn json_array_to_map(val: Vec<serde_json::Value>) -> Result<Vec<Value>> {
278 val.into_iter().map(json_to_map).collect()
279}
280
281pub fn simd_json_to_map(val: simd_json::OwnedValue) -> Result<Value> {
282 match val {
283 simd_json::OwnedValue::Object(map) => {
284 let mut intermediate_state = BTreeMap::new();
285 for (k, v) in map.into_iter() {
286 intermediate_state.insert(k, Value::try_from(v)?);
287 }
288 Ok(Value::Map(intermediate_state.into()))
289 }
290 _ => InputValueMustBeObjectSnafu.fail(),
291 }
292}
293
294pub fn simd_json_array_to_map(val: Vec<simd_json::OwnedValue>) -> Result<Vec<Value>> {
295 val.into_iter().map(simd_json_to_map).collect()
296}
297
298impl Pipeline {
299 fn is_v1(&self) -> bool {
300 self.doc_version == PipelineDocVersion::V1
301 }
302
303 pub fn exec_mut(
304 &self,
305 mut val: Value,
306 pipeline_ctx: &PipelineContext<'_>,
307 schema_info: &mut SchemaInfo,
308 ) -> Result<PipelineExecOutput> {
309 for processor in self.processors.iter() {
311 val = processor.exec_mut(val)?;
312 }
313
314 if let Some(rule) = self.dispatcher.as_ref().and_then(|d| d.exec(&val)) {
316 return Ok(PipelineExecOutput::DispatchedTo(rule.into(), val));
317 }
318
319 let mut opt = ContextOpt::from_pipeline_map_to_opt(&mut val)?;
322 let table_suffix = opt.resolve_table_suffix(self.tablesuffix.as_ref(), &val);
323
324 let row = match self.transformer() {
325 TransformerMode::GreptimeTransformer(greptime_transformer) => {
326 let values = greptime_transformer.transform_mut(&mut val, self.is_v1())?;
327 if self.is_v1() {
328 return Ok(PipelineExecOutput::Transformed(TransformedOutput {
331 opt,
332 row: Row { values },
333 table_suffix,
334 }));
335 }
336 values_to_row(schema_info, val, pipeline_ctx, Some(values))?
339 }
340 TransformerMode::AutoTransform(ts_name, time_unit) => {
341 let def = crate::PipelineDefinition::GreptimeIdentityPipeline(Some(
346 IdentityTimeIndex::Epoch(ts_name.to_string(), *time_unit, false),
347 ));
348 let n_ctx =
349 PipelineContext::new(&def, pipeline_ctx.pipeline_param, pipeline_ctx.channel);
350 values_to_row(schema_info, val, &n_ctx, None)?
351 }
352 };
353
354 Ok(PipelineExecOutput::Transformed(TransformedOutput {
355 opt,
356 row,
357 table_suffix,
358 }))
359 }
360
361 pub fn processors(&self) -> &processor::Processors {
362 &self.processors
363 }
364
365 pub fn transformer(&self) -> &TransformerMode {
366 &self.transformer
367 }
368
369 pub fn schemas(&self) -> Option<&Vec<greptime_proto::v1::ColumnSchema>> {
371 match &self.transformer {
372 TransformerMode::GreptimeTransformer(t) => Some(t.schemas()),
373 TransformerMode::AutoTransform(_, _) => None,
374 }
375 }
376}
377
378pub(crate) fn find_key_index(intermediate_keys: &[String], key: &str, kind: &str) -> Result<usize> {
379 intermediate_keys
380 .iter()
381 .position(|k| k == key)
382 .context(IntermediateKeyIndexSnafu { kind, key })
383}
384
/// Wraps a parsed `Pipeline` and builds everything needed to execute it.
///
/// Expands to the tuple
/// `(Arc<Pipeline>, SchemaInfo, PipelineDefinition, GreptimePipelineParams)`:
/// the shared pipeline, a schema info seeded from `Pipeline::schemas`, a
/// `PipelineDefinition::Resolved` holding the same `Arc`, and default params.
///
/// Panics when `Pipeline::schemas` returns `None`, i.e. for auto-transform
/// pipelines.
#[macro_export]
macro_rules! setup_pipeline {
    ($pipeline:expr) => {{
        use std::sync::Arc;

        use $crate::{GreptimePipelineParams, Pipeline, PipelineDefinition, SchemaInfo};

        let shared: Arc<Pipeline> = Arc::new($pipeline);
        let schema_info = SchemaInfo::from_schema_list(shared.schemas().unwrap().clone());
        let definition = PipelineDefinition::Resolved(Arc::clone(&shared));
        let params = GreptimePipelineParams::default();

        (shared, schema_info, definition, params)
    }};
}
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::Rows;
    use greptime_proto::v1::value::ValueData;
    use greptime_proto::v1::{ColumnDataType, SemanticType};

    use super::*;

    /// Sets `pipeline` up with `setup_pipeline!` and runs it on `payload` over
    /// the `Unknown` channel.
    ///
    /// Returns the shared pipeline, the schema info used during execution and
    /// the transformed row. Panics if execution fails or the value is dispatched.
    fn run_transformed(pipeline: Pipeline, payload: Value) -> (Arc<Pipeline>, SchemaInfo, Row) {
        let (pipeline, mut schema_info, pipeline_def, pipeline_param) = setup_pipeline!(pipeline);
        let pipeline_ctx = PipelineContext::new(
            &pipeline_def,
            &pipeline_param,
            session::context::Channel::Unknown,
        );
        let (row, _table_suffix) = pipeline
            .exec_mut(payload, &pipeline_ctx, &mut schema_info)
            .unwrap()
            .into_transformed()
            .expect("expect transformed output");
        (pipeline, schema_info, row)
    }

    /// Checks the row produced by the csv + epoch pipeline shared by
    /// `test_pipeline_prepare` and `test_csv_pipeline`.
    fn assert_csv_epoch_row(row: &Row) {
        assert_eq!(row.values[0].value_data, Some(ValueData::U32Value(1)));
        assert_eq!(row.values[1].value_data, Some(ValueData::U32Value(2)));
        match &row.values[2].value_data {
            Some(ValueData::TimestampNanosecondValue(v)) => assert_ne!(*v, 0),
            other => panic!("expect a nanosecond timestamp value, got {other:?}"),
        }
    }

    #[test]
    fn test_pipeline_prepare() {
        let input_value_str = r#"
            {
                "my_field": "1,2",
                "foo": "bar",
                "ts": "1"
            }
        "#;
        let input_value: serde_json::Value = serde_json::from_str(input_value_str).unwrap();

        let pipeline_yaml = r#"description: 'Pipeline for Apache Tomcat'
processors:
  - csv:
      field: my_field
      target_fields: field1, field2
  - epoch:
      field: ts
      resolution: ns
transform:
  - field: field1
    type: uint32
  - field: field2
    type: uint32
  - field: ts
    type: timestamp, ns
    index: time
"#;

        let pipeline = parse(&Content::Yaml(pipeline_yaml)).unwrap();
        let payload = json_to_map(input_value).unwrap();
        let (_, _, row) = run_transformed(pipeline, payload);

        assert_csv_epoch_row(&row);
    }

    #[test]
    fn test_dissect_pipeline() {
        let message = r#"129.37.245.88 - meln1ks [01/Aug/2024:14:22:47 +0800] "PATCH /observability/metrics/production HTTP/1.0" 501 33085"#.to_string();
        let pipeline_str = r#"processors:
  - dissect:
      fields:
        - message
      patterns:
        - "%{ip} %{?ignored} %{username} [%{ts}] \"%{method} %{path} %{proto}\" %{status} %{bytes}"
  - date:
      fields:
        - ts
      formats:
        - "%d/%b/%Y:%H:%M:%S %z"

transform:
  - fields:
      - ip
      - username
      - method
      - path
      - proto
    type: string
  - fields:
      - status
    type: uint16
  - fields:
      - bytes
    type: uint32
  - field: ts
    type: timestamp, ns
    index: time"#;
        let pipeline = parse(&Content::Yaml(pipeline_str)).unwrap();

        let mut payload = BTreeMap::new();
        payload.insert("message".to_string(), Value::String(message));
        let payload = Value::Map(payload.into());

        let (pipeline, schema_info, row) = run_transformed(pipeline, payload);
        assert_eq!(schema_info.schema.len(), row.values.len());

        let expected = [
            (
                ColumnDataType::String as i32,
                Some(ValueData::StringValue("129.37.245.88".into())),
            ),
            (
                ColumnDataType::String as i32,
                Some(ValueData::StringValue("meln1ks".into())),
            ),
            (
                ColumnDataType::String as i32,
                Some(ValueData::StringValue("PATCH".into())),
            ),
            (
                ColumnDataType::String as i32,
                Some(ValueData::StringValue(
                    "/observability/metrics/production".into(),
                )),
            ),
            (
                ColumnDataType::String as i32,
                Some(ValueData::StringValue("HTTP/1.0".into())),
            ),
            (
                ColumnDataType::Uint16 as i32,
                Some(ValueData::U16Value(501)),
            ),
            (
                ColumnDataType::Uint32 as i32,
                Some(ValueData::U32Value(33085)),
            ),
            (
                ColumnDataType::TimestampNanosecond as i32,
                Some(ValueData::TimestampNanosecondValue(1722493367000000000)),
            ),
        ];

        let schema = pipeline.schemas().unwrap();
        // Guard the zip below: a length mismatch would otherwise be silently truncated.
        assert_eq!(schema.len(), expected.len());
        for ((column, value), (datatype, value_data)) in
            schema.iter().zip(&row.values).zip(&expected)
        {
            assert_eq!(column.datatype, *datatype);
            assert_eq!(value.value_data, *value_data);
        }
    }

    #[test]
    fn test_csv_pipeline() {
        let input_value_str = r#"
            {
                "my_field": "1,2",
                "foo": "bar",
                "ts": "1"
            }
        "#;
        let input_value: serde_json::Value = serde_json::from_str(input_value_str).unwrap();

        let pipeline_yaml = r#"
description: Pipeline for Apache Tomcat
processors:
  - csv:
      field: my_field
      target_fields: field1, field2
  - epoch:
      field: ts
      resolution: ns
transform:
  - field: field1
    type: uint32
  - field: field2
    type: uint32
  - field: ts
    type: timestamp, ns
    index: time
"#;

        let pipeline = parse(&Content::Yaml(pipeline_yaml)).unwrap();
        let payload = json_to_map(input_value).unwrap();
        let (_, _, row) = run_transformed(pipeline, payload);

        assert_csv_epoch_row(&row);
    }

    #[test]
    fn test_date_pipeline() {
        let input_value_str = r#"
            {
                "my_field": "1,2",
                "foo": "bar",
                "test_time": "2014-5-17T04:34:56+00:00"
            }
        "#;
        let input_value: serde_json::Value = serde_json::from_str(input_value_str).unwrap();

        let pipeline_yaml = r#"---
description: Pipeline for Apache Tomcat

processors:
  - date:
      field: test_time

transform:
  - field: test_time
    type: timestamp, ns
    index: time
"#;

        let pipeline = parse(&Content::Yaml(pipeline_yaml)).unwrap();
        let payload = json_to_map(input_value).unwrap();
        let (pipeline, _, row) = run_transformed(pipeline, payload);

        let output = Rows {
            schema: pipeline.schemas().unwrap().clone(),
            rows: vec![row],
        };

        assert_eq!(output.schema.len(), 1);
        let column = &output.schema[0];
        assert_eq!("test_time", column.column_name);
        assert_eq!(ColumnDataType::TimestampNanosecond as i32, column.datatype);
        assert_eq!(SemanticType::Timestamp as i32, column.semantic_type);

        let row = &output.rows[0];
        assert_eq!(1, row.values.len());
        assert_eq!(
            Some(ValueData::TimestampNanosecondValue(1400301296000000000)),
            row.values[0].value_data
        );
    }

    #[test]
    fn test_dispatcher() {
        let pipeline_yaml = r#"
---
description: Pipeline for Apache Tomcat

processors:
  - epoch:
      field: ts
      resolution: ns

dispatcher:
  field: typename
  rules:
    - value: http
      table_suffix: http_events
    - value: database
      table_suffix: db_events
      pipeline: database_pipeline

transform:
  - field: typename
    type: string
  - field: ts
    type: timestamp, ns
    index: time
"#;
        let pipeline: Pipeline = parse(&Content::Yaml(pipeline_yaml)).unwrap();
        let dispatcher = pipeline.dispatcher.expect("expect dispatcher");
        assert_eq!(dispatcher.field, "typename");

        assert_eq!(dispatcher.rules.len(), 2);

        assert_eq!(
            dispatcher.rules[0],
            crate::dispatcher::Rule {
                value: Value::String("http".to_string()),
                table_suffix: "http_events".to_string(),
                pipeline: None
            }
        );

        assert_eq!(
            dispatcher.rules[1],
            crate::dispatcher::Rule {
                value: Value::String("database".to_string()),
                table_suffix: "db_events".to_string(),
                pipeline: Some("database_pipeline".to_string()),
            }
        );

        // `field` misspelled as `_field`.
        let bad_yaml1 = r#"
---
description: Pipeline for Apache Tomcat

processors:
  - epoch:
      field: ts
      resolution: ns

dispatcher:
  _field: typename
  rules:
    - value: http
      table_suffix: http_events
    - value: database
      table_suffix: db_events
      pipeline: database_pipeline

transform:
  - field: typename
    type: string
  - field: ts
    type: timestamp, ns
    index: time
"#;
        // `table_suffix` misspelled as `_table_suffix` in every rule.
        let bad_yaml2 = r#"
---
description: Pipeline for Apache Tomcat

processors:
  - epoch:
      field: ts
      resolution: ns
dispatcher:
  field: typename
  rules:
    - value: http
      _table_suffix: http_events
    - value: database
      _table_suffix: db_events
      pipeline: database_pipeline

transform:
  - field: typename
    type: string
  - field: ts
    type: timestamp, ns
    index: time
"#;
        // `value` misspelled as `_value` in every rule.
        let bad_yaml3 = r#"
---
description: Pipeline for Apache Tomcat

processors:
  - epoch:
      field: ts
      resolution: ns
dispatcher:
  field: typename
  rules:
    - _value: http
      table_suffix: http_events
    - _value: database
      table_suffix: db_events
      pipeline: database_pipeline

transform:
  - field: typename
    type: string
  - field: ts
    type: timestamp, ns
    index: time
"#;

        for bad_yaml in [bad_yaml1, bad_yaml2, bad_yaml3] {
            let r: Result<Pipeline> = parse(&Content::Yaml(bad_yaml));
            assert!(r.is_err(), "expected a parse error for:{bad_yaml}");
        }
    }
}