pipeline/etl/processor/
join.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use snafu::OptionExt;
16
17use crate::error::{
18    Error, JoinSeparatorRequiredSnafu, KeyMustBeStringSnafu, ProcessorExpectStringSnafu,
19    ProcessorMissingFieldSnafu, Result,
20};
21use crate::etl::field::Fields;
22use crate::etl::processor::{
23    yaml_bool, yaml_new_field, yaml_new_fields, yaml_string, Processor, FIELDS_NAME, FIELD_NAME,
24    IGNORE_MISSING_NAME, SEPARATOR_NAME,
25};
26use crate::etl::value::{Array, Value};
27use crate::etl::PipelineMap;
28
/// Kind name of the join processor, as returned by `JoinProcessor::kind`.
pub(crate) const PROCESSOR_JOIN: &str = "join";

/// A processor that joins each element of an array into a single string,
/// placing a separator string between consecutive elements.
#[derive(Debug, Default)]
pub struct JoinProcessor {
    /// Input fields to join; each joined result is written to the field's
    /// `target_or_input_field()`.
    fields: Fields,
    /// String inserted between consecutive array elements.
    separator: String,
    /// When `true`, an input field that is absent or `Null` is skipped instead
    /// of producing a missing-field error.
    ignore_missing: bool,
}
38
39impl JoinProcessor {
40    fn process(&self, arr: &Array) -> Result<Value> {
41        let val = arr
42            .iter()
43            .map(|v| v.to_str_value())
44            .collect::<Vec<String>>()
45            .join(&self.separator);
46
47        Ok(Value::String(val))
48    }
49}
50
51impl TryFrom<&yaml_rust::yaml::Hash> for JoinProcessor {
52    type Error = Error;
53
54    fn try_from(value: &yaml_rust::yaml::Hash) -> Result<Self> {
55        let mut fields = Fields::default();
56        let mut separator = None;
57        let mut ignore_missing = false;
58
59        for (k, v) in value.iter() {
60            let key = k
61                .as_str()
62                .with_context(|| KeyMustBeStringSnafu { k: k.clone() })?;
63
64            match key {
65                FIELD_NAME => {
66                    fields = Fields::one(yaml_new_field(v, FIELD_NAME)?);
67                }
68                FIELDS_NAME => {
69                    fields = yaml_new_fields(v, FIELDS_NAME)?;
70                }
71                SEPARATOR_NAME => {
72                    separator = Some(yaml_string(v, SEPARATOR_NAME)?);
73                }
74                IGNORE_MISSING_NAME => {
75                    ignore_missing = yaml_bool(v, IGNORE_MISSING_NAME)?;
76                }
77                _ => {}
78            }
79        }
80
81        Ok(JoinProcessor {
82            fields,
83            separator: separator.context(JoinSeparatorRequiredSnafu)?,
84            ignore_missing,
85        })
86    }
87}
88
89impl Processor for JoinProcessor {
90    fn kind(&self) -> &str {
91        PROCESSOR_JOIN
92    }
93
94    fn ignore_missing(&self) -> bool {
95        self.ignore_missing
96    }
97
98    fn exec_mut(&self, val: &mut PipelineMap) -> Result<()> {
99        for field in self.fields.iter() {
100            let index = field.input_field();
101            match val.get(index) {
102                Some(Value::Array(arr)) => {
103                    let result = self.process(arr)?;
104                    let output_index = field.target_or_input_field();
105                    val.insert(output_index.to_string(), result);
106                }
107                Some(Value::Null) | None => {
108                    if !self.ignore_missing {
109                        return ProcessorMissingFieldSnafu {
110                            processor: self.kind(),
111                            field: field.input_field(),
112                        }
113                        .fail();
114                    }
115                }
116                Some(v) => {
117                    return ProcessorExpectStringSnafu {
118                        processor: self.kind(),
119                        v: v.clone(),
120                    }
121                    .fail();
122                }
123            }
124        }
125
126        Ok(())
127    }
128}
129
#[cfg(test)]
mod tests {

    use crate::etl::processor::join::JoinProcessor;
    use crate::etl::processor::{IGNORE_MISSING_NAME, SEPARATOR_NAME};
    use crate::etl::value::Value;

    /// Parses `yaml` (a single mapping document) into a `JoinProcessor`.
    fn parse(yaml: &str) -> crate::error::Result<JoinProcessor> {
        let docs = yaml_rust::YamlLoader::load_from_str(yaml).expect("test YAML must parse");
        let hash = docs[0].as_hash().expect("test YAML must be a mapping");
        JoinProcessor::try_from(hash)
    }

    #[test]
    fn test_join_processor() {
        let processor = JoinProcessor {
            separator: "-".to_string(),
            ..Default::default()
        };

        let arr = vec![
            Value::String("a".to_string()),
            Value::String("b".to_string()),
        ]
        .into();
        let result = processor.process(&arr).unwrap();
        assert_eq!(result, Value::String("a-b".to_string()));
    }

    #[test]
    fn test_join_empty_and_single_element() {
        let processor = JoinProcessor {
            separator: "-".to_string(),
            ..Default::default()
        };

        // An empty array joins to an empty string.
        let empty = Vec::<Value>::new().into();
        assert_eq!(
            processor.process(&empty).unwrap(),
            Value::String(String::new())
        );

        // A single element gets no separator.
        let single = vec![Value::String("a".to_string())].into();
        assert_eq!(
            processor.process(&single).unwrap(),
            Value::String("a".to_string())
        );
    }

    #[test]
    fn test_try_from_requires_separator() {
        // Without a separator the configuration is rejected.
        assert!(parse(&format!("{IGNORE_MISSING_NAME}: true")).is_err());

        // With a separator, both settings are picked up.
        let processor =
            parse(&format!("{SEPARATOR_NAME}: \"-\"\n{IGNORE_MISSING_NAME}: true")).unwrap();
        assert_eq!(processor.separator, "-");
        assert!(processor.ignore_missing);
    }
}