pipeline/etl/processor/
gsub.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use regex::Regex;
16use snafu::{OptionExt, ResultExt};
17
18use crate::error::{
19    Error, GsubPatternRequiredSnafu, GsubReplacementRequiredSnafu, KeyMustBeStringSnafu,
20    ProcessorExpectStringSnafu, ProcessorMissingFieldSnafu, RegexSnafu, Result,
21};
22use crate::etl::field::Fields;
23use crate::etl::processor::{
24    yaml_bool, yaml_new_field, yaml_new_fields, yaml_string, FIELDS_NAME, FIELD_NAME,
25    IGNORE_MISSING_NAME, PATTERN_NAME,
26};
27use crate::etl::value::Value;
28
/// Processor kind identifier; this is the value returned by
/// `GsubProcessor::kind` and reported in this processor's errors.
pub(crate) const PROCESSOR_GSUB: &str = "gsub";

/// Configuration key whose value supplies the replacement string.
const REPLACEMENT_NAME: &str = "replacement";
32
33/// A processor to replace all matches of a pattern in string by a replacement, only support string value, and array string value
34#[derive(Debug)]
35pub struct GsubProcessor {
36    fields: Fields,
37    pattern: Regex,
38    replacement: String,
39    ignore_missing: bool,
40}
41
42impl GsubProcessor {
43    fn process_string(&self, val: &str) -> Result<Value> {
44        let new_val = self.pattern.replace_all(val, &self.replacement).to_string();
45        let val = Value::String(new_val);
46
47        Ok(val)
48    }
49
50    fn process(&self, val: &Value) -> Result<Value> {
51        match val {
52            Value::String(val) => self.process_string(val),
53            _ => ProcessorExpectStringSnafu {
54                processor: PROCESSOR_GSUB,
55                v: val.clone(),
56            }
57            .fail(),
58        }
59    }
60}
61
62impl TryFrom<&yaml_rust::yaml::Hash> for GsubProcessor {
63    type Error = Error;
64
65    fn try_from(value: &yaml_rust::yaml::Hash) -> Result<Self> {
66        let mut fields = Fields::default();
67        let mut ignore_missing = false;
68        let mut pattern = None;
69        let mut replacement = None;
70
71        for (k, v) in value.iter() {
72            let key = k
73                .as_str()
74                .with_context(|| KeyMustBeStringSnafu { k: k.clone() })?;
75
76            match key {
77                FIELD_NAME => {
78                    fields = Fields::one(yaml_new_field(v, FIELD_NAME)?);
79                }
80                FIELDS_NAME => {
81                    fields = yaml_new_fields(v, FIELDS_NAME)?;
82                }
83                PATTERN_NAME => {
84                    let pattern_str = yaml_string(v, PATTERN_NAME)?;
85                    pattern = Some(Regex::new(&pattern_str).context(RegexSnafu {
86                        pattern: pattern_str,
87                    })?);
88                }
89                REPLACEMENT_NAME => {
90                    let replacement_str = yaml_string(v, REPLACEMENT_NAME)?;
91                    replacement = Some(replacement_str);
92                }
93
94                IGNORE_MISSING_NAME => {
95                    ignore_missing = yaml_bool(v, IGNORE_MISSING_NAME)?;
96                }
97
98                _ => {}
99            }
100        }
101
102        Ok(GsubProcessor {
103            fields,
104            pattern: pattern.context(GsubPatternRequiredSnafu)?,
105            replacement: replacement.context(GsubReplacementRequiredSnafu)?,
106            ignore_missing,
107        })
108    }
109}
110
111impl crate::etl::processor::Processor for GsubProcessor {
112    fn kind(&self) -> &str {
113        PROCESSOR_GSUB
114    }
115
116    fn ignore_missing(&self) -> bool {
117        self.ignore_missing
118    }
119
120    fn exec_mut(&self, mut val: Value) -> Result<Value> {
121        for field in self.fields.iter() {
122            let index = field.input_field();
123            match val.get(index) {
124                Some(Value::Null) | None => {
125                    if !self.ignore_missing {
126                        return ProcessorMissingFieldSnafu {
127                            processor: self.kind(),
128                            field: field.input_field(),
129                        }
130                        .fail();
131                    }
132                }
133                Some(v) => {
134                    let result = self.process(v)?;
135                    let output_index = field.target_or_input_field();
136                    val.insert(output_index.to_string(), result)?;
137                }
138            }
139        }
140        Ok(val)
141    }
142}
143
#[cfg(test)]
mod tests {
    // `super::*` already brings `GsubProcessor`, `Value`, `Fields` and `Regex`
    // into scope, so no further imports are needed.
    use super::*;

    /// Builds a processor that replaces every run of digits with `xxx`.
    fn digits_to_xxx() -> GsubProcessor {
        GsubProcessor {
            fields: Fields::default(),
            pattern: Regex::new(r"\d+").unwrap(),
            replacement: "xxx".to_string(),
            ignore_missing: false,
        }
    }

    #[test]
    fn test_string_value() {
        let processor = digits_to_xxx();

        let val = Value::String("123".to_string());
        let result = processor.process(&val).unwrap();

        assert_eq!(result, Value::String("xxx".to_string()));
    }

    #[test]
    fn test_replaces_every_match() {
        let processor = digits_to_xxx();

        // Several matches separated by non-matching text must all be replaced.
        let val = Value::String("a1b22c".to_string());
        let result = processor.process(&val).unwrap();

        assert_eq!(result, Value::String("axxxbxxxc".to_string()));
    }

    #[test]
    fn test_string_without_match_is_unchanged() {
        let processor = digits_to_xxx();

        let val = Value::String("abc".to_string());
        let result = processor.process(&val).unwrap();

        assert_eq!(result, Value::String("abc".to_string()));
    }

    #[test]
    fn test_non_string_value_is_rejected() {
        let processor = digits_to_xxx();

        // Only `Value::String` is accepted; everything else is an error.
        assert!(processor.process(&Value::Null).is_err());
    }
}