pipeline/etl/
processor.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub mod cmcd;
16pub mod csv;
17pub mod date;
18pub mod decolorize;
19pub mod digest;
20pub mod dissect;
21pub mod epoch;
22pub mod gsub;
23pub mod join;
24pub mod json_parse;
25pub mod json_path;
26pub mod letter;
27pub mod regex;
28pub mod select;
29pub mod simple_extract;
30pub mod urlencoding;
31pub mod vrl;
32
33use std::str::FromStr;
34
35use cmcd::CmcdProcessor;
36use csv::CsvProcessor;
37use date::DateProcessor;
38use decolorize::DecolorizeProcessor;
39use digest::DigestProcessor;
40use dissect::DissectProcessor;
41use enum_dispatch::enum_dispatch;
42use epoch::EpochProcessor;
43use gsub::GsubProcessor;
44use join::JoinProcessor;
45use json_path::JsonPathProcessor;
46use letter::LetterProcessor;
47use regex::RegexProcessor;
48use snafu::{OptionExt, ResultExt};
49use urlencoding::UrlEncodingProcessor;
50
51use crate::error::{
52    Error, FailedParseFieldFromStringSnafu, FieldMustBeTypeSnafu, InvalidFieldRenameSnafu,
53    ProcessorKeyMustBeStringSnafu, ProcessorMustBeMapSnafu, ProcessorMustHaveStringKeySnafu,
54    Result, UnsupportedProcessorSnafu,
55};
56use crate::etl::field::{Field, Fields};
57use crate::etl::processor::json_parse::JsonParseProcessor;
58use crate::etl::processor::select::SelectProcessor;
59use crate::etl::processor::simple_extract::SimpleExtractProcessor;
60use crate::etl::processor::vrl::VrlProcessor;
61use crate::Value;
62
// Option keys looked up in processor YAML definitions.
//
// Only `KEY_NAME` and `RENAME_TO_KEY` are read in the part of this module
// shown alongside them (by the field-parsing helper); the remaining keys are
// presumably consumed by the individual processor submodules.
// TODO(review): confirm against their usage.

// Keys naming the fields a processor reads from or writes to.
const FIELD_NAME: &str = "field";
const FIELDS_NAME: &str = "fields";
const TARGET_FIELDS_NAME: &str = "target_fields";

// Keys of the `{ key, rename_to }` map form of a field entry.
const KEY_NAME: &str = "key";
const RENAME_TO_KEY: &str = "rename_to";

// Keys for pattern-style options.
const PATTERN_NAME: &str = "pattern";
const PATTERNS_NAME: &str = "patterns";

// Keys for JSON-path style options.
const JSON_PATH_NAME: &str = "json_path";
const JSON_PATH_RESULT_INDEX_NAME: &str = "result_index";

// Remaining general-purpose option keys.
const IGNORE_MISSING_NAME: &str = "ignore_missing";
const METHOD_NAME: &str = "method";
const SEPARATOR_NAME: &str = "separator";
const TYPE_NAME: &str = "type";
76
/// Looks up `$key` in the YAML hash `$map` and yields the entry as a `&str`.
///
/// The expansion is a `Result`:
/// * `Ok(&str)` when the entry exists and is a YAML string;
/// * otherwise an error built from `InvalidFieldRenameSnafu`, carrying a
///   clone of `$value` for diagnostics.
///
/// The expansion names `InvalidFieldRenameSnafu` unqualified and calls
/// `with_context` on an `Option`, so both the selector and snafu's
/// `OptionExt` must be in scope at the call site.
#[macro_export]
macro_rules! yaml_map_get_str {
    ($map:expr, $key:expr, $value:expr) => {
        $map.get(&yaml_rust::Yaml::String($key.to_string()))
            .and_then(yaml_rust::Yaml::as_str)
            .with_context(|| InvalidFieldRenameSnafu {
                value: $value.clone(),
            })
    };
}
88
89lazy_static::lazy_static! {
90    static ref STRING_FN: fn(&str, &yaml_rust::Yaml) -> Result<String> = |_, v| {
91        Ok(v.as_str().unwrap_or_default().into())
92    };
93
94    static ref STRING_OR_HASH_FN: fn(&str, &yaml_rust::Yaml) -> Result<Field> = |field, v| {
95        match v {
96            yaml_rust::Yaml::String(s) => Field::from_str(s),
97            yaml_rust::Yaml::Hash(m) => {
98                let key = yaml_map_get_str!(m, KEY_NAME, v)?;
99                let rename_to = yaml_map_get_str!(m, RENAME_TO_KEY, v)?;
100                Ok(Field::new(key, Some(rename_to.to_string())))
101            }
102            _ => FieldMustBeTypeSnafu {
103                field,
104                ty: "string or key-rename_to map",
105            }
106            .fail(),
107        }
108    };
109}
110
111/// Processor trait defines the interface for all processors.
112///
113/// A processor is a transformation that can be applied to a field in a document
114/// It can be used to extract, transform, or enrich data
115/// Now Processor only have one input field. In the future, we may support multiple input fields.
116/// The output of a processor is a map of key-value pairs that will be merged into the document when you use exec_map method.
117#[enum_dispatch(ProcessorKind)]
118pub trait Processor: std::fmt::Debug + Send + Sync + 'static {
119    /// Get the processor's kind
120    fn kind(&self) -> &str;
121
122    /// Whether to ignore missing
123    fn ignore_missing(&self) -> bool;
124
125    /// Execute the processor on a vector which be preprocessed by the pipeline
126    fn exec_mut(&self, val: Value) -> Result<Value>;
127}
128
129#[derive(Debug)]
130#[enum_dispatch]
131pub enum ProcessorKind {
132    Cmcd(CmcdProcessor),
133    Csv(CsvProcessor),
134    Dissect(DissectProcessor),
135    Gsub(GsubProcessor),
136    Join(JoinProcessor),
137    Letter(LetterProcessor),
138    Regex(RegexProcessor),
139    UrlEncoding(UrlEncodingProcessor),
140    Epoch(EpochProcessor),
141    Date(DateProcessor),
142    JsonPath(JsonPathProcessor),
143    JsonParse(JsonParseProcessor),
144    SimpleJsonPath(SimpleExtractProcessor),
145    Decolorize(DecolorizeProcessor),
146    Digest(DigestProcessor),
147    Select(SelectProcessor),
148    Vrl(VrlProcessor),
149}
150
151#[derive(Debug, Default)]
152pub struct Processors {
153    /// A ordered list of processors
154    /// The order of processors is important
155    /// The output of the first processor will be the input of the second processor
156    pub processors: Vec<ProcessorKind>,
157}
158
159impl std::ops::Deref for Processors {
160    type Target = Vec<ProcessorKind>;
161
162    fn deref(&self) -> &Self::Target {
163        &self.processors
164    }
165}
166
167impl std::ops::DerefMut for Processors {
168    fn deref_mut(&mut self) -> &mut Self::Target {
169        &mut self.processors
170    }
171}
172
173impl TryFrom<&Vec<yaml_rust::Yaml>> for Processors {
174    type Error = Error;
175
176    fn try_from(vec: &Vec<yaml_rust::Yaml>) -> Result<Self> {
177        let mut processors_builders = vec![];
178        for doc in vec {
179            let processor = parse_processor(doc)?;
180            processors_builders.push(processor);
181        }
182        Ok(Processors {
183            processors: processors_builders,
184        })
185    }
186}
187
188fn parse_processor(doc: &yaml_rust::Yaml) -> Result<ProcessorKind> {
189    let map = doc.as_hash().context(ProcessorMustBeMapSnafu)?;
190
191    let key = map.keys().next().context(ProcessorMustHaveStringKeySnafu)?;
192
193    let value = map
194        .get(key)
195        .unwrap()
196        .as_hash()
197        .context(ProcessorMustBeMapSnafu)?;
198
199    let str_key = key.as_str().context(ProcessorKeyMustBeStringSnafu)?;
200
201    let processor = match str_key {
202        cmcd::PROCESSOR_CMCD => ProcessorKind::Cmcd(CmcdProcessor::try_from(value)?),
203        csv::PROCESSOR_CSV => ProcessorKind::Csv(CsvProcessor::try_from(value)?),
204        dissect::PROCESSOR_DISSECT => ProcessorKind::Dissect(DissectProcessor::try_from(value)?),
205        epoch::PROCESSOR_EPOCH => ProcessorKind::Epoch(EpochProcessor::try_from(value)?),
206        date::PROCESSOR_DATE => ProcessorKind::Date(DateProcessor::try_from(value)?),
207        gsub::PROCESSOR_GSUB => ProcessorKind::Gsub(GsubProcessor::try_from(value)?),
208        join::PROCESSOR_JOIN => ProcessorKind::Join(JoinProcessor::try_from(value)?),
209        letter::PROCESSOR_LETTER => ProcessorKind::Letter(LetterProcessor::try_from(value)?),
210        regex::PROCESSOR_REGEX => ProcessorKind::Regex(RegexProcessor::try_from(value)?),
211        urlencoding::PROCESSOR_URL_ENCODING => {
212            ProcessorKind::UrlEncoding(UrlEncodingProcessor::try_from(value)?)
213        }
214        json_path::PROCESSOR_JSON_PATH => {
215            ProcessorKind::JsonPath(json_path::JsonPathProcessor::try_from(value)?)
216        }
217        decolorize::PROCESSOR_DECOLORIZE => {
218            ProcessorKind::Decolorize(DecolorizeProcessor::try_from(value)?)
219        }
220        digest::PROCESSOR_DIGEST => ProcessorKind::Digest(DigestProcessor::try_from(value)?),
221        simple_extract::PROCESSOR_SIMPLE_EXTRACT => {
222            ProcessorKind::SimpleJsonPath(SimpleExtractProcessor::try_from(value)?)
223        }
224        json_parse::PROCESSOR_JSON_PARSE => {
225            ProcessorKind::JsonParse(JsonParseProcessor::try_from(value)?)
226        }
227        vrl::PROCESSOR_VRL => ProcessorKind::Vrl(VrlProcessor::try_from(value)?),
228        select::PROCESSOR_SELECT => ProcessorKind::Select(SelectProcessor::try_from(value)?),
229        _ => return UnsupportedProcessorSnafu { processor: str_key }.fail(),
230    };
231
232    Ok(processor)
233}
234
235pub(crate) fn yaml_string(v: &yaml_rust::Yaml, field: &str) -> Result<String> {
236    v.as_str()
237        .map(|s| s.to_string())
238        .context(FieldMustBeTypeSnafu {
239            field,
240            ty: "string",
241        })
242}
243
244pub(crate) fn yaml_strings(v: &yaml_rust::Yaml, field: &str) -> Result<Vec<String>> {
245    yaml_list(v, *STRING_FN, field)
246}
247
248pub(crate) fn yaml_list<T>(
249    v: &yaml_rust::Yaml,
250    conv_fn: impl Fn(&str, &yaml_rust::Yaml) -> Result<T>,
251    field: &str,
252) -> Result<Vec<T>> {
253    v.as_vec()
254        .context(FieldMustBeTypeSnafu { field, ty: "list" })?
255        .iter()
256        .map(|v| conv_fn(field, v))
257        .collect()
258}
259
260pub(crate) fn yaml_bool(v: &yaml_rust::Yaml, field: &str) -> Result<bool> {
261    v.as_bool().context(FieldMustBeTypeSnafu {
262        field,
263        ty: "boolean",
264    })
265}
266
267pub(crate) fn yaml_parse_string<T>(v: &yaml_rust::Yaml, field: &str) -> Result<T>
268where
269    T: std::str::FromStr,
270    T::Err: std::error::Error + Send + Sync + 'static,
271{
272    yaml_string(v, field)?
273        .parse::<T>()
274        .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)
275        .context(FailedParseFieldFromStringSnafu { field })
276}
277
278pub(crate) fn yaml_parse_strings<T>(v: &yaml_rust::Yaml, field: &str) -> Result<Vec<T>>
279where
280    T: std::str::FromStr,
281    T::Err: std::error::Error + Send + Sync + 'static,
282{
283    yaml_strings(v, field).and_then(|v| {
284        v.into_iter()
285            .map(|s| {
286                s.parse::<T>()
287                    .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)
288                    .context(FailedParseFieldFromStringSnafu { field })
289            })
290            .collect()
291    })
292}
293
294pub(crate) fn yaml_new_fields(v: &yaml_rust::Yaml, field: &str) -> Result<Fields> {
295    yaml_list(v, *STRING_OR_HASH_FN, field).map(Fields::new)
296}
297
298pub(crate) fn yaml_new_field(v: &yaml_rust::Yaml, field: &str) -> Result<Field> {
299    STRING_OR_HASH_FN(field, v)
300}