pipeline/etl/processor/
join.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use snafu::OptionExt;
16
17use crate::error::{
18    Error, JoinSeparatorRequiredSnafu, KeyMustBeStringSnafu, ProcessorExpectStringSnafu,
19    ProcessorMissingFieldSnafu, Result,
20};
21use crate::etl::field::Fields;
22use crate::etl::processor::{
23    yaml_bool, yaml_new_field, yaml_new_fields, yaml_string, Processor, FIELDS_NAME, FIELD_NAME,
24    IGNORE_MISSING_NAME, SEPARATOR_NAME,
25};
26use crate::etl::value::{Array, Value};
27
28pub(crate) const PROCESSOR_JOIN: &str = "join";
29
30/// A processor to join each element of an array into a single string using a separator string between each element
31#[derive(Debug, Default)]
32pub struct JoinProcessor {
33    fields: Fields,
34    separator: String,
35    ignore_missing: bool,
36}
37
38impl JoinProcessor {
39    fn process(&self, arr: &Array) -> Result<Value> {
40        let val = arr
41            .iter()
42            .map(|v| v.to_str_value())
43            .collect::<Vec<String>>()
44            .join(&self.separator);
45
46        Ok(Value::String(val))
47    }
48}
49
50impl TryFrom<&yaml_rust::yaml::Hash> for JoinProcessor {
51    type Error = Error;
52
53    fn try_from(value: &yaml_rust::yaml::Hash) -> Result<Self> {
54        let mut fields = Fields::default();
55        let mut separator = None;
56        let mut ignore_missing = false;
57
58        for (k, v) in value.iter() {
59            let key = k
60                .as_str()
61                .with_context(|| KeyMustBeStringSnafu { k: k.clone() })?;
62
63            match key {
64                FIELD_NAME => {
65                    fields = Fields::one(yaml_new_field(v, FIELD_NAME)?);
66                }
67                FIELDS_NAME => {
68                    fields = yaml_new_fields(v, FIELDS_NAME)?;
69                }
70                SEPARATOR_NAME => {
71                    separator = Some(yaml_string(v, SEPARATOR_NAME)?);
72                }
73                IGNORE_MISSING_NAME => {
74                    ignore_missing = yaml_bool(v, IGNORE_MISSING_NAME)?;
75                }
76                _ => {}
77            }
78        }
79
80        Ok(JoinProcessor {
81            fields,
82            separator: separator.context(JoinSeparatorRequiredSnafu)?,
83            ignore_missing,
84        })
85    }
86}
87
88impl Processor for JoinProcessor {
89    fn kind(&self) -> &str {
90        PROCESSOR_JOIN
91    }
92
93    fn ignore_missing(&self) -> bool {
94        self.ignore_missing
95    }
96
97    fn exec_mut(&self, mut val: Value) -> Result<Value> {
98        for field in self.fields.iter() {
99            let index = field.input_field();
100            match val.get(index) {
101                Some(Value::Array(arr)) => {
102                    let result = self.process(arr)?;
103                    let output_index = field.target_or_input_field();
104                    val.insert(output_index.to_string(), result)?;
105                }
106                Some(Value::Null) | None => {
107                    if !self.ignore_missing {
108                        return ProcessorMissingFieldSnafu {
109                            processor: self.kind(),
110                            field: field.input_field(),
111                        }
112                        .fail();
113                    }
114                }
115                Some(v) => {
116                    return ProcessorExpectStringSnafu {
117                        processor: self.kind(),
118                        v: v.clone(),
119                    }
120                    .fail();
121                }
122            }
123        }
124
125        Ok(val)
126    }
127}
128
#[cfg(test)]
mod tests {

    use crate::etl::processor::join::JoinProcessor;
    use crate::etl::processor::SEPARATOR_NAME;
    use crate::etl::value::{Array, Value};

    /// Builds a processor that joins with `-` and has no configured fields.
    fn dash_processor() -> JoinProcessor {
        JoinProcessor {
            separator: "-".to_string(),
            ..Default::default()
        }
    }

    #[test]
    fn test_join_processor() {
        let processor = dash_processor();

        let arr = vec![
            Value::String("a".to_string()),
            Value::String("b".to_string()),
        ]
        .into();
        let result = processor.process(&arr).unwrap();
        assert_eq!(result, Value::String("a-b".to_string()));
    }

    #[test]
    fn test_join_processor_empty_and_single() {
        let processor = dash_processor();

        // No elements: the result is empty and contains no separator.
        let empty: Array = Vec::<Value>::new().into();
        assert_eq!(
            processor.process(&empty).unwrap(),
            Value::String(String::new())
        );

        // One element: returned without a leading or trailing separator.
        let single: Array = vec![Value::String("a".to_string())].into();
        assert_eq!(
            processor.process(&single).unwrap(),
            Value::String("a".to_string())
        );
    }

    #[test]
    fn test_join_processor_requires_separator() {
        // Without a separator key, construction from YAML must fail.
        let empty = yaml_rust::yaml::Hash::new();
        assert!(JoinProcessor::try_from(&empty).is_err());

        // With only a separator key, construction succeeds and keeps the separator.
        let mut with_separator = yaml_rust::yaml::Hash::new();
        with_separator.insert(
            yaml_rust::Yaml::String(SEPARATOR_NAME.to_string()),
            yaml_rust::Yaml::String(",".to_string()),
        );
        let processor = JoinProcessor::try_from(&with_separator).unwrap();
        assert_eq!(processor.separator, ",");
    }
}