pipeline/etl/
processor.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub mod cmcd;
16pub mod csv;
17pub mod date;
18pub mod decolorize;
19pub mod digest;
20pub mod dissect;
21pub mod epoch;
22pub mod gsub;
23pub mod join;
24pub mod json_parse;
25pub mod json_path;
26pub mod letter;
27pub mod regex;
28pub mod select;
29pub mod simple_extract;
30pub mod timestamp;
31pub mod urlencoding;
32pub mod vrl;
33
34use std::str::FromStr;
35
36use cmcd::CmcdProcessor;
37use csv::CsvProcessor;
38use date::DateProcessor;
39use decolorize::DecolorizeProcessor;
40use digest::DigestProcessor;
41use dissect::DissectProcessor;
42use enum_dispatch::enum_dispatch;
43use epoch::EpochProcessor;
44use gsub::GsubProcessor;
45use join::JoinProcessor;
46use json_path::JsonPathProcessor;
47use letter::LetterProcessor;
48use regex::RegexProcessor;
49use snafu::{OptionExt, ResultExt};
50use timestamp::TimestampProcessor;
51use urlencoding::UrlEncodingProcessor;
52
53use crate::error::{
54    Error, FailedParseFieldFromStringSnafu, FieldMustBeTypeSnafu, InvalidFieldRenameSnafu,
55    ProcessorKeyMustBeStringSnafu, ProcessorMustBeMapSnafu, ProcessorMustHaveStringKeySnafu,
56    Result, UnsupportedProcessorSnafu,
57};
58use crate::etl::field::{Field, Fields};
59use crate::etl::processor::json_parse::JsonParseProcessor;
60use crate::etl::processor::select::SelectProcessor;
61use crate::etl::processor::simple_extract::SimpleExtractProcessor;
62use crate::etl::processor::vrl::VrlProcessor;
63use crate::etl::PipelineMap;
64
// Literal key names used in processor YAML configuration maps.
//
// Within this part of the file only `KEY_NAME` and `RENAME_TO_KEY` are read
// (by `STRING_OR_HASH_FN`, to decode a `{ key, rename_to }` field entry).
// NOTE(review): the remaining constants are presumably consumed by the
// processor submodules declared above — confirm against those modules.
const FIELD_NAME: &str = "field";
const FIELDS_NAME: &str = "fields";
const IGNORE_MISSING_NAME: &str = "ignore_missing";
const METHOD_NAME: &str = "method";
const PATTERN_NAME: &str = "pattern";
const PATTERNS_NAME: &str = "patterns";
const SEPARATOR_NAME: &str = "separator";
const TARGET_FIELDS_NAME: &str = "target_fields";
const JSON_PATH_NAME: &str = "json_path";
const JSON_PATH_RESULT_INDEX_NAME: &str = "result_index";
const KEY_NAME: &str = "key";
const TYPE_NAME: &str = "type";
const RENAME_TO_KEY: &str = "rename_to";
78
/// Looks up `$key` in a YAML hash and yields its value as a `&str`.
///
/// * `$map` - the YAML hash to search; it is queried with
///   `get(&yaml_rust::Yaml::String(..))`.
/// * `$key` - the key name; converted with `to_string()` before the lookup.
/// * `$value` - the value cloned into the error when the lookup fails.
///
/// Expands to a `Result`: `Ok(&str)` when the key is present and holds a YAML
/// string, otherwise an `InvalidFieldRename` error carrying `$value.clone()`.
/// A missing key and a non-string value are reported with the same error.
///
/// The error selector and the `OptionExt` method are referenced by path
/// (`$crate::error::..` and `::snafu::OptionExt`), so the expansion does not
/// depend on what the call site happens to have imported.
#[macro_export]
macro_rules! yaml_map_get_str {
    ($map:expr, $key:expr, $value:expr) => {
        ::snafu::OptionExt::with_context(
            $map.get(&yaml_rust::Yaml::String($key.to_string()))
                .and_then(|v| v.as_str()),
            // Built lazily so `$value` is only cloned on the failure path.
            || $crate::error::InvalidFieldRenameSnafu {
                value: $value.clone(),
            },
        )
    };
}
90
91lazy_static::lazy_static! {
92    static ref STRING_FN: fn(&str, &yaml_rust::Yaml) -> Result<String> = |_, v| {
93        Ok(v.as_str().unwrap_or_default().into())
94    };
95
96    static ref STRING_OR_HASH_FN: fn(&str, &yaml_rust::Yaml) -> Result<Field> = |field, v| {
97        match v {
98            yaml_rust::Yaml::String(s) => Field::from_str(s),
99            yaml_rust::Yaml::Hash(m) => {
100                let key = yaml_map_get_str!(m, KEY_NAME, v)?;
101                let rename_to = yaml_map_get_str!(m, RENAME_TO_KEY, v)?;
102                Ok(Field::new(key, Some(rename_to.to_string())))
103            }
104            _ => FieldMustBeTypeSnafu {
105                field,
106                ty: "string or key-rename_to map",
107            }
108            .fail(),
109        }
110    };
111}
112
/// Processor trait defines the interface for all processors.
///
/// A processor is a transformation that can be applied to a field in a document.
/// It can be used to extract, transform, or enrich data.
/// Currently a processor has only one input field; support for multiple input
/// fields may be added in the future.
/// `exec_mut` receives the document as a `PipelineMap` by value and returns the
/// resulting `PipelineMap`.
///
/// The `enum_dispatch` attribute ties this trait to [`ProcessorKind`].
#[enum_dispatch(ProcessorKind)]
pub trait Processor: std::fmt::Debug + Send + Sync + 'static {
    /// Get the processor's kind
    fn kind(&self) -> &str;

    /// Whether to ignore missing
    // NOTE(review): presumably "missing" refers to a missing input field — confirm
    // against the implementations.
    fn ignore_missing(&self) -> bool;

    /// Execute the processor on a `PipelineMap` and return the resulting map,
    /// or an error if processing fails.
    fn exec_mut(&self, val: PipelineMap) -> Result<PipelineMap>;
}
130
/// The closed set of processor implementations known to the pipeline.
///
/// Each variant wraps one concrete processor type. The enum is annotated with
/// `#[enum_dispatch]` and named in the attribute on the [`Processor`] trait.
/// Instances are created by `parse_processor` from the processor name found in
/// the YAML definition.
#[derive(Debug)]
#[enum_dispatch]
pub enum ProcessorKind {
    Cmcd(CmcdProcessor),
    Csv(CsvProcessor),
    Dissect(DissectProcessor),
    Gsub(GsubProcessor),
    Join(JoinProcessor),
    Letter(LetterProcessor),
    Regex(RegexProcessor),
    Timestamp(TimestampProcessor),
    UrlEncoding(UrlEncodingProcessor),
    Epoch(EpochProcessor),
    Date(DateProcessor),
    JsonPath(JsonPathProcessor),
    JsonParse(JsonParseProcessor),
    // The variant name differs from the wrapped type: it holds a `SimpleExtractProcessor`.
    SimpleJsonPath(SimpleExtractProcessor),
    Decolorize(DecolorizeProcessor),
    Digest(DigestProcessor),
    Select(SelectProcessor),
    Vrl(VrlProcessor),
}
153
/// An ordered collection of processors; `Default` yields an empty list.
#[derive(Debug, Default)]
pub struct Processors {
    /// An ordered list of processors.
    /// The order of processors is important:
    /// the output of the first processor will be the input of the second processor.
    pub processors: Vec<ProcessorKind>,
}
161
/// Gives read-only access to the inner `Vec<ProcessorKind>`, so a `Processors`
/// value can be used wherever a shared reference to the vector is expected.
impl std::ops::Deref for Processors {
    type Target = Vec<ProcessorKind>;

    /// Returns a shared reference to the wrapped processor list.
    fn deref(&self) -> &Self::Target {
        &self.processors
    }
}
169
/// Gives mutable access to the inner `Vec<ProcessorKind>`.
impl std::ops::DerefMut for Processors {
    /// Returns a mutable reference to the wrapped processor list.
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.processors
    }
}
175
176impl TryFrom<&Vec<yaml_rust::Yaml>> for Processors {
177    type Error = Error;
178
179    fn try_from(vec: &Vec<yaml_rust::Yaml>) -> Result<Self> {
180        let mut processors_builders = vec![];
181        for doc in vec {
182            let processor = parse_processor(doc)?;
183            processors_builders.push(processor);
184        }
185        Ok(Processors {
186            processors: processors_builders,
187        })
188    }
189}
190
191fn parse_processor(doc: &yaml_rust::Yaml) -> Result<ProcessorKind> {
192    let map = doc.as_hash().context(ProcessorMustBeMapSnafu)?;
193
194    let key = map.keys().next().context(ProcessorMustHaveStringKeySnafu)?;
195
196    let value = map
197        .get(key)
198        .unwrap()
199        .as_hash()
200        .context(ProcessorMustBeMapSnafu)?;
201
202    let str_key = key.as_str().context(ProcessorKeyMustBeStringSnafu)?;
203
204    let processor = match str_key {
205        cmcd::PROCESSOR_CMCD => ProcessorKind::Cmcd(CmcdProcessor::try_from(value)?),
206        csv::PROCESSOR_CSV => ProcessorKind::Csv(CsvProcessor::try_from(value)?),
207        dissect::PROCESSOR_DISSECT => ProcessorKind::Dissect(DissectProcessor::try_from(value)?),
208        epoch::PROCESSOR_EPOCH => ProcessorKind::Epoch(EpochProcessor::try_from(value)?),
209        date::PROCESSOR_DATE => ProcessorKind::Date(DateProcessor::try_from(value)?),
210        gsub::PROCESSOR_GSUB => ProcessorKind::Gsub(GsubProcessor::try_from(value)?),
211        join::PROCESSOR_JOIN => ProcessorKind::Join(JoinProcessor::try_from(value)?),
212        letter::PROCESSOR_LETTER => ProcessorKind::Letter(LetterProcessor::try_from(value)?),
213        regex::PROCESSOR_REGEX => ProcessorKind::Regex(RegexProcessor::try_from(value)?),
214        timestamp::PROCESSOR_TIMESTAMP => {
215            ProcessorKind::Timestamp(TimestampProcessor::try_from(value)?)
216        }
217        urlencoding::PROCESSOR_URL_ENCODING => {
218            ProcessorKind::UrlEncoding(UrlEncodingProcessor::try_from(value)?)
219        }
220        json_path::PROCESSOR_JSON_PATH => {
221            ProcessorKind::JsonPath(json_path::JsonPathProcessor::try_from(value)?)
222        }
223        decolorize::PROCESSOR_DECOLORIZE => {
224            ProcessorKind::Decolorize(DecolorizeProcessor::try_from(value)?)
225        }
226        digest::PROCESSOR_DIGEST => ProcessorKind::Digest(DigestProcessor::try_from(value)?),
227        simple_extract::PROCESSOR_SIMPLE_EXTRACT => {
228            ProcessorKind::SimpleJsonPath(SimpleExtractProcessor::try_from(value)?)
229        }
230        json_parse::PROCESSOR_JSON_PARSE => {
231            ProcessorKind::JsonParse(JsonParseProcessor::try_from(value)?)
232        }
233        vrl::PROCESSOR_VRL => ProcessorKind::Vrl(VrlProcessor::try_from(value)?),
234        select::PROCESSOR_SELECT => ProcessorKind::Select(SelectProcessor::try_from(value)?),
235        _ => return UnsupportedProcessorSnafu { processor: str_key }.fail(),
236    };
237
238    Ok(processor)
239}
240
241pub(crate) fn yaml_string(v: &yaml_rust::Yaml, field: &str) -> Result<String> {
242    v.as_str()
243        .map(|s| s.to_string())
244        .context(FieldMustBeTypeSnafu {
245            field,
246            ty: "string",
247        })
248}
249
250pub(crate) fn yaml_strings(v: &yaml_rust::Yaml, field: &str) -> Result<Vec<String>> {
251    yaml_list(v, *STRING_FN, field)
252}
253
254pub(crate) fn yaml_list<T>(
255    v: &yaml_rust::Yaml,
256    conv_fn: impl Fn(&str, &yaml_rust::Yaml) -> Result<T>,
257    field: &str,
258) -> Result<Vec<T>> {
259    v.as_vec()
260        .context(FieldMustBeTypeSnafu { field, ty: "list" })?
261        .iter()
262        .map(|v| conv_fn(field, v))
263        .collect()
264}
265
266pub(crate) fn yaml_bool(v: &yaml_rust::Yaml, field: &str) -> Result<bool> {
267    v.as_bool().context(FieldMustBeTypeSnafu {
268        field,
269        ty: "boolean",
270    })
271}
272
273pub(crate) fn yaml_parse_string<T>(v: &yaml_rust::Yaml, field: &str) -> Result<T>
274where
275    T: std::str::FromStr,
276    T::Err: std::error::Error + Send + Sync + 'static,
277{
278    yaml_string(v, field)?
279        .parse::<T>()
280        .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)
281        .context(FailedParseFieldFromStringSnafu { field })
282}
283
284pub(crate) fn yaml_parse_strings<T>(v: &yaml_rust::Yaml, field: &str) -> Result<Vec<T>>
285where
286    T: std::str::FromStr,
287    T::Err: std::error::Error + Send + Sync + 'static,
288{
289    yaml_strings(v, field).and_then(|v| {
290        v.into_iter()
291            .map(|s| {
292                s.parse::<T>()
293                    .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)
294                    .context(FailedParseFieldFromStringSnafu { field })
295            })
296            .collect()
297    })
298}
299
300pub(crate) fn yaml_new_fields(v: &yaml_rust::Yaml, field: &str) -> Result<Fields> {
301    yaml_list(v, *STRING_OR_HASH_FN, field).map(Fields::new)
302}
303
304pub(crate) fn yaml_new_field(v: &yaml_rust::Yaml, field: &str) -> Result<Field> {
305    STRING_OR_HASH_FN(field, v)
306}