pipeline/etl/processor/
join.rs1use snafu::OptionExt;
16
17use crate::error::{
18 Error, JoinSeparatorRequiredSnafu, KeyMustBeStringSnafu, ProcessorExpectStringSnafu,
19 ProcessorMissingFieldSnafu, Result,
20};
21use crate::etl::field::Fields;
22use crate::etl::processor::{
23 yaml_bool, yaml_new_field, yaml_new_fields, yaml_string, Processor, FIELDS_NAME, FIELD_NAME,
24 IGNORE_MISSING_NAME, SEPARATOR_NAME,
25};
26use crate::etl::value::{Array, Value};
27
/// Identifier of the join processor; this is the value `JoinProcessor::kind` returns.
pub(crate) const PROCESSOR_JOIN: &str = "join";
29
30#[derive(Debug, Default)]
32pub struct JoinProcessor {
33 fields: Fields,
34 separator: String,
35 ignore_missing: bool,
36}
37
38impl JoinProcessor {
39 fn process(&self, arr: &Array) -> Result<Value> {
40 let val = arr
41 .iter()
42 .map(|v| v.to_str_value())
43 .collect::<Vec<String>>()
44 .join(&self.separator);
45
46 Ok(Value::String(val))
47 }
48}
49
50impl TryFrom<&yaml_rust::yaml::Hash> for JoinProcessor {
51 type Error = Error;
52
53 fn try_from(value: &yaml_rust::yaml::Hash) -> Result<Self> {
54 let mut fields = Fields::default();
55 let mut separator = None;
56 let mut ignore_missing = false;
57
58 for (k, v) in value.iter() {
59 let key = k
60 .as_str()
61 .with_context(|| KeyMustBeStringSnafu { k: k.clone() })?;
62
63 match key {
64 FIELD_NAME => {
65 fields = Fields::one(yaml_new_field(v, FIELD_NAME)?);
66 }
67 FIELDS_NAME => {
68 fields = yaml_new_fields(v, FIELDS_NAME)?;
69 }
70 SEPARATOR_NAME => {
71 separator = Some(yaml_string(v, SEPARATOR_NAME)?);
72 }
73 IGNORE_MISSING_NAME => {
74 ignore_missing = yaml_bool(v, IGNORE_MISSING_NAME)?;
75 }
76 _ => {}
77 }
78 }
79
80 Ok(JoinProcessor {
81 fields,
82 separator: separator.context(JoinSeparatorRequiredSnafu)?,
83 ignore_missing,
84 })
85 }
86}
87
88impl Processor for JoinProcessor {
89 fn kind(&self) -> &str {
90 PROCESSOR_JOIN
91 }
92
93 fn ignore_missing(&self) -> bool {
94 self.ignore_missing
95 }
96
97 fn exec_mut(&self, mut val: Value) -> Result<Value> {
98 for field in self.fields.iter() {
99 let index = field.input_field();
100 match val.get(index) {
101 Some(Value::Array(arr)) => {
102 let result = self.process(arr)?;
103 let output_index = field.target_or_input_field();
104 val.insert(output_index.to_string(), result)?;
105 }
106 Some(Value::Null) | None => {
107 if !self.ignore_missing {
108 return ProcessorMissingFieldSnafu {
109 processor: self.kind(),
110 field: field.input_field(),
111 }
112 .fail();
113 }
114 }
115 Some(v) => {
116 return ProcessorExpectStringSnafu {
117 processor: self.kind(),
118 v: v.clone(),
119 }
120 .fail();
121 }
122 }
123 }
124
125 Ok(val)
126 }
127}
128
#[cfg(test)]
mod tests {

    use crate::etl::processor::join::JoinProcessor;
    use crate::etl::value::Value;

    /// Joining the strings "a" and "b" with a "-" separator yields "a-b".
    #[test]
    fn test_join_processor() {
        let joiner = JoinProcessor {
            separator: String::from("-"),
            ..Default::default()
        };

        let input = vec![
            Value::String(String::from("a")),
            Value::String(String::from("b")),
        ]
        .into();
        let expected = Value::String(String::from("a-b"));

        let actual = joiner.process(&input).unwrap();
        assert_eq!(actual, expected);
    }
}