pipeline/etl/processor/
join.rs1use snafu::OptionExt;
16
17use crate::error::{
18 Error, JoinSeparatorRequiredSnafu, KeyMustBeStringSnafu, ProcessorExpectStringSnafu,
19 ProcessorMissingFieldSnafu, Result,
20};
21use crate::etl::field::Fields;
22use crate::etl::processor::{
23 yaml_bool, yaml_new_field, yaml_new_fields, yaml_string, Processor, FIELDS_NAME, FIELD_NAME,
24 IGNORE_MISSING_NAME, SEPARATOR_NAME,
25};
26use crate::etl::value::{Array, Value};
27use crate::etl::PipelineMap;
28
/// Name under which the join processor is identified; returned by
/// `JoinProcessor::kind`.
pub(crate) const PROCESSOR_JOIN: &str = "join";
30
31#[derive(Debug, Default)]
33pub struct JoinProcessor {
34 fields: Fields,
35 separator: String,
36 ignore_missing: bool,
37}
38
39impl JoinProcessor {
40 fn process(&self, arr: &Array) -> Result<Value> {
41 let val = arr
42 .iter()
43 .map(|v| v.to_str_value())
44 .collect::<Vec<String>>()
45 .join(&self.separator);
46
47 Ok(Value::String(val))
48 }
49}
50
51impl TryFrom<&yaml_rust::yaml::Hash> for JoinProcessor {
52 type Error = Error;
53
54 fn try_from(value: &yaml_rust::yaml::Hash) -> Result<Self> {
55 let mut fields = Fields::default();
56 let mut separator = None;
57 let mut ignore_missing = false;
58
59 for (k, v) in value.iter() {
60 let key = k
61 .as_str()
62 .with_context(|| KeyMustBeStringSnafu { k: k.clone() })?;
63
64 match key {
65 FIELD_NAME => {
66 fields = Fields::one(yaml_new_field(v, FIELD_NAME)?);
67 }
68 FIELDS_NAME => {
69 fields = yaml_new_fields(v, FIELDS_NAME)?;
70 }
71 SEPARATOR_NAME => {
72 separator = Some(yaml_string(v, SEPARATOR_NAME)?);
73 }
74 IGNORE_MISSING_NAME => {
75 ignore_missing = yaml_bool(v, IGNORE_MISSING_NAME)?;
76 }
77 _ => {}
78 }
79 }
80
81 Ok(JoinProcessor {
82 fields,
83 separator: separator.context(JoinSeparatorRequiredSnafu)?,
84 ignore_missing,
85 })
86 }
87}
88
89impl Processor for JoinProcessor {
90 fn kind(&self) -> &str {
91 PROCESSOR_JOIN
92 }
93
94 fn ignore_missing(&self) -> bool {
95 self.ignore_missing
96 }
97
98 fn exec_mut(&self, val: &mut PipelineMap) -> Result<()> {
99 for field in self.fields.iter() {
100 let index = field.input_field();
101 match val.get(index) {
102 Some(Value::Array(arr)) => {
103 let result = self.process(arr)?;
104 let output_index = field.target_or_input_field();
105 val.insert(output_index.to_string(), result);
106 }
107 Some(Value::Null) | None => {
108 if !self.ignore_missing {
109 return ProcessorMissingFieldSnafu {
110 processor: self.kind(),
111 field: field.input_field(),
112 }
113 .fail();
114 }
115 }
116 Some(v) => {
117 return ProcessorExpectStringSnafu {
118 processor: self.kind(),
119 v: v.clone(),
120 }
121 .fail();
122 }
123 }
124 }
125
126 Ok(())
127 }
128}
129
#[cfg(test)]
mod tests {

    use crate::etl::processor::join::JoinProcessor;
    use crate::etl::value::Value;

    /// Joining the strings `a` and `b` with a `-` separator yields `a-b`.
    #[test]
    fn test_join_processor() {
        let joiner = JoinProcessor {
            separator: String::from("-"),
            ..Default::default()
        };

        let input = ["a", "b"]
            .into_iter()
            .map(|s| Value::String(s.to_string()))
            .collect::<Vec<_>>()
            .into();
        let expected = Value::String("a-b".to_string());

        let joined = joiner.process(&input).unwrap();
        assert_eq!(joined, expected);
    }
}