// pipeline/etl.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#![allow(dead_code)]
16pub mod field;
17pub mod processor;
18pub mod transform;
19pub mod value;
20
21use ahash::{HashMap, HashMapExt};
22use api::v1::Row;
23use common_time::timestamp::TimeUnit;
24use processor::{Processor, Processors};
25use snafu::{ensure, OptionExt, ResultExt};
26use transform::Transforms;
27use value::Value;
28use yaml_rust::YamlLoader;
29
30use crate::dispatcher::{Dispatcher, Rule};
31use crate::error::{
32    IntermediateKeyIndexSnafu, PrepareValueMustBeObjectSnafu, Result,
33    TransformNoTimestampProcessorSnafu, YamlLoadSnafu, YamlParseSnafu,
34};
35use crate::etl::processor::ProcessorKind;
36use crate::tablesuffix::TableSuffixTemplate;
37use crate::GreptimeTransformer;
38
// Top-level YAML keys recognized in a pipeline definition document.
const DESCRIPTION: &str = "description";
const PROCESSORS: &str = "processors";
// Both the singular and plural spelling of the transform section are accepted.
const TRANSFORM: &str = "transform";
const TRANSFORMS: &str = "transforms";
const DISPATCHER: &str = "dispatcher";
const TABLESUFFIX: &str = "table_suffix";

/// Intermediate row representation used throughout pipeline execution:
/// field name -> [`Value`], kept ordered by key (BTreeMap).
pub type PipelineMap = std::collections::BTreeMap<String, Value>;
47
/// Raw pipeline definition text, tagged with its format.
pub enum Content<'a> {
    // JSON input is not yet supported by [`parse`] (it panics with `unimplemented!`).
    Json(&'a str),
    Yaml(&'a str),
}
52
53pub fn parse(input: &Content) -> Result<Pipeline> {
54    match input {
55        Content::Yaml(str) => {
56            let docs = YamlLoader::load_from_str(str).context(YamlLoadSnafu)?;
57
58            ensure!(docs.len() == 1, YamlParseSnafu);
59
60            let doc = &docs[0];
61
62            let description = doc[DESCRIPTION].as_str().map(|s| s.to_string());
63
64            let processors = if let Some(v) = doc[PROCESSORS].as_vec() {
65                v.try_into()?
66            } else {
67                Processors::default()
68            };
69
70            let transformers = if let Some(v) = doc[TRANSFORMS].as_vec().or(doc[TRANSFORM].as_vec())
71            {
72                v.try_into()?
73            } else {
74                Transforms::default()
75            };
76
77            let transformer = if transformers.is_empty() {
78                // use auto transform
79                // check processors have at least one timestamp-related processor
80                let cnt = processors
81                    .iter()
82                    .filter(|p| {
83                        matches!(
84                            p,
85                            ProcessorKind::Date(_)
86                                | ProcessorKind::Timestamp(_)
87                                | ProcessorKind::Epoch(_)
88                        )
89                    })
90                    .count();
91                ensure!(cnt > 0, TransformNoTimestampProcessorSnafu);
92                None
93            } else {
94                Some(GreptimeTransformer::new(transformers)?)
95            };
96
97            let dispatcher = if !doc[DISPATCHER].is_badvalue() {
98                Some(Dispatcher::try_from(&doc[DISPATCHER])?)
99            } else {
100                None
101            };
102
103            let tablesuffix = if !doc[TABLESUFFIX].is_badvalue() {
104                Some(TableSuffixTemplate::try_from(&doc[TABLESUFFIX])?)
105            } else {
106                None
107            };
108
109            Ok(Pipeline {
110                description,
111                processors,
112                transformer,
113                dispatcher,
114                tablesuffix,
115            })
116        }
117        Content::Json(_) => unimplemented!(),
118    }
119}
120
/// A parsed log pipeline: an ordered processor chain plus optional
/// transformer, dispatcher and table-suffix template. Built by [`parse`].
#[derive(Debug)]
pub struct Pipeline {
    // Free-form human-readable text from the `description` key.
    description: Option<String>,
    // Processors executed in order against each intermediate row.
    processors: processor::Processors,
    // Optional dispatcher; a matching rule short-circuits execution.
    dispatcher: Option<Dispatcher>,
    // `None` means auto-transform mode (no explicit transform section).
    transformer: Option<GreptimeTransformer>,
    // Optional template used to derive a per-row table name suffix.
    tablesuffix: Option<TableSuffixTemplate>,
}
129
/// Where the pipeline executed is dispatched to, with context information
#[derive(Debug, Hash, PartialEq, Eq, Clone, PartialOrd, Ord)]
pub struct DispatchedTo {
    // Suffix appended to the original table name, see `dispatched_to_table_name`.
    pub table_suffix: String,
    // Presumably the name of a follow-up pipeline to run on the dispatched
    // rows — TODO(review): confirm against the dispatcher's callers.
    pub pipeline: Option<String>,
}
136
137impl From<&Rule> for DispatchedTo {
138    fn from(value: &Rule) -> Self {
139        DispatchedTo {
140            table_suffix: value.table_suffix.clone(),
141            pipeline: value.pipeline.clone(),
142        }
143    }
144}
145
146impl DispatchedTo {
147    /// Generate destination table name from input
148    pub fn dispatched_to_table_name(&self, original: &str) -> String {
149        format!("{}_{}", &original, self.table_suffix)
150    }
151}
152
/// The result of pipeline execution
#[derive(Debug)]
pub enum PipelineExecOutput {
    // A fully transformed row plus the optional table suffix.
    Transformed((Row, Option<String>)),
    // table_suffix, ts_key -> unit
    AutoTransform(Option<String>, HashMap<String, TimeUnit>),
    // A dispatcher rule matched; execution stopped early.
    DispatchedTo(DispatchedTo),
}
161
162impl PipelineExecOutput {
163    pub fn into_transformed(self) -> Option<(Row, Option<String>)> {
164        if let Self::Transformed(o) = self {
165            Some(o)
166        } else {
167            None
168        }
169    }
170
171    pub fn into_dispatched(self) -> Option<DispatchedTo> {
172        if let Self::DispatchedTo(d) = self {
173            Some(d)
174        } else {
175            None
176        }
177    }
178}
179
180pub fn json_to_map(val: serde_json::Value) -> Result<PipelineMap> {
181    match val {
182        serde_json::Value::Object(map) => {
183            let mut intermediate_state = PipelineMap::new();
184            for (k, v) in map {
185                intermediate_state.insert(k, Value::try_from(v)?);
186            }
187            Ok(intermediate_state)
188        }
189        _ => PrepareValueMustBeObjectSnafu.fail(),
190    }
191}
192
193pub fn json_array_to_map(val: Vec<serde_json::Value>) -> Result<Vec<PipelineMap>> {
194    val.into_iter().map(json_to_map).collect()
195}
196
197pub fn simd_json_to_map(val: simd_json::OwnedValue) -> Result<PipelineMap> {
198    match val {
199        simd_json::OwnedValue::Object(map) => {
200            let mut intermediate_state = PipelineMap::new();
201            for (k, v) in map.into_iter() {
202                intermediate_state.insert(k, Value::try_from(v)?);
203            }
204            Ok(intermediate_state)
205        }
206        _ => PrepareValueMustBeObjectSnafu.fail(),
207    }
208}
209
210pub fn simd_json_array_to_map(val: Vec<simd_json::OwnedValue>) -> Result<Vec<PipelineMap>> {
211    val.into_iter().map(simd_json_to_map).collect()
212}
213
214impl Pipeline {
215    pub fn exec_mut(&self, val: &mut PipelineMap) -> Result<PipelineExecOutput> {
216        // process
217        for processor in self.processors.iter() {
218            processor.exec_mut(val)?;
219        }
220
221        // dispatch, fast return if matched
222        if let Some(rule) = self.dispatcher.as_ref().and_then(|d| d.exec(val)) {
223            return Ok(PipelineExecOutput::DispatchedTo(rule.into()));
224        }
225
226        if let Some(transformer) = self.transformer() {
227            let row = transformer.transform_mut(val)?;
228            let table_suffix = self.tablesuffix.as_ref().and_then(|t| t.apply(val));
229            Ok(PipelineExecOutput::Transformed((row, table_suffix)))
230        } else {
231            let table_suffix = self.tablesuffix.as_ref().and_then(|t| t.apply(val));
232            let mut ts_unit_map = HashMap::with_capacity(4);
233            // get all ts values
234            for (k, v) in val {
235                if let Value::Timestamp(ts) = v {
236                    if !ts_unit_map.contains_key(k) {
237                        ts_unit_map.insert(k.clone(), ts.get_unit());
238                    }
239                }
240            }
241            Ok(PipelineExecOutput::AutoTransform(table_suffix, ts_unit_map))
242        }
243    }
244
245    pub fn processors(&self) -> &processor::Processors {
246        &self.processors
247    }
248
249    pub fn transformer(&self) -> Option<&GreptimeTransformer> {
250        self.transformer.as_ref()
251    }
252
253    pub fn schemas(&self) -> Option<&Vec<greptime_proto::v1::ColumnSchema>> {
254        self.transformer.as_ref().map(|t| t.schemas())
255    }
256}
257
258pub(crate) fn find_key_index(intermediate_keys: &[String], key: &str, kind: &str) -> Result<usize> {
259    intermediate_keys
260        .iter()
261        .position(|k| k == key)
262        .context(IntermediateKeyIndexSnafu { kind, key })
263}
264
#[cfg(test)]
mod tests {
    use api::v1::Rows;
    use greptime_proto::v1::value::ValueData;
    use greptime_proto::v1::{self, ColumnDataType, SemanticType};

    use super::*;

    #[test]
    fn test_pipeline_prepare() {
        let input_value_str = r#"
                    {
                        "my_field": "1,2",
                        "foo": "bar"
                    }
                "#;
        let input_value: serde_json::Value = serde_json::from_str(input_value_str).unwrap();

        let pipeline_yaml = r#"description: 'Pipeline for Apache Tomcat'
processors:
    - csv:
        field: my_field
        target_fields: field1, field2
transform:
    - field: field1
      type: uint32
    - field: field2
      type: uint32
    "#;
        let pipeline: Pipeline = parse(&Content::Yaml(pipeline_yaml)).unwrap();
        let mut payload = json_to_map(input_value).unwrap();
        let result = pipeline
            .exec_mut(&mut payload)
            .unwrap()
            .into_transformed()
            .unwrap();

        // The two csv-split fields are transformed to uint32.
        assert_eq!(result.0.values[0].value_data, Some(ValueData::U32Value(1)));
        assert_eq!(result.0.values[1].value_data, Some(ValueData::U32Value(2)));
        // The third column is a timestamp whose exact value is not stable,
        // so only assert its type and that it is non-zero.
        match &result.0.values[2].value_data {
            Some(ValueData::TimestampNanosecondValue(v)) => {
                assert_ne!(*v, 0);
            }
            _ => panic!("expected timestamp nanosecond value"),
        }
    }

    #[test]
    fn test_dissect_pipeline() {
        let message = r#"129.37.245.88 - meln1ks [01/Aug/2024:14:22:47 +0800] "PATCH /observability/metrics/production HTTP/1.0" 501 33085"#.to_string();
        let pipeline_str = r#"processors:
    - dissect:
        fields:
          - message
        patterns:
          - "%{ip} %{?ignored} %{username} [%{ts}] \"%{method} %{path} %{proto}\" %{status} %{bytes}"
    - timestamp:
        fields:
          - ts
        formats:
          - "%d/%b/%Y:%H:%M:%S %z"

transform:
    - fields:
        - ip
        - username
        - method
        - path
        - proto
      type: string
    - fields:
        - status
      type: uint16
    - fields:
        - bytes
      type: uint32
    - field: ts
      type: timestamp, ns
      index: time"#;
        let pipeline: Pipeline = parse(&Content::Yaml(pipeline_str)).unwrap();
        let mut payload = PipelineMap::new();
        payload.insert("message".to_string(), Value::String(message));
        let result = pipeline
            .exec_mut(&mut payload)
            .unwrap()
            .into_transformed()
            .unwrap();
        let schemas = pipeline.schemas().unwrap();

        assert_eq!(schemas.len(), result.0.values.len());
        // Expected (datatype, value) pairs in schema order.
        let expected = vec![
            (
                ColumnDataType::String as i32,
                Some(ValueData::StringValue("129.37.245.88".into())),
            ),
            (
                ColumnDataType::String as i32,
                Some(ValueData::StringValue("meln1ks".into())),
            ),
            (
                ColumnDataType::String as i32,
                Some(ValueData::StringValue("PATCH".into())),
            ),
            (
                ColumnDataType::String as i32,
                Some(ValueData::StringValue(
                    "/observability/metrics/production".into(),
                )),
            ),
            (
                ColumnDataType::String as i32,
                Some(ValueData::StringValue("HTTP/1.0".into())),
            ),
            (
                ColumnDataType::Uint16 as i32,
                Some(ValueData::U16Value(501)),
            ),
            (
                ColumnDataType::Uint32 as i32,
                Some(ValueData::U32Value(33085)),
            ),
            (
                ColumnDataType::TimestampNanosecond as i32,
                Some(ValueData::TimestampNanosecondValue(1722493367000000000)),
            ),
        ];
        for (i, (datatype, value_data)) in expected.iter().enumerate() {
            assert_eq!(&schemas[i].datatype, datatype);
            assert_eq!(&result.0.values[i].value_data, value_data);
        }
    }

    #[test]
    fn test_csv_pipeline() {
        let input_value_str = r#"
                    {
                        "my_field": "1,2",
                        "foo": "bar"
                    }
                "#;
        let input_value: serde_json::Value = serde_json::from_str(input_value_str).unwrap();

        let pipeline_yaml = r#"
    description: Pipeline for Apache Tomcat
    processors:
      - csv:
          field: my_field
          target_fields: field1, field2
    transform:
      - field: field1
        type: uint32
      - field: field2
        type: uint32
    "#;

        let pipeline: Pipeline = parse(&Content::Yaml(pipeline_yaml)).unwrap();
        let mut payload = json_to_map(input_value).unwrap();
        let result = pipeline
            .exec_mut(&mut payload)
            .unwrap()
            .into_transformed()
            .unwrap();
        assert_eq!(result.0.values[0].value_data, Some(ValueData::U32Value(1)));
        assert_eq!(result.0.values[1].value_data, Some(ValueData::U32Value(2)));
        // As above: the timestamp column's value is not stable, only its type.
        match &result.0.values[2].value_data {
            Some(ValueData::TimestampNanosecondValue(v)) => {
                assert_ne!(*v, 0);
            }
            _ => panic!("expected timestamp nanosecond value"),
        }
    }

    #[test]
    fn test_date_pipeline() {
        let input_value_str = r#"
                {
                    "my_field": "1,2",
                    "foo": "bar",
                    "test_time": "2014-5-17T04:34:56+00:00"
                }
            "#;
        let input_value: serde_json::Value = serde_json::from_str(input_value_str).unwrap();

        let pipeline_yaml = r#"---
description: Pipeline for Apache Tomcat

processors:
    - timestamp:
        field: test_time

transform:
    - field: test_time
      type: timestamp, ns
      index: time
    "#;

        let pipeline: Pipeline = parse(&Content::Yaml(pipeline_yaml)).unwrap();
        let schema = pipeline.schemas().unwrap().clone();
        let mut result = json_to_map(input_value).unwrap();

        let row = pipeline
            .exec_mut(&mut result)
            .unwrap()
            .into_transformed()
            .unwrap();
        let output = Rows {
            schema,
            rows: vec![row.0],
        };
        let schemas = output.schema;

        // Only `test_time` is transformed, as the time index column.
        assert_eq!(schemas.len(), 1);
        let schema = schemas[0].clone();
        assert_eq!("test_time", schema.column_name);
        assert_eq!(ColumnDataType::TimestampNanosecond as i32, schema.datatype);
        assert_eq!(SemanticType::Timestamp as i32, schema.semantic_type);

        let row = output.rows[0].clone();
        assert_eq!(1, row.values.len());
        let value_data = row.values[0].clone().value_data;
        // 2014-05-17T04:34:56+00:00 in nanoseconds since the Unix epoch.
        assert_eq!(
            Some(v1::value::ValueData::TimestampNanosecondValue(
                1400301296000000000
            )),
            value_data
        );
    }

    #[test]
    fn test_dispatcher() {
        let pipeline_yaml = r#"
---
description: Pipeline for Apache Tomcat

processors:

dispatcher:
  field: typename
  rules:
    - value: http
      table_suffix: http_events
    - value: database
      table_suffix: db_events
      pipeline: database_pipeline

transform:
  - field: typename
    type: string

"#;
        let pipeline: Pipeline = parse(&Content::Yaml(pipeline_yaml)).unwrap();
        let dispatcher = pipeline.dispatcher.expect("expect dispatcher");
        assert_eq!(dispatcher.field, "typename");

        assert_eq!(dispatcher.rules.len(), 2);

        assert_eq!(
            dispatcher.rules[0],
            crate::dispatcher::Rule {
                value: Value::String("http".to_string()),
                table_suffix: "http_events".to_string(),
                pipeline: None
            }
        );

        assert_eq!(
            dispatcher.rules[1],
            crate::dispatcher::Rule {
                value: Value::String("database".to_string()),
                table_suffix: "db_events".to_string(),
                pipeline: Some("database_pipeline".to_string()),
            }
        );

        // Each of the following misspells one required dispatcher key
        // (`field`, `table_suffix`, `value`), so parsing must fail.
        let bad_yaml1 = r#"
---
description: Pipeline for Apache Tomcat

processors:

dispatcher:
  _field: typename
  rules:
    - value: http
      table_suffix: http_events
    - value: database
      table_suffix: db_events
      pipeline: database_pipeline

transform:
  - field: typename
    type: string

"#;
        let bad_yaml2 = r#"
---
description: Pipeline for Apache Tomcat

processors:

dispatcher:
  field: typename
  rules:
    - value: http
      _table_suffix: http_events
    - value: database
      _table_suffix: db_events
      pipeline: database_pipeline

transform:
  - field: typename
    type: string

"#;
        let bad_yaml3 = r#"
---
description: Pipeline for Apache Tomcat

processors:

dispatcher:
  field: typename
  rules:
    - _value: http
      table_suffix: http_events
    - _value: database
      table_suffix: db_events
      pipeline: database_pipeline

transform:
  - field: typename
    type: string

"#;

        let r: Result<Pipeline> = parse(&Content::Yaml(bad_yaml1));
        assert!(r.is_err());
        let r: Result<Pipeline> = parse(&Content::Yaml(bad_yaml2));
        assert!(r.is_err());
        let r: Result<Pipeline> = parse(&Content::Yaml(bad_yaml3));
        assert!(r.is_err());
    }
}