pipeline/etl/processor/
decolorize.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
//! Removes ANSI color control codes from the input text.
//!
//! Only SGR sequences of the form `ESC [ <digits and semicolons> m` are stripped;
//! any other escape sequence is left in the text unchanged.
//!
//! Similar to [`decolorize`](https://grafana.com/docs/loki/latest/query/log_queries/#removing-color-codes)
//! from Grafana Loki and [`strip_ansi_escape_codes`](https://vector.dev/docs/reference/vrl/functions/#strip_ansi_escape_codes)
//! from Vector VRL.
20
21use once_cell::sync::Lazy;
22use regex::Regex;
23use snafu::OptionExt;
24use vrl::prelude::Bytes;
25use vrl::value::{KeyString, Value as VrlValue};
26
27use crate::error::{
28    Error, KeyMustBeStringSnafu, ProcessorExpectStringSnafu, ProcessorMissingFieldSnafu, Result,
29    ValueMustBeMapSnafu,
30};
31use crate::etl::field::Fields;
32use crate::etl::processor::{
33    yaml_bool, yaml_new_field, yaml_new_fields, FIELDS_NAME, FIELD_NAME, IGNORE_MISSING_NAME,
34};
35
/// Kind name of this processor; returned by `kind()` and attached to the
/// "expected string" error raised for non-string inputs.
pub(crate) const PROCESSOR_DECOLORIZE: &str = "decolorize";
37
38static RE: Lazy<Regex> = Lazy::new(|| Regex::new(r"\x1b\[[0-9;]*m").unwrap());
39
40/// Remove ANSI color control codes from the input text.
41#[derive(Debug, Default)]
42pub struct DecolorizeProcessor {
43    fields: Fields,
44    ignore_missing: bool,
45}
46
47impl DecolorizeProcessor {
48    fn process_string(&self, val: &str) -> Result<VrlValue> {
49        Ok(VrlValue::Bytes(Bytes::from(
50            RE.replace_all(val, "").to_string(),
51        )))
52    }
53
54    fn process(&self, val: &VrlValue) -> Result<VrlValue> {
55        match val {
56            VrlValue::Bytes(val) => self.process_string(String::from_utf8_lossy(val).as_ref()),
57            _ => ProcessorExpectStringSnafu {
58                processor: PROCESSOR_DECOLORIZE,
59                v: val.clone(),
60            }
61            .fail(),
62        }
63    }
64}
65
66impl TryFrom<&yaml_rust::yaml::Hash> for DecolorizeProcessor {
67    type Error = Error;
68
69    fn try_from(value: &yaml_rust::yaml::Hash) -> Result<Self> {
70        let mut fields = Fields::default();
71        let mut ignore_missing = false;
72
73        for (k, v) in value.iter() {
74            let key = k
75                .as_str()
76                .with_context(|| KeyMustBeStringSnafu { k: k.clone() })?;
77
78            match key {
79                FIELD_NAME => {
80                    fields = Fields::one(yaml_new_field(v, FIELD_NAME)?);
81                }
82                FIELDS_NAME => {
83                    fields = yaml_new_fields(v, FIELDS_NAME)?;
84                }
85                IGNORE_MISSING_NAME => {
86                    ignore_missing = yaml_bool(v, IGNORE_MISSING_NAME)?;
87                }
88                _ => {}
89            }
90        }
91
92        Ok(DecolorizeProcessor {
93            fields,
94            ignore_missing,
95        })
96    }
97}
98
99impl crate::etl::processor::Processor for DecolorizeProcessor {
100    fn kind(&self) -> &str {
101        PROCESSOR_DECOLORIZE
102    }
103
104    fn ignore_missing(&self) -> bool {
105        self.ignore_missing
106    }
107
108    fn exec_mut(&self, mut val: VrlValue) -> Result<VrlValue> {
109        for field in self.fields.iter() {
110            let index = field.input_field();
111            let val = val.as_object_mut().context(ValueMustBeMapSnafu)?;
112            match val.get(index) {
113                Some(VrlValue::Null) | None => {
114                    if !self.ignore_missing {
115                        return ProcessorMissingFieldSnafu {
116                            processor: self.kind(),
117                            field: field.input_field(),
118                        }
119                        .fail();
120                    }
121                }
122                Some(v) => {
123                    let result = self.process(v)?;
124                    let output_index = field.target_or_input_field();
125                    val.insert(KeyString::from(output_index), result);
126                }
127            }
128        }
129        Ok(val)
130    }
131}
132
#[cfg(test)]
mod tests {
    use super::*;

    /// Wraps `text` in the byte-string value variant the processor works on.
    fn bytes_value(text: &str) -> VrlValue {
        VrlValue::Bytes(Bytes::from(text.to_string()))
    }

    #[test]
    fn test_decolorize_processor() {
        let processor = DecolorizeProcessor::default();

        // (input, expected output) pairs: colored text, plain text, and a
        // colored prefix followed by plain text.
        let cases = [
            ("\x1b[32mGreen\x1b[0m", "Green"),
            ("Plain text", "Plain text"),
            ("\x1b[46mfoo\x1b[0m bar", "foo bar"),
        ];

        for (input, expected) in cases {
            let result = processor.process(&bytes_value(input)).unwrap();
            assert_eq!(result, bytes_value(expected));
        }
    }
}