pipeline/etl/processor/
join.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use snafu::OptionExt;
16use vrl::prelude::Bytes;
17use vrl::value::{KeyString, Value as VrlValue};
18
19use crate::error::{
20    Error, JoinSeparatorRequiredSnafu, KeyMustBeStringSnafu, ProcessorExpectStringSnafu,
21    ProcessorMissingFieldSnafu, Result, ValueMustBeMapSnafu,
22};
23use crate::etl::field::Fields;
24use crate::etl::processor::{
25    yaml_bool, yaml_new_field, yaml_new_fields, yaml_string, Processor, FIELDS_NAME, FIELD_NAME,
26    IGNORE_MISSING_NAME, SEPARATOR_NAME,
27};
28
29pub(crate) const PROCESSOR_JOIN: &str = "join";
30
31/// A processor to join each element of an array into a single string using a separator string between each element
32#[derive(Debug, Default)]
33pub struct JoinProcessor {
34    fields: Fields,
35    separator: String,
36    ignore_missing: bool,
37}
38
39impl JoinProcessor {
40    fn process(&self, arr: &[VrlValue]) -> Result<VrlValue> {
41        let val = arr
42            .iter()
43            .map(|v| v.to_string_lossy())
44            .collect::<Vec<_>>()
45            .join(&self.separator);
46
47        Ok(VrlValue::Bytes(Bytes::from(val)))
48    }
49}
50
51impl TryFrom<&yaml_rust::yaml::Hash> for JoinProcessor {
52    type Error = Error;
53
54    fn try_from(value: &yaml_rust::yaml::Hash) -> Result<Self> {
55        let mut fields = Fields::default();
56        let mut separator = None;
57        let mut ignore_missing = false;
58
59        for (k, v) in value.iter() {
60            let key = k
61                .as_str()
62                .with_context(|| KeyMustBeStringSnafu { k: k.clone() })?;
63
64            match key {
65                FIELD_NAME => {
66                    fields = Fields::one(yaml_new_field(v, FIELD_NAME)?);
67                }
68                FIELDS_NAME => {
69                    fields = yaml_new_fields(v, FIELDS_NAME)?;
70                }
71                SEPARATOR_NAME => {
72                    separator = Some(yaml_string(v, SEPARATOR_NAME)?);
73                }
74                IGNORE_MISSING_NAME => {
75                    ignore_missing = yaml_bool(v, IGNORE_MISSING_NAME)?;
76                }
77                _ => {}
78            }
79        }
80
81        Ok(JoinProcessor {
82            fields,
83            separator: separator.context(JoinSeparatorRequiredSnafu)?,
84            ignore_missing,
85        })
86    }
87}
88
89impl Processor for JoinProcessor {
90    fn kind(&self) -> &str {
91        PROCESSOR_JOIN
92    }
93
94    fn ignore_missing(&self) -> bool {
95        self.ignore_missing
96    }
97
98    fn exec_mut(&self, mut val: VrlValue) -> Result<VrlValue> {
99        for field in self.fields.iter() {
100            let index = field.input_field();
101            let val = val.as_object_mut().context(ValueMustBeMapSnafu)?;
102            match val.get(index) {
103                Some(VrlValue::Array(arr)) => {
104                    let result = self.process(arr)?;
105                    let output_index = field.target_or_input_field();
106                    val.insert(KeyString::from(output_index.to_string()), result);
107                }
108                Some(VrlValue::Null) | None => {
109                    if !self.ignore_missing {
110                        return ProcessorMissingFieldSnafu {
111                            processor: self.kind(),
112                            field: field.input_field(),
113                        }
114                        .fail();
115                    }
116                }
117                Some(v) => {
118                    return ProcessorExpectStringSnafu {
119                        processor: self.kind(),
120                        v: v.clone(),
121                    }
122                    .fail();
123                }
124            }
125        }
126
127        Ok(val)
128    }
129}
130
#[cfg(test)]
mod tests {

    use vrl::prelude::Bytes;
    use vrl::value::Value as VrlValue;

    use crate::etl::processor::join::JoinProcessor;

    /// Builds a processor with only the separator configured; the remaining
    /// settings keep their defaults because `process` does not read them.
    fn processor_with_separator(separator: &str) -> JoinProcessor {
        JoinProcessor {
            separator: separator.to_string(),
            ..Default::default()
        }
    }

    /// Wraps a string literal in a bytes value.
    fn bytes(text: &'static str) -> VrlValue {
        VrlValue::Bytes(Bytes::from(text))
    }

    #[test]
    fn test_join_processor() {
        let processor = processor_with_separator("-");

        let arr = vec![bytes("a"), bytes("b")];
        let result = processor.process(&arr).unwrap();
        assert_eq!(result, bytes("a-b"));
    }

    /// An empty array joins to an empty string, with no separator emitted.
    #[test]
    fn test_join_processor_empty_array() {
        let processor = processor_with_separator("-");

        let result = processor.process(&[]).unwrap();
        assert_eq!(result, bytes(""));
    }

    /// A single element is returned as-is, with no separator around it.
    #[test]
    fn test_join_processor_single_element() {
        let processor = processor_with_separator("-");

        let arr = vec![bytes("a")];
        let result = processor.process(&arr).unwrap();
        assert_eq!(result, bytes("a"));
    }

    /// The separator may be longer than one character.
    #[test]
    fn test_join_processor_multi_char_separator() {
        let processor = processor_with_separator(", ");

        let arr = vec![bytes("a"), bytes("b"), bytes("c")];
        let result = processor.process(&arr).unwrap();
        assert_eq!(result, bytes("a, b, c"));
    }
}
153}