pipeline/etl/processor/
gsub.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use regex::Regex;
16use snafu::{OptionExt, ResultExt};
17
18use crate::error::{
19    Error, GsubPatternRequiredSnafu, GsubReplacementRequiredSnafu, KeyMustBeStringSnafu,
20    ProcessorExpectStringSnafu, ProcessorMissingFieldSnafu, RegexSnafu, Result,
21};
22use crate::etl::field::Fields;
23use crate::etl::processor::{
24    yaml_bool, yaml_new_field, yaml_new_fields, yaml_string, FIELDS_NAME, FIELD_NAME,
25    IGNORE_MISSING_NAME, PATTERN_NAME,
26};
27use crate::etl::value::Value;
28use crate::etl::PipelineMap;
29
/// Kind identifier of the gsub processor; returned by `kind()` and attached to
/// the errors this processor raises.
pub(crate) const PROCESSOR_GSUB: &str = "gsub";

/// Configuration key whose value is the replacement text.
const REPLACEMENT_NAME: &str = "replacement";
33
34/// A processor to replace all matches of a pattern in string by a replacement, only support string value, and array string value
35#[derive(Debug)]
36pub struct GsubProcessor {
37    fields: Fields,
38    pattern: Regex,
39    replacement: String,
40    ignore_missing: bool,
41}
42
43impl GsubProcessor {
44    fn process_string(&self, val: &str) -> Result<Value> {
45        let new_val = self.pattern.replace_all(val, &self.replacement).to_string();
46        let val = Value::String(new_val);
47
48        Ok(val)
49    }
50
51    fn process(&self, val: &Value) -> Result<Value> {
52        match val {
53            Value::String(val) => self.process_string(val),
54            _ => ProcessorExpectStringSnafu {
55                processor: PROCESSOR_GSUB,
56                v: val.clone(),
57            }
58            .fail(),
59        }
60    }
61}
62
63impl TryFrom<&yaml_rust::yaml::Hash> for GsubProcessor {
64    type Error = Error;
65
66    fn try_from(value: &yaml_rust::yaml::Hash) -> Result<Self> {
67        let mut fields = Fields::default();
68        let mut ignore_missing = false;
69        let mut pattern = None;
70        let mut replacement = None;
71
72        for (k, v) in value.iter() {
73            let key = k
74                .as_str()
75                .with_context(|| KeyMustBeStringSnafu { k: k.clone() })?;
76
77            match key {
78                FIELD_NAME => {
79                    fields = Fields::one(yaml_new_field(v, FIELD_NAME)?);
80                }
81                FIELDS_NAME => {
82                    fields = yaml_new_fields(v, FIELDS_NAME)?;
83                }
84                PATTERN_NAME => {
85                    let pattern_str = yaml_string(v, PATTERN_NAME)?;
86                    pattern = Some(Regex::new(&pattern_str).context(RegexSnafu {
87                        pattern: pattern_str,
88                    })?);
89                }
90                REPLACEMENT_NAME => {
91                    let replacement_str = yaml_string(v, REPLACEMENT_NAME)?;
92                    replacement = Some(replacement_str);
93                }
94
95                IGNORE_MISSING_NAME => {
96                    ignore_missing = yaml_bool(v, IGNORE_MISSING_NAME)?;
97                }
98
99                _ => {}
100            }
101        }
102
103        Ok(GsubProcessor {
104            fields,
105            pattern: pattern.context(GsubPatternRequiredSnafu)?,
106            replacement: replacement.context(GsubReplacementRequiredSnafu)?,
107            ignore_missing,
108        })
109    }
110}
111
112impl crate::etl::processor::Processor for GsubProcessor {
113    fn kind(&self) -> &str {
114        PROCESSOR_GSUB
115    }
116
117    fn ignore_missing(&self) -> bool {
118        self.ignore_missing
119    }
120
121    fn exec_mut(&self, val: &mut PipelineMap) -> Result<()> {
122        for field in self.fields.iter() {
123            let index = field.input_field();
124            match val.get(index) {
125                Some(Value::Null) | None => {
126                    if !self.ignore_missing {
127                        return ProcessorMissingFieldSnafu {
128                            processor: self.kind(),
129                            field: field.input_field(),
130                        }
131                        .fail();
132                    }
133                }
134                Some(v) => {
135                    let result = self.process(v)?;
136                    let output_index = field.target_or_input_field();
137                    val.insert(output_index.to_string(), result);
138                }
139            }
140        }
141        Ok(())
142    }
143}
144
#[cfg(test)]
mod tests {
    // `super::*` already brings `GsubProcessor`, `Fields` and `Value` into
    // scope, so no further imports are needed.
    use super::*;

    /// Builds a processor that rewrites every run of ASCII digits to `xxx`.
    fn digits_to_xxx() -> GsubProcessor {
        GsubProcessor {
            fields: Fields::default(),
            pattern: regex::Regex::new(r"\d+").unwrap(),
            replacement: "xxx".to_string(),
            ignore_missing: false,
        }
    }

    #[test]
    fn test_string_value() {
        let processor = digits_to_xxx();

        let val = Value::String("123".to_string());
        let result = processor.process(&val).unwrap();

        assert_eq!(result, Value::String("xxx".to_string()));
    }

    #[test]
    fn test_replaces_every_match() {
        let processor = digits_to_xxx();

        let val = Value::String("abc123def456".to_string());
        let result = processor.process(&val).unwrap();

        assert_eq!(result, Value::String("abcxxxdefxxx".to_string()));
    }

    #[test]
    fn test_string_without_match_is_unchanged() {
        let processor = digits_to_xxx();

        let val = Value::String("no digits here".to_string());
        let result = processor.process(&val).unwrap();

        assert_eq!(result, Value::String("no digits here".to_string()));
    }

    #[test]
    fn test_non_string_value_is_rejected() {
        let processor = digits_to_xxx();

        // `process` accepts only `Value::String`; anything else is an error.
        assert!(processor.process(&Value::Null).is_err());
    }
}
165}