pipeline/etl/processor/
decolorize.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Removes ANSI color control codes from the input text.
16//!
17//! Similar to [`decolorize`](https://grafana.com/docs/loki/latest/query/log_queries/#removing-color-codes)
18//! from Grafana Loki and [`strip_ansi_escape_codes`](https://vector.dev/docs/reference/vrl/functions/#strip_ansi_escape_codes)
19//! from Vector VRL.
20
21use once_cell::sync::Lazy;
22use regex::Regex;
23use snafu::OptionExt;
24
25use crate::error::{
26    Error, KeyMustBeStringSnafu, ProcessorExpectStringSnafu, ProcessorMissingFieldSnafu, Result,
27};
28use crate::etl::field::Fields;
29use crate::etl::processor::{
30    yaml_bool, yaml_new_field, yaml_new_fields, FIELDS_NAME, FIELD_NAME, IGNORE_MISSING_NAME,
31};
32use crate::etl::value::Value;
33use crate::etl::PipelineMap;
34
35pub(crate) const PROCESSOR_DECOLORIZE: &str = "decolorize";
36
37static RE: Lazy<Regex> = Lazy::new(|| Regex::new(r"\x1b\[[0-9;]*m").unwrap());
38
39/// Remove ANSI color control codes from the input text.
40#[derive(Debug, Default)]
41pub struct DecolorizeProcessor {
42    fields: Fields,
43    ignore_missing: bool,
44}
45
46impl DecolorizeProcessor {
47    fn process_string(&self, val: &str) -> Result<Value> {
48        Ok(Value::String(RE.replace_all(val, "").into_owned()))
49    }
50
51    fn process(&self, val: &Value) -> Result<Value> {
52        match val {
53            Value::String(val) => self.process_string(val),
54            _ => ProcessorExpectStringSnafu {
55                processor: PROCESSOR_DECOLORIZE,
56                v: val.clone(),
57            }
58            .fail(),
59        }
60    }
61}
62
63impl TryFrom<&yaml_rust::yaml::Hash> for DecolorizeProcessor {
64    type Error = Error;
65
66    fn try_from(value: &yaml_rust::yaml::Hash) -> Result<Self> {
67        let mut fields = Fields::default();
68        let mut ignore_missing = false;
69
70        for (k, v) in value.iter() {
71            let key = k
72                .as_str()
73                .with_context(|| KeyMustBeStringSnafu { k: k.clone() })?;
74
75            match key {
76                FIELD_NAME => {
77                    fields = Fields::one(yaml_new_field(v, FIELD_NAME)?);
78                }
79                FIELDS_NAME => {
80                    fields = yaml_new_fields(v, FIELDS_NAME)?;
81                }
82                IGNORE_MISSING_NAME => {
83                    ignore_missing = yaml_bool(v, IGNORE_MISSING_NAME)?;
84                }
85                _ => {}
86            }
87        }
88
89        Ok(DecolorizeProcessor {
90            fields,
91            ignore_missing,
92        })
93    }
94}
95
96impl crate::etl::processor::Processor for DecolorizeProcessor {
97    fn kind(&self) -> &str {
98        PROCESSOR_DECOLORIZE
99    }
100
101    fn ignore_missing(&self) -> bool {
102        self.ignore_missing
103    }
104
105    fn exec_mut(&self, val: &mut PipelineMap) -> Result<()> {
106        for field in self.fields.iter() {
107            let index = field.input_field();
108            match val.get(index) {
109                Some(Value::Null) | None => {
110                    if !self.ignore_missing {
111                        return ProcessorMissingFieldSnafu {
112                            processor: self.kind(),
113                            field: field.input_field(),
114                        }
115                        .fail();
116                    }
117                }
118                Some(v) => {
119                    let result = self.process(v)?;
120                    let output_index = field.target_or_input_field();
121                    val.insert(output_index.to_string(), result);
122                }
123            }
124        }
125        Ok(())
126    }
127}
128
#[cfg(test)]
mod tests {
    use super::*;

    /// Color codes are stripped from colored input and plain text passes
    /// through unchanged.
    #[test]
    fn test_decolorize_processor() {
        let processor = DecolorizeProcessor::default();

        // (input, expected output) pairs.
        let cases = [
            ("\x1b[32mGreen\x1b[0m", "Green"),
            ("Plain text", "Plain text"),
            ("\x1b[46mfoo\x1b[0m bar", "foo bar"),
        ];

        for (input, expected) in cases {
            let output = processor
                .process(&Value::String(input.to_string()))
                .unwrap();
            assert_eq!(output, Value::String(expected.to_string()));
        }
    }
}