pipeline/etl/
processor.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub mod cmcd;
16pub mod csv;
17pub mod date;
18pub mod decolorize;
19pub mod digest;
20pub mod dissect;
21pub mod epoch;
22pub mod gsub;
23pub mod join;
24pub mod json_parse;
25pub mod json_path;
26pub mod letter;
27pub mod regex;
28pub mod select;
29pub mod simple_extract;
30pub mod timestamp;
31pub mod urlencoding;
32
33use std::str::FromStr;
34
35use cmcd::CmcdProcessor;
36use csv::CsvProcessor;
37use date::DateProcessor;
38use decolorize::DecolorizeProcessor;
39use digest::DigestProcessor;
40use dissect::DissectProcessor;
41use enum_dispatch::enum_dispatch;
42use epoch::EpochProcessor;
43use gsub::GsubProcessor;
44use join::JoinProcessor;
45use json_path::JsonPathProcessor;
46use letter::LetterProcessor;
47use regex::RegexProcessor;
48use snafu::{OptionExt, ResultExt};
49use timestamp::TimestampProcessor;
50use urlencoding::UrlEncodingProcessor;
51
52use crate::error::{
53    Error, FailedParseFieldFromStringSnafu, FieldMustBeTypeSnafu, InvalidFieldRenameSnafu,
54    ProcessorKeyMustBeStringSnafu, ProcessorMustBeMapSnafu, ProcessorMustHaveStringKeySnafu,
55    Result, UnsupportedProcessorSnafu,
56};
57use crate::etl::field::{Field, Fields};
58use crate::etl::processor::json_parse::JsonParseProcessor;
59use crate::etl::processor::select::SelectProcessor;
60use crate::etl::processor::simple_extract::SimpleExtractProcessor;
61use crate::etl::PipelineMap;
62
// Key names looked up in processor YAML configuration maps.
//
// In this file only `KEY_NAME` and `RENAME_TO_KEY` are referenced (by
// `STRING_OR_HASH_FN`, for the `{key, rename_to}` form of a field entry).
// NOTE(review): the remaining constants are presumably consumed by the
// processor submodules declared above — confirm before removing any of them.
const FIELD_NAME: &str = "field";
const FIELDS_NAME: &str = "fields";
const IGNORE_MISSING_NAME: &str = "ignore_missing";
const METHOD_NAME: &str = "method";
const PATTERN_NAME: &str = "pattern";
const PATTERNS_NAME: &str = "patterns";
const SEPARATOR_NAME: &str = "separator";
const TARGET_FIELDS_NAME: &str = "target_fields";
const JSON_PATH_NAME: &str = "json_path";
const JSON_PATH_RESULT_INDEX_NAME: &str = "result_index";
const KEY_NAME: &str = "key";
const TYPE_NAME: &str = "type";
const RENAME_TO_KEY: &str = "rename_to";
76
/// Extracts a string value from a YAML map.
///
/// Arguments:
/// - `$map`: the YAML hash to search.
/// - `$key`: the entry name; it is converted with `to_string()` and wrapped in
///   `yaml_rust::Yaml::String` for the lookup.
/// - `$value`: the YAML node reported in the error; it is cloned only when the
///   lookup fails.
///
/// Expands to an expression that evaluates to `Ok(&str)` when the entry exists
/// and is a YAML string. When the entry is absent or is not a string, the
/// expression evaluates to the error built from `InvalidFieldRenameSnafu`.
///
/// NOTE(review): the expansion names `InvalidFieldRenameSnafu` and calls
/// `with_context` on an `Option`, so both that selector and snafu's
/// `OptionExt` presumably have to be in scope wherever the macro is invoked.
#[macro_export]
macro_rules! yaml_map_get_str {
    ($map:expr, $key:expr, $value:expr) => {
        $map.get(&yaml_rust::Yaml::String($key.to_string()))
            .and_then(|v| v.as_str())
            .with_context(|| InvalidFieldRenameSnafu {
                value: $value.clone(),
            })
    };
}
88
89lazy_static::lazy_static! {
90    static ref STRING_FN: fn(&str, &yaml_rust::Yaml) -> Result<String> = |_, v| {
91        Ok(v.as_str().unwrap_or_default().into())
92    };
93
94    static ref STRING_OR_HASH_FN: fn(&str, &yaml_rust::Yaml) -> Result<Field> = |field, v| {
95        match v {
96            yaml_rust::Yaml::String(s) => Field::from_str(s),
97            yaml_rust::Yaml::Hash(m) => {
98                let key = yaml_map_get_str!(m, KEY_NAME, v)?;
99                let rename_to = yaml_map_get_str!(m, RENAME_TO_KEY, v)?;
100                Ok(Field::new(key, Some(rename_to.to_string())))
101            }
102            _ => FieldMustBeTypeSnafu {
103                field,
104                ty: "string or key-rename_to map",
105            }
106            .fail(),
107        }
108    };
109}
110
/// The `Processor` trait defines the interface shared by all processors.
///
/// A processor is a transformation that can be applied to a field in a document.
/// It can be used to extract, transform, or enrich data.
/// Currently a processor has only one input field; support for multiple input
/// fields may be added in the future.
/// `exec_mut` receives the pipeline's map by mutable reference, so an
/// implementation can update that map in place.
///
/// The trait is dispatched through [`ProcessorKind`] via `enum_dispatch`.
#[enum_dispatch(ProcessorKind)]
pub trait Processor: std::fmt::Debug + Send + Sync + 'static {
    /// Returns the processor's kind as a string.
    fn kind(&self) -> &str;

    /// Returns whether this processor ignores missing input.
    ///
    /// NOTE(review): presumably mirrors the `ignore_missing` configuration
    /// option — confirm against the implementations.
    fn ignore_missing(&self) -> bool;

    /// Executes the processor on `val`, the map prepared by the pipeline.
    ///
    /// `val` is borrowed mutably, so the processor may modify it.
    fn exec_mut(&self, val: &mut PipelineMap) -> Result<()>;
}
128
/// The closed set of processor implementations, one variant per processor type.
///
/// `#[enum_dispatch]` pairs this enum with the [`Processor`] trait, so trait
/// methods can be called on a `ProcessorKind` value directly.
#[derive(Debug)]
#[enum_dispatch]
pub enum ProcessorKind {
    Cmcd(CmcdProcessor),
    Csv(CsvProcessor),
    Dissect(DissectProcessor),
    Gsub(GsubProcessor),
    Join(JoinProcessor),
    Letter(LetterProcessor),
    Regex(RegexProcessor),
    Timestamp(TimestampProcessor),
    UrlEncoding(UrlEncodingProcessor),
    Epoch(EpochProcessor),
    Date(DateProcessor),
    JsonPath(JsonPathProcessor),
    JsonParse(JsonParseProcessor),
    // Note: the variant name differs from the wrapped type's name
    // (`SimpleExtractProcessor`).
    SimpleJsonPath(SimpleExtractProcessor),
    Decolorize(DecolorizeProcessor),
    Digest(DigestProcessor),
    Select(SelectProcessor),
}
150
/// An ordered collection of processors.
#[derive(Debug, Default)]
pub struct Processors {
    /// An ordered list of processors.
    /// The order of processors is important:
    /// the output of the first processor will be the input of the second processor.
    pub processors: Vec<ProcessorKind>,
}
158
159impl std::ops::Deref for Processors {
160    type Target = Vec<ProcessorKind>;
161
162    fn deref(&self) -> &Self::Target {
163        &self.processors
164    }
165}
166
167impl std::ops::DerefMut for Processors {
168    fn deref_mut(&mut self) -> &mut Self::Target {
169        &mut self.processors
170    }
171}
172
173impl TryFrom<&Vec<yaml_rust::Yaml>> for Processors {
174    type Error = Error;
175
176    fn try_from(vec: &Vec<yaml_rust::Yaml>) -> Result<Self> {
177        let mut processors_builders = vec![];
178        for doc in vec {
179            let processor = parse_processor(doc)?;
180            processors_builders.push(processor);
181        }
182        Ok(Processors {
183            processors: processors_builders,
184        })
185    }
186}
187
188fn parse_processor(doc: &yaml_rust::Yaml) -> Result<ProcessorKind> {
189    let map = doc.as_hash().context(ProcessorMustBeMapSnafu)?;
190
191    let key = map.keys().next().context(ProcessorMustHaveStringKeySnafu)?;
192
193    let value = map
194        .get(key)
195        .unwrap()
196        .as_hash()
197        .context(ProcessorMustBeMapSnafu)?;
198
199    let str_key = key.as_str().context(ProcessorKeyMustBeStringSnafu)?;
200
201    let processor = match str_key {
202        cmcd::PROCESSOR_CMCD => ProcessorKind::Cmcd(CmcdProcessor::try_from(value)?),
203        csv::PROCESSOR_CSV => ProcessorKind::Csv(CsvProcessor::try_from(value)?),
204        dissect::PROCESSOR_DISSECT => ProcessorKind::Dissect(DissectProcessor::try_from(value)?),
205        epoch::PROCESSOR_EPOCH => ProcessorKind::Epoch(EpochProcessor::try_from(value)?),
206        date::PROCESSOR_DATE => ProcessorKind::Date(DateProcessor::try_from(value)?),
207        gsub::PROCESSOR_GSUB => ProcessorKind::Gsub(GsubProcessor::try_from(value)?),
208        join::PROCESSOR_JOIN => ProcessorKind::Join(JoinProcessor::try_from(value)?),
209        letter::PROCESSOR_LETTER => ProcessorKind::Letter(LetterProcessor::try_from(value)?),
210        regex::PROCESSOR_REGEX => ProcessorKind::Regex(RegexProcessor::try_from(value)?),
211        timestamp::PROCESSOR_TIMESTAMP => {
212            ProcessorKind::Timestamp(TimestampProcessor::try_from(value)?)
213        }
214        urlencoding::PROCESSOR_URL_ENCODING => {
215            ProcessorKind::UrlEncoding(UrlEncodingProcessor::try_from(value)?)
216        }
217        json_path::PROCESSOR_JSON_PATH => {
218            ProcessorKind::JsonPath(json_path::JsonPathProcessor::try_from(value)?)
219        }
220        decolorize::PROCESSOR_DECOLORIZE => {
221            ProcessorKind::Decolorize(DecolorizeProcessor::try_from(value)?)
222        }
223        digest::PROCESSOR_DIGEST => ProcessorKind::Digest(DigestProcessor::try_from(value)?),
224        simple_extract::PROCESSOR_SIMPLE_EXTRACT => {
225            ProcessorKind::SimpleJsonPath(SimpleExtractProcessor::try_from(value)?)
226        }
227        json_parse::PROCESSOR_JSON_PARSE => {
228            ProcessorKind::JsonParse(JsonParseProcessor::try_from(value)?)
229        }
230        select::PROCESSOR_SELECT => ProcessorKind::Select(SelectProcessor::try_from(value)?),
231        _ => return UnsupportedProcessorSnafu { processor: str_key }.fail(),
232    };
233
234    Ok(processor)
235}
236
237pub(crate) fn yaml_string(v: &yaml_rust::Yaml, field: &str) -> Result<String> {
238    v.as_str()
239        .map(|s| s.to_string())
240        .context(FieldMustBeTypeSnafu {
241            field,
242            ty: "string",
243        })
244}
245
246pub(crate) fn yaml_strings(v: &yaml_rust::Yaml, field: &str) -> Result<Vec<String>> {
247    yaml_list(v, *STRING_FN, field)
248}
249
250pub(crate) fn yaml_list<T>(
251    v: &yaml_rust::Yaml,
252    conv_fn: impl Fn(&str, &yaml_rust::Yaml) -> Result<T>,
253    field: &str,
254) -> Result<Vec<T>> {
255    v.as_vec()
256        .context(FieldMustBeTypeSnafu { field, ty: "list" })?
257        .iter()
258        .map(|v| conv_fn(field, v))
259        .collect()
260}
261
262pub(crate) fn yaml_bool(v: &yaml_rust::Yaml, field: &str) -> Result<bool> {
263    v.as_bool().context(FieldMustBeTypeSnafu {
264        field,
265        ty: "boolean",
266    })
267}
268
269pub(crate) fn yaml_parse_string<T>(v: &yaml_rust::Yaml, field: &str) -> Result<T>
270where
271    T: std::str::FromStr,
272    T::Err: std::error::Error + Send + Sync + 'static,
273{
274    yaml_string(v, field)?
275        .parse::<T>()
276        .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)
277        .context(FailedParseFieldFromStringSnafu { field })
278}
279
280pub(crate) fn yaml_parse_strings<T>(v: &yaml_rust::Yaml, field: &str) -> Result<Vec<T>>
281where
282    T: std::str::FromStr,
283    T::Err: std::error::Error + Send + Sync + 'static,
284{
285    yaml_strings(v, field).and_then(|v| {
286        v.into_iter()
287            .map(|s| {
288                s.parse::<T>()
289                    .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)
290                    .context(FailedParseFieldFromStringSnafu { field })
291            })
292            .collect()
293    })
294}
295
296pub(crate) fn yaml_new_fields(v: &yaml_rust::Yaml, field: &str) -> Result<Fields> {
297    yaml_list(v, *STRING_OR_HASH_FN, field).map(Fields::new)
298}
299
300pub(crate) fn yaml_new_field(v: &yaml_rust::Yaml, field: &str) -> Result<Field> {
301    STRING_OR_HASH_FN(field, v)
302}