// pipeline/etl.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#![allow(dead_code)]
16pub mod ctx_req;
17pub mod field;
18pub mod processor;
19pub mod transform;
20pub mod value;
21
22use ahash::{HashMap, HashMapExt};
23use api::v1::Row;
24use common_time::timestamp::TimeUnit;
25use processor::{Processor, Processors};
26use snafu::{ensure, OptionExt, ResultExt};
27use transform::Transforms;
28use value::Value;
29use yaml_rust::YamlLoader;
30
31use crate::dispatcher::{Dispatcher, Rule};
32use crate::error::{
33    AutoTransformOneTimestampSnafu, InputValueMustBeObjectSnafu, IntermediateKeyIndexSnafu, Result,
34    YamlLoadSnafu, YamlParseSnafu,
35};
36use crate::etl::ctx_req::TABLE_SUFFIX_KEY;
37use crate::etl::processor::ProcessorKind;
38use crate::tablesuffix::TableSuffixTemplate;
39use crate::{ContextOpt, GreptimeTransformer};
40
/// YAML key for the optional pipeline description.
const DESCRIPTION: &str = "description";
/// YAML key for the processor list.
const PROCESSORS: &str = "processors";
/// YAML keys for the transform rules; both spellings are accepted by [`parse`].
const TRANSFORM: &str = "transform";
const TRANSFORMS: &str = "transforms";
/// YAML key for the dispatcher section.
const DISPATCHER: &str = "dispatcher";
/// YAML key for the table-suffix template.
const TABLESUFFIX: &str = "table_suffix";

/// Intermediate representation of one input row: an ordered map from
/// field name to its parsed [`Value`].
pub type PipelineMap = std::collections::BTreeMap<String, Value>;
49
/// Raw pipeline definition text together with its encoding format.
pub enum Content<'a> {
    /// JSON-encoded pipeline definition (not yet supported by [`parse`]).
    Json(&'a str),
    /// YAML-encoded pipeline definition.
    Yaml(&'a str),
}
54
55pub fn parse(input: &Content) -> Result<Pipeline> {
56    match input {
57        Content::Yaml(str) => {
58            let docs = YamlLoader::load_from_str(str).context(YamlLoadSnafu)?;
59
60            ensure!(docs.len() == 1, YamlParseSnafu);
61
62            let doc = &docs[0];
63
64            let description = doc[DESCRIPTION].as_str().map(|s| s.to_string());
65
66            let processors = if let Some(v) = doc[PROCESSORS].as_vec() {
67                v.try_into()?
68            } else {
69                Processors::default()
70            };
71
72            let transformers = if let Some(v) = doc[TRANSFORMS].as_vec().or(doc[TRANSFORM].as_vec())
73            {
74                v.try_into()?
75            } else {
76                Transforms::default()
77            };
78
79            let transformer = if transformers.is_empty() {
80                // use auto transform
81                // check processors have at least one timestamp-related processor
82                let cnt = processors
83                    .iter()
84                    .filter_map(|p| match p {
85                        ProcessorKind::Date(d) => Some(d.target_count()),
86                        ProcessorKind::Timestamp(t) => Some(t.target_count()),
87                        ProcessorKind::Epoch(e) => Some(e.target_count()),
88                        _ => None,
89                    })
90                    .sum::<usize>();
91                ensure!(cnt == 1, AutoTransformOneTimestampSnafu);
92                None
93            } else {
94                Some(GreptimeTransformer::new(transformers)?)
95            };
96
97            let dispatcher = if !doc[DISPATCHER].is_badvalue() {
98                Some(Dispatcher::try_from(&doc[DISPATCHER])?)
99            } else {
100                None
101            };
102
103            let tablesuffix = if !doc[TABLESUFFIX].is_badvalue() {
104                Some(TableSuffixTemplate::try_from(&doc[TABLESUFFIX])?)
105            } else {
106                None
107            };
108
109            Ok(Pipeline {
110                description,
111                processors,
112                transformer,
113                dispatcher,
114                tablesuffix,
115            })
116        }
117        Content::Json(_) => unimplemented!(),
118    }
119}
120
/// A parsed pipeline: an ordered processor chain followed by either an
/// explicit transformer or auto-transform output, with optional dispatching
/// and table-suffix templating.
#[derive(Debug)]
pub struct Pipeline {
    // Human-readable description from the YAML `description` key, if any.
    description: Option<String>,
    // Processors executed in order against each input row.
    processors: processor::Processors,
    // Optional dispatcher that can route a row elsewhere before transform.
    dispatcher: Option<Dispatcher>,
    // Explicit transformer; `None` means auto-transform mode.
    transformer: Option<GreptimeTransformer>,
    // Optional template used to derive a per-row table name suffix.
    tablesuffix: Option<TableSuffixTemplate>,
}
129
/// Where the pipeline executed is dispatched to, with context information
#[derive(Debug, Hash, PartialEq, Eq, Clone, PartialOrd, Ord)]
pub struct DispatchedTo {
    /// Suffix appended (with `_`) to the original table name, see
    /// [`DispatchedTo::dispatched_to_table_name`].
    pub table_suffix: String,
    /// Pipeline name carried over from the matched dispatcher rule —
    /// presumably the pipeline to apply next; semantics live with the caller.
    pub pipeline: Option<String>,
}
136
137impl From<&Rule> for DispatchedTo {
138    fn from(value: &Rule) -> Self {
139        DispatchedTo {
140            table_suffix: value.table_suffix.clone(),
141            pipeline: value.pipeline.clone(),
142        }
143    }
144}
145
146impl DispatchedTo {
147    /// Generate destination table name from input
148    pub fn dispatched_to_table_name(&self, original: &str) -> String {
149        format!("{}_{}", &original, self.table_suffix)
150    }
151}
152
/// The result of pipeline execution
#[derive(Debug)]
pub enum PipelineExecOutput {
    /// Row produced by an explicit transformer.
    Transformed(TransformedOutput),
    /// Row processed without an explicit transformer; carries timestamp
    /// unit information for downstream schema inference.
    AutoTransform(AutoTransformOutput),
    /// Row matched a dispatcher rule; carries the destination plus the row.
    DispatchedTo(DispatchedTo, PipelineMap),
}
160
/// Output of a pipeline run that went through an explicit transformer.
#[derive(Debug)]
pub struct TransformedOutput {
    /// Per-row context options produced by the transformer.
    pub opt: ContextOpt,
    /// The transformed row.
    pub row: Row,
    /// Table name suffix resolved for this row, if any.
    pub table_suffix: Option<String>,
    /// The intermediate row map after all processors ran.
    pub pipeline_map: PipelineMap,
}
168
/// Output of a pipeline run with no explicit transformer (auto-transform).
#[derive(Debug)]
pub struct AutoTransformOutput {
    /// Table name suffix resolved for this row, if any.
    pub table_suffix: Option<String>,
    // ts_column_name -> unit
    /// Unit of every timestamp-typed value in the row, keyed by field name.
    pub ts_unit_map: HashMap<String, TimeUnit>,
    /// The intermediate row map after all processors ran.
    pub pipeline_map: PipelineMap,
}
176
177impl PipelineExecOutput {
178    // Note: This is a test only function, do not use it in production.
179    pub fn into_transformed(self) -> Option<(Row, Option<String>)> {
180        if let Self::Transformed(TransformedOutput {
181            row, table_suffix, ..
182        }) = self
183        {
184            Some((row, table_suffix))
185        } else {
186            None
187        }
188    }
189
190    // Note: This is a test only function, do not use it in production.
191    pub fn into_dispatched(self) -> Option<DispatchedTo> {
192        if let Self::DispatchedTo(d, _) = self {
193            Some(d)
194        } else {
195            None
196        }
197    }
198}
199
200pub fn json_to_map(val: serde_json::Value) -> Result<PipelineMap> {
201    match val {
202        serde_json::Value::Object(map) => {
203            let mut intermediate_state = PipelineMap::new();
204            for (k, v) in map {
205                intermediate_state.insert(k, Value::try_from(v)?);
206            }
207            Ok(intermediate_state)
208        }
209        _ => InputValueMustBeObjectSnafu.fail(),
210    }
211}
212
213pub fn json_array_to_map(val: Vec<serde_json::Value>) -> Result<Vec<PipelineMap>> {
214    val.into_iter().map(json_to_map).collect()
215}
216
217pub fn simd_json_to_map(val: simd_json::OwnedValue) -> Result<PipelineMap> {
218    match val {
219        simd_json::OwnedValue::Object(map) => {
220            let mut intermediate_state = PipelineMap::new();
221            for (k, v) in map.into_iter() {
222                intermediate_state.insert(k, Value::try_from(v)?);
223            }
224            Ok(intermediate_state)
225        }
226        _ => InputValueMustBeObjectSnafu.fail(),
227    }
228}
229
230pub fn simd_json_array_to_map(val: Vec<simd_json::OwnedValue>) -> Result<Vec<PipelineMap>> {
231    val.into_iter().map(simd_json_to_map).collect()
232}
233
234impl Pipeline {
235    pub fn exec_mut(&self, mut val: PipelineMap) -> Result<PipelineExecOutput> {
236        // process
237        for processor in self.processors.iter() {
238            val = processor.exec_mut(val)?;
239        }
240
241        // dispatch, fast return if matched
242        if let Some(rule) = self.dispatcher.as_ref().and_then(|d| d.exec(&val)) {
243            return Ok(PipelineExecOutput::DispatchedTo(rule.into(), val));
244        }
245
246        // do transform
247        if let Some(transformer) = self.transformer() {
248            let (mut opt, row) = transformer.transform_mut(&mut val)?;
249            let table_suffix = opt.resolve_table_suffix(self.tablesuffix.as_ref(), &val);
250
251            Ok(PipelineExecOutput::Transformed(TransformedOutput {
252                opt,
253                row,
254                table_suffix,
255                pipeline_map: val,
256            }))
257        } else {
258            // check table suffix var
259            let table_suffix = val
260                .remove(TABLE_SUFFIX_KEY)
261                .map(|f| f.to_str_value())
262                .or_else(|| self.tablesuffix.as_ref().and_then(|t| t.apply(&val)));
263
264            let mut ts_unit_map = HashMap::with_capacity(4);
265            // get all ts values
266            for (k, v) in val.iter() {
267                if let Value::Timestamp(ts) = v {
268                    if !ts_unit_map.contains_key(k) {
269                        ts_unit_map.insert(k.clone(), ts.get_unit());
270                    }
271                }
272            }
273            Ok(PipelineExecOutput::AutoTransform(AutoTransformOutput {
274                table_suffix,
275                ts_unit_map,
276                pipeline_map: val,
277            }))
278        }
279    }
280
281    pub fn processors(&self) -> &processor::Processors {
282        &self.processors
283    }
284
285    pub fn transformer(&self) -> Option<&GreptimeTransformer> {
286        self.transformer.as_ref()
287    }
288
289    pub fn schemas(&self) -> Option<&Vec<greptime_proto::v1::ColumnSchema>> {
290        self.transformer.as_ref().map(|t| t.schemas())
291    }
292}
293
294pub(crate) fn find_key_index(intermediate_keys: &[String], key: &str, kind: &str) -> Result<usize> {
295    intermediate_keys
296        .iter()
297        .position(|k| k == key)
298        .context(IntermediateKeyIndexSnafu { kind, key })
299}
300
#[cfg(test)]
mod tests {
    use api::v1::Rows;
    use greptime_proto::v1::value::ValueData;
    use greptime_proto::v1::{self, ColumnDataType, SemanticType};

    use super::*;

    // End-to-end: csv processor + explicit uint32 transforms.
    #[test]
    fn test_pipeline_prepare() {
        let input_value_str = r#"
                    {
                        "my_field": "1,2",
                        "foo": "bar"
                    }
                "#;
        let input_value: serde_json::Value = serde_json::from_str(input_value_str).unwrap();

        let pipeline_yaml = r#"description: 'Pipeline for Apache Tomcat'
processors:
    - csv:
        field: my_field
        target_fields: field1, field2
transform:
    - field: field1
      type: uint32
    - field: field2
      type: uint32
    "#;
        let pipeline: Pipeline = parse(&Content::Yaml(pipeline_yaml)).unwrap();
        let payload = json_to_map(input_value).unwrap();
        let result = pipeline
            .exec_mut(payload)
            .unwrap()
            .into_transformed()
            .unwrap();

        assert_eq!(result.0.values[0].value_data, Some(ValueData::U32Value(1)));
        assert_eq!(result.0.values[1].value_data, Some(ValueData::U32Value(2)));
        match &result.0.values[2].value_data {
            Some(ValueData::TimestampNanosecondValue(v)) => {
                assert_ne!(*v, 0);
            }
            // Message fixed: this arm means we did NOT get a timestamp value.
            _ => panic!("expected a timestamp nanosecond value"),
        }
    }

    // End-to-end: dissect + timestamp processors with a full transform list.
    #[test]
    fn test_dissect_pipeline() {
        let message = r#"129.37.245.88 - meln1ks [01/Aug/2024:14:22:47 +0800] "PATCH /observability/metrics/production HTTP/1.0" 501 33085"#.to_string();
        let pipeline_str = r#"processors:
    - dissect:
        fields:
          - message
        patterns:
          - "%{ip} %{?ignored} %{username} [%{ts}] \"%{method} %{path} %{proto}\" %{status} %{bytes}"
    - timestamp:
        fields:
          - ts
        formats:
          - "%d/%b/%Y:%H:%M:%S %z"

transform:
    - fields:
        - ip
        - username
        - method
        - path
        - proto
      type: string
    - fields:
        - status
      type: uint16
    - fields:
        - bytes
      type: uint32
    - field: ts
      type: timestamp, ns
      index: time"#;
        let pipeline: Pipeline = parse(&Content::Yaml(pipeline_str)).unwrap();
        let mut payload = PipelineMap::new();
        payload.insert("message".to_string(), Value::String(message));
        let result = pipeline
            .exec_mut(payload)
            .unwrap()
            .into_transformed()
            .unwrap();
        // Typo fixed: local variable was `sechema`.
        let schemas = pipeline.schemas().unwrap();

        assert_eq!(schemas.len(), result.0.values.len());
        let test = vec![
            (
                ColumnDataType::String as i32,
                Some(ValueData::StringValue("129.37.245.88".into())),
            ),
            (
                ColumnDataType::String as i32,
                Some(ValueData::StringValue("meln1ks".into())),
            ),
            (
                ColumnDataType::String as i32,
                Some(ValueData::StringValue("PATCH".into())),
            ),
            (
                ColumnDataType::String as i32,
                Some(ValueData::StringValue(
                    "/observability/metrics/production".into(),
                )),
            ),
            (
                ColumnDataType::String as i32,
                Some(ValueData::StringValue("HTTP/1.0".into())),
            ),
            (
                ColumnDataType::Uint16 as i32,
                Some(ValueData::U16Value(501)),
            ),
            (
                ColumnDataType::Uint32 as i32,
                Some(ValueData::U32Value(33085)),
            ),
            (
                ColumnDataType::TimestampNanosecond as i32,
                Some(ValueData::TimestampNanosecondValue(1722493367000000000)),
            ),
        ];
        for i in 0..schemas.len() {
            let schema = &schemas[i];
            let value = &result.0.values[i];
            assert_eq!(schema.datatype, test[i].0);
            assert_eq!(value.value_data, test[i].1);
        }
    }

    // Same as test_pipeline_prepare but with indented YAML layout.
    #[test]
    fn test_csv_pipeline() {
        let input_value_str = r#"
                    {
                        "my_field": "1,2",
                        "foo": "bar"
                    }
                "#;
        let input_value: serde_json::Value = serde_json::from_str(input_value_str).unwrap();

        let pipeline_yaml = r#"
    description: Pipeline for Apache Tomcat
    processors:
      - csv:
          field: my_field
          target_fields: field1, field2
    transform:
      - field: field1
        type: uint32
      - field: field2
        type: uint32
    "#;

        let pipeline: Pipeline = parse(&Content::Yaml(pipeline_yaml)).unwrap();
        let payload = json_to_map(input_value).unwrap();
        let result = pipeline
            .exec_mut(payload)
            .unwrap()
            .into_transformed()
            .unwrap();
        assert_eq!(result.0.values[0].value_data, Some(ValueData::U32Value(1)));
        assert_eq!(result.0.values[1].value_data, Some(ValueData::U32Value(2)));
        match &result.0.values[2].value_data {
            Some(ValueData::TimestampNanosecondValue(v)) => {
                assert_ne!(*v, 0);
            }
            // Message fixed: this arm means we did NOT get a timestamp value.
            _ => panic!("expected a timestamp nanosecond value"),
        }
    }

    // Timestamp processor parses RFC3339-ish input into a ns timestamp column.
    #[test]
    fn test_date_pipeline() {
        let input_value_str = r#"
                {
                    "my_field": "1,2",
                    "foo": "bar",
                    "test_time": "2014-5-17T04:34:56+00:00"
                }
            "#;
        let input_value: serde_json::Value = serde_json::from_str(input_value_str).unwrap();

        let pipeline_yaml = r#"---
description: Pipeline for Apache Tomcat

processors:
    - timestamp:
        field: test_time

transform:
    - field: test_time
      type: timestamp, ns
      index: time
    "#;

        let pipeline: Pipeline = parse(&Content::Yaml(pipeline_yaml)).unwrap();
        let schema = pipeline.schemas().unwrap().clone();
        let result = json_to_map(input_value).unwrap();

        let row = pipeline
            .exec_mut(result)
            .unwrap()
            .into_transformed()
            .unwrap();
        let output = Rows {
            schema,
            rows: vec![row.0],
        };
        let schemas = output.schema;

        assert_eq!(schemas.len(), 1);
        let schema = schemas[0].clone();
        assert_eq!("test_time", schema.column_name);
        assert_eq!(ColumnDataType::TimestampNanosecond as i32, schema.datatype);
        assert_eq!(SemanticType::Timestamp as i32, schema.semantic_type);

        let row = output.rows[0].clone();
        assert_eq!(1, row.values.len());
        let value_data = row.values[0].clone().value_data;
        assert_eq!(
            Some(v1::value::ValueData::TimestampNanosecondValue(
                1400301296000000000
            )),
            value_data
        );
    }

    // Dispatcher section parsing: valid rules plus three malformed variants.
    #[test]
    fn test_dispatcher() {
        let pipeline_yaml = r#"
---
description: Pipeline for Apache Tomcat

processors:

dispatcher:
  field: typename
  rules:
    - value: http
      table_suffix: http_events
    - value: database
      table_suffix: db_events
      pipeline: database_pipeline

transform:
  - field: typename
    type: string

"#;
        let pipeline: Pipeline = parse(&Content::Yaml(pipeline_yaml)).unwrap();
        let dispatcher = pipeline.dispatcher.expect("expect dispatcher");
        assert_eq!(dispatcher.field, "typename");

        assert_eq!(dispatcher.rules.len(), 2);

        assert_eq!(
            dispatcher.rules[0],
            crate::dispatcher::Rule {
                value: Value::String("http".to_string()),
                table_suffix: "http_events".to_string(),
                pipeline: None
            }
        );

        assert_eq!(
            dispatcher.rules[1],
            crate::dispatcher::Rule {
                value: Value::String("database".to_string()),
                table_suffix: "db_events".to_string(),
                pipeline: Some("database_pipeline".to_string()),
            }
        );

        // Missing `field` key.
        let bad_yaml1 = r#"
---
description: Pipeline for Apache Tomcat

processors:

dispatcher:
  _field: typename
  rules:
    - value: http
      table_suffix: http_events
    - value: database
      table_suffix: db_events
      pipeline: database_pipeline

transform:
  - field: typename
    type: string

"#;
        // Missing `table_suffix` key in the rules.
        let bad_yaml2 = r#"
---
description: Pipeline for Apache Tomcat

processors:

dispatcher:
  field: typename
  rules:
    - value: http
      _table_suffix: http_events
    - value: database
      _table_suffix: db_events
      pipeline: database_pipeline

transform:
  - field: typename
    type: string

"#;
        // Missing `value` key in the rules.
        let bad_yaml3 = r#"
---
description: Pipeline for Apache Tomcat

processors:

dispatcher:
  field: typename
  rules:
    - _value: http
      table_suffix: http_events
    - _value: database
      table_suffix: db_events
      pipeline: database_pipeline

transform:
  - field: typename
    type: string

"#;

        let r: Result<Pipeline> = parse(&Content::Yaml(bad_yaml1));
        assert!(r.is_err());
        let r: Result<Pipeline> = parse(&Content::Yaml(bad_yaml2));
        assert!(r.is_err());
        let r: Result<Pipeline> = parse(&Content::Yaml(bad_yaml3));
        assert!(r.is_err());
    }
}
645}