pipeline/etl/processor/
join.rs1use snafu::OptionExt;
16use vrl::prelude::Bytes;
17use vrl::value::{KeyString, Value as VrlValue};
18
19use crate::error::{
20 Error, JoinSeparatorRequiredSnafu, KeyMustBeStringSnafu, ProcessorExpectStringSnafu,
21 ProcessorMissingFieldSnafu, Result, ValueMustBeMapSnafu,
22};
23use crate::etl::field::Fields;
24use crate::etl::processor::{
25 yaml_bool, yaml_new_field, yaml_new_fields, yaml_string, Processor, FIELDS_NAME, FIELD_NAME,
26 IGNORE_MISSING_NAME, SEPARATOR_NAME,
27};
28
29pub(crate) const PROCESSOR_JOIN: &str = "join";
30
31#[derive(Debug, Default)]
33pub struct JoinProcessor {
34 fields: Fields,
35 separator: String,
36 ignore_missing: bool,
37}
38
39impl JoinProcessor {
40 fn process(&self, arr: &[VrlValue]) -> Result<VrlValue> {
41 let val = arr
42 .iter()
43 .map(|v| v.to_string_lossy())
44 .collect::<Vec<_>>()
45 .join(&self.separator);
46
47 Ok(VrlValue::Bytes(Bytes::from(val)))
48 }
49}
50
51impl TryFrom<&yaml_rust::yaml::Hash> for JoinProcessor {
52 type Error = Error;
53
54 fn try_from(value: &yaml_rust::yaml::Hash) -> Result<Self> {
55 let mut fields = Fields::default();
56 let mut separator = None;
57 let mut ignore_missing = false;
58
59 for (k, v) in value.iter() {
60 let key = k
61 .as_str()
62 .with_context(|| KeyMustBeStringSnafu { k: k.clone() })?;
63
64 match key {
65 FIELD_NAME => {
66 fields = Fields::one(yaml_new_field(v, FIELD_NAME)?);
67 }
68 FIELDS_NAME => {
69 fields = yaml_new_fields(v, FIELDS_NAME)?;
70 }
71 SEPARATOR_NAME => {
72 separator = Some(yaml_string(v, SEPARATOR_NAME)?);
73 }
74 IGNORE_MISSING_NAME => {
75 ignore_missing = yaml_bool(v, IGNORE_MISSING_NAME)?;
76 }
77 _ => {}
78 }
79 }
80
81 Ok(JoinProcessor {
82 fields,
83 separator: separator.context(JoinSeparatorRequiredSnafu)?,
84 ignore_missing,
85 })
86 }
87}
88
89impl Processor for JoinProcessor {
90 fn kind(&self) -> &str {
91 PROCESSOR_JOIN
92 }
93
94 fn ignore_missing(&self) -> bool {
95 self.ignore_missing
96 }
97
98 fn exec_mut(&self, mut val: VrlValue) -> Result<VrlValue> {
99 for field in self.fields.iter() {
100 let index = field.input_field();
101 let val = val.as_object_mut().context(ValueMustBeMapSnafu)?;
102 match val.get(index) {
103 Some(VrlValue::Array(arr)) => {
104 let result = self.process(arr)?;
105 let output_index = field.target_or_input_field();
106 val.insert(KeyString::from(output_index.to_string()), result);
107 }
108 Some(VrlValue::Null) | None => {
109 if !self.ignore_missing {
110 return ProcessorMissingFieldSnafu {
111 processor: self.kind(),
112 field: field.input_field(),
113 }
114 .fail();
115 }
116 }
117 Some(v) => {
118 return ProcessorExpectStringSnafu {
119 processor: self.kind(),
120 v: v.clone(),
121 }
122 .fail();
123 }
124 }
125 }
126
127 Ok(val)
128 }
129}
130
#[cfg(test)]
mod tests {

    use vrl::prelude::Bytes;
    use vrl::value::Value as VrlValue;

    use crate::etl::processor::join::JoinProcessor;

    /// Builds a processor with only its separator configured.
    fn processor_with_separator(separator: &str) -> JoinProcessor {
        JoinProcessor {
            separator: separator.to_string(),
            ..Default::default()
        }
    }

    /// Wraps `s` as a VRL bytes value.
    fn bytes(s: &str) -> VrlValue {
        VrlValue::Bytes(Bytes::from(s.to_owned()))
    }

    #[test]
    fn test_join_processor() {
        let processor = processor_with_separator("-");

        let arr = vec![bytes("a"), bytes("b")];
        let result = processor.process(&arr).unwrap();
        assert_eq!(result, bytes("a-b"));
    }

    #[test]
    fn test_join_processor_empty_array() {
        let processor = processor_with_separator("-");

        // Joining nothing yields an empty string, not an error.
        let result = processor.process(&[]).unwrap();
        assert_eq!(result, bytes(""));
    }

    #[test]
    fn test_join_processor_single_element() {
        let processor = processor_with_separator("-");

        // A single element must not be decorated with the separator.
        let result = processor.process(&[bytes("a")]).unwrap();
        assert_eq!(result, bytes("a"));
    }

    #[test]
    fn test_join_processor_multi_char_separator() {
        let processor = processor_with_separator(", ");

        let arr = vec![bytes("a"), bytes("b"), bytes("c")];
        let result = processor.process(&arr).unwrap();
        assert_eq!(result, bytes("a, b, c"));
    }
}