pipeline/etl/
processor.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub mod cmcd;
16pub mod csv;
17pub mod date;
18pub mod decolorize;
19pub mod digest;
20pub mod dissect;
21pub mod epoch;
22pub mod filter;
23pub mod gsub;
24pub mod join;
25pub mod json_parse;
26pub mod json_path;
27pub mod letter;
28pub mod regex;
29pub mod select;
30pub mod simple_extract;
31pub mod urlencoding;
32pub mod vrl_processor;
33
34use std::str::FromStr;
35
36use cmcd::CmcdProcessor;
37use csv::CsvProcessor;
38use date::DateProcessor;
39use decolorize::DecolorizeProcessor;
40use digest::DigestProcessor;
41use dissect::DissectProcessor;
42use enum_dispatch::enum_dispatch;
43use epoch::EpochProcessor;
44use gsub::GsubProcessor;
45use join::JoinProcessor;
46use json_path::JsonPathProcessor;
47use letter::LetterProcessor;
48use regex::RegexProcessor;
49use snafu::{OptionExt, ResultExt};
50use urlencoding::UrlEncodingProcessor;
51use vrl::value::Value as VrlValue;
52
53use crate::error::{
54    Error, FailedParseFieldFromStringSnafu, FieldMustBeTypeSnafu, InvalidFieldRenameSnafu,
55    ProcessorKeyMustBeStringSnafu, ProcessorMustBeMapSnafu, ProcessorMustHaveStringKeySnafu,
56    Result, UnsupportedProcessorSnafu,
57};
58use crate::etl::field::{Field, Fields};
59use crate::etl::processor::filter::FilterProcessor;
60use crate::etl::processor::json_parse::JsonParseProcessor;
61use crate::etl::processor::select::SelectProcessor;
62use crate::etl::processor::simple_extract::SimpleExtractProcessor;
63use crate::etl::processor::vrl_processor::VrlProcessor;
64
// String keys shared by processor definitions.
// `KEY_NAME` and `RENAME_TO_KEY` are looked up in this file when a field is
// written as a `key` / `rename_to` map; the remaining names are not referenced
// in the visible part of this file and are presumably consumed by the
// processor submodules (TODO confirm).
const FIELD_NAME: &str = "field";
const FIELDS_NAME: &str = "fields";
const IGNORE_MISSING_NAME: &str = "ignore_missing";
const METHOD_NAME: &str = "method";
const PATTERN_NAME: &str = "pattern";
const PATTERNS_NAME: &str = "patterns";
const SEPARATOR_NAME: &str = "separator";
const TARGET_FIELDS_NAME: &str = "target_fields";
const JSON_PATH_NAME: &str = "json_path";
const JSON_PATH_RESULT_INDEX_NAME: &str = "result_index";
const KEY_NAME: &str = "key";
const TYPE_NAME: &str = "type";
const RENAME_TO_KEY: &str = "rename_to";
78
/// Looks up the string stored under `$key` in the YAML hash `$map`.
///
/// The key is wrapped into a `yaml_rust::Yaml::String` before the lookup. The
/// macro expands to a `Result`: `Ok(&str)` when the entry exists and holds a
/// string, otherwise an error built from `InvalidFieldRenameSnafu` that carries
/// a clone of `$value` (the YAML node being parsed) for diagnostics.
///
/// The expansion uses unqualified names, so the call site must have both
/// `snafu::OptionExt` (for `with_context` on `Option`) and
/// `InvalidFieldRenameSnafu` in scope.
#[macro_export]
macro_rules! yaml_map_get_str {
    ($map:expr, $key:expr, $value:expr) => {
        $map.get(&yaml_rust::Yaml::String($key.to_string()))
            // A present but non-string entry is treated the same as a missing one.
            .and_then(|v| v.as_str())
            .with_context(|| InvalidFieldRenameSnafu {
                value: $value.clone(),
            })
    };
}
90
91lazy_static::lazy_static! {
92    static ref STRING_FN: fn(&str, &yaml_rust::Yaml) -> Result<String> = |_, v| {
93        Ok(v.as_str().unwrap_or_default().into())
94    };
95
96    static ref STRING_OR_HASH_FN: fn(&str, &yaml_rust::Yaml) -> Result<Field> = |field, v| {
97        match v {
98            yaml_rust::Yaml::String(s) => Field::from_str(s),
99            yaml_rust::Yaml::Hash(m) => {
100                let key = yaml_map_get_str!(m, KEY_NAME, v)?;
101                let rename_to = yaml_map_get_str!(m, RENAME_TO_KEY, v)?;
102                Ok(Field::new(key, Some(rename_to.to_string())))
103            }
104            _ => FieldMustBeTypeSnafu {
105                field,
106                ty: "string or key-rename_to map",
107            }
108            .fail(),
109        }
110    };
111}
112
/// Common interface implemented by every processor.
///
/// A processor is a transformation applied to a document: it can extract,
/// transform, or enrich data.
/// Currently a processor takes a single input value; support for multiple
/// inputs may be added in the future.
///
/// Calls are dispatched through [`ProcessorKind`] via `enum_dispatch`.
#[enum_dispatch(ProcessorKind)]
pub trait Processor: std::fmt::Debug + Send + Sync + 'static {
    /// Returns the processor's kind as a string.
    fn kind(&self) -> &str;

    /// Returns the processor's `ignore_missing` setting.
    fn ignore_missing(&self) -> bool;

    /// Executes the processor on `val`, which is taken by value, and returns
    /// the resulting value or an error.
    fn exec_mut(&self, val: VrlValue) -> Result<VrlValue>;
}
130
/// Closed set of all processor implementations.
///
/// Each variant wraps one concrete processor type; `enum_dispatch` forwards the
/// [`Processor`] methods to the wrapped value.
#[derive(Debug)]
#[enum_dispatch]
pub enum ProcessorKind {
    Cmcd(CmcdProcessor),
    Csv(CsvProcessor),
    Dissect(DissectProcessor),
    Gsub(GsubProcessor),
    Join(JoinProcessor),
    Letter(LetterProcessor),
    Regex(RegexProcessor),
    UrlEncoding(UrlEncodingProcessor),
    Epoch(EpochProcessor),
    Date(DateProcessor),
    JsonPath(JsonPathProcessor),
    JsonParse(JsonParseProcessor),
    // The variant name differs from the wrapped type: it holds a `SimpleExtractProcessor`.
    SimpleJsonPath(SimpleExtractProcessor),
    Decolorize(DecolorizeProcessor),
    Digest(DigestProcessor),
    Select(SelectProcessor),
    Vrl(VrlProcessor),
    Filter(FilterProcessor),
}
153
/// An ordered collection of processors.
#[derive(Debug, Default)]
pub struct Processors {
    /// An ordered list of processors.
    /// The order matters: the output of each processor is the input of the
    /// next one.
    pub processors: Vec<ProcessorKind>,
}
161
/// Lets a `Processors` value be used as a shared `Vec<ProcessorKind>`.
impl std::ops::Deref for Processors {
    type Target = Vec<ProcessorKind>;

    /// Borrows the inner processor list.
    fn deref(&self) -> &Self::Target {
        &self.processors
    }
}
169
/// Lets a `Processors` value be used as a mutable `Vec<ProcessorKind>`.
impl std::ops::DerefMut for Processors {
    /// Mutably borrows the inner processor list.
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.processors
    }
}
175
176impl TryFrom<&Vec<yaml_rust::Yaml>> for Processors {
177    type Error = Error;
178
179    fn try_from(vec: &Vec<yaml_rust::Yaml>) -> Result<Self> {
180        let mut processors_builders = vec![];
181        for doc in vec {
182            let processor = parse_processor(doc)?;
183            processors_builders.push(processor);
184        }
185        Ok(Processors {
186            processors: processors_builders,
187        })
188    }
189}
190
191fn parse_processor(doc: &yaml_rust::Yaml) -> Result<ProcessorKind> {
192    let map = doc.as_hash().context(ProcessorMustBeMapSnafu)?;
193
194    let key = map.keys().next().context(ProcessorMustHaveStringKeySnafu)?;
195
196    let value = map
197        .get(key)
198        .unwrap()
199        .as_hash()
200        .context(ProcessorMustBeMapSnafu)?;
201
202    let str_key = key.as_str().context(ProcessorKeyMustBeStringSnafu)?;
203
204    let processor = match str_key {
205        cmcd::PROCESSOR_CMCD => ProcessorKind::Cmcd(CmcdProcessor::try_from(value)?),
206        csv::PROCESSOR_CSV => ProcessorKind::Csv(CsvProcessor::try_from(value)?),
207        dissect::PROCESSOR_DISSECT => ProcessorKind::Dissect(DissectProcessor::try_from(value)?),
208        epoch::PROCESSOR_EPOCH => ProcessorKind::Epoch(EpochProcessor::try_from(value)?),
209        date::PROCESSOR_DATE => ProcessorKind::Date(DateProcessor::try_from(value)?),
210        gsub::PROCESSOR_GSUB => ProcessorKind::Gsub(GsubProcessor::try_from(value)?),
211        join::PROCESSOR_JOIN => ProcessorKind::Join(JoinProcessor::try_from(value)?),
212        letter::PROCESSOR_LETTER => ProcessorKind::Letter(LetterProcessor::try_from(value)?),
213        regex::PROCESSOR_REGEX => ProcessorKind::Regex(RegexProcessor::try_from(value)?),
214        urlencoding::PROCESSOR_URL_ENCODING => {
215            ProcessorKind::UrlEncoding(UrlEncodingProcessor::try_from(value)?)
216        }
217        json_path::PROCESSOR_JSON_PATH => {
218            ProcessorKind::JsonPath(json_path::JsonPathProcessor::try_from(value)?)
219        }
220        decolorize::PROCESSOR_DECOLORIZE => {
221            ProcessorKind::Decolorize(DecolorizeProcessor::try_from(value)?)
222        }
223        digest::PROCESSOR_DIGEST => ProcessorKind::Digest(DigestProcessor::try_from(value)?),
224        simple_extract::PROCESSOR_SIMPLE_EXTRACT => {
225            ProcessorKind::SimpleJsonPath(SimpleExtractProcessor::try_from(value)?)
226        }
227        json_parse::PROCESSOR_JSON_PARSE => {
228            ProcessorKind::JsonParse(JsonParseProcessor::try_from(value)?)
229        }
230        vrl_processor::PROCESSOR_VRL => ProcessorKind::Vrl(VrlProcessor::try_from(value)?),
231        select::PROCESSOR_SELECT => ProcessorKind::Select(SelectProcessor::try_from(value)?),
232        filter::PROCESSOR_FILTER => ProcessorKind::Filter(FilterProcessor::try_from(value)?),
233        _ => return UnsupportedProcessorSnafu { processor: str_key }.fail(),
234    };
235
236    Ok(processor)
237}
238
239pub(crate) fn yaml_string(v: &yaml_rust::Yaml, field: &str) -> Result<String> {
240    v.as_str()
241        .map(|s| s.to_string())
242        .context(FieldMustBeTypeSnafu {
243            field,
244            ty: "string",
245        })
246}
247
248pub(crate) fn yaml_strings(v: &yaml_rust::Yaml, field: &str) -> Result<Vec<String>> {
249    yaml_list(v, *STRING_FN, field)
250}
251
252pub(crate) fn yaml_list<T>(
253    v: &yaml_rust::Yaml,
254    conv_fn: impl Fn(&str, &yaml_rust::Yaml) -> Result<T>,
255    field: &str,
256) -> Result<Vec<T>> {
257    v.as_vec()
258        .context(FieldMustBeTypeSnafu { field, ty: "list" })?
259        .iter()
260        .map(|v| conv_fn(field, v))
261        .collect()
262}
263
264pub(crate) fn yaml_bool(v: &yaml_rust::Yaml, field: &str) -> Result<bool> {
265    v.as_bool().context(FieldMustBeTypeSnafu {
266        field,
267        ty: "boolean",
268    })
269}
270
271pub(crate) fn yaml_parse_string<T>(v: &yaml_rust::Yaml, field: &str) -> Result<T>
272where
273    T: std::str::FromStr,
274    T::Err: std::error::Error + Send + Sync + 'static,
275{
276    yaml_string(v, field)?
277        .parse::<T>()
278        .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)
279        .context(FailedParseFieldFromStringSnafu { field })
280}
281
282pub(crate) fn yaml_parse_strings<T>(v: &yaml_rust::Yaml, field: &str) -> Result<Vec<T>>
283where
284    T: std::str::FromStr,
285    T::Err: std::error::Error + Send + Sync + 'static,
286{
287    yaml_strings(v, field).and_then(|v| {
288        v.into_iter()
289            .map(|s| {
290                s.parse::<T>()
291                    .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)
292                    .context(FailedParseFieldFromStringSnafu { field })
293            })
294            .collect()
295    })
296}
297
298pub(crate) fn yaml_new_fields(v: &yaml_rust::Yaml, field: &str) -> Result<Fields> {
299    yaml_list(v, *STRING_OR_HASH_FN, field).map(Fields::new)
300}
301
302pub(crate) fn yaml_new_field(v: &yaml_rust::Yaml, field: &str) -> Result<Field> {
303    STRING_OR_HASH_FN(field, v)
304}