1pub mod cmcd;
16pub mod csv;
17pub mod date;
18pub mod decolorize;
19pub mod digest;
20pub mod dissect;
21pub mod epoch;
22pub mod filter;
23pub mod gsub;
24pub mod join;
25pub mod json_parse;
26pub mod json_path;
27pub mod letter;
28pub mod regex;
29pub mod select;
30pub mod simple_extract;
31pub mod urlencoding;
32pub mod vrl_processor;
33
34use std::str::FromStr;
35
36use cmcd::CmcdProcessor;
37use csv::CsvProcessor;
38use date::DateProcessor;
39use decolorize::DecolorizeProcessor;
40use digest::DigestProcessor;
41use dissect::DissectProcessor;
42use enum_dispatch::enum_dispatch;
43use epoch::EpochProcessor;
44use gsub::GsubProcessor;
45use join::JoinProcessor;
46use json_path::JsonPathProcessor;
47use letter::LetterProcessor;
48use regex::RegexProcessor;
49use snafu::{OptionExt, ResultExt};
50use urlencoding::UrlEncodingProcessor;
51use vrl::value::Value as VrlValue;
52
53use crate::error::{
54 Error, FailedParseFieldFromStringSnafu, FieldMustBeTypeSnafu, InvalidFieldRenameSnafu,
55 ProcessorKeyMustBeStringSnafu, ProcessorMustBeMapSnafu, ProcessorMustHaveStringKeySnafu,
56 Result, UnsupportedProcessorSnafu,
57};
58use crate::etl::field::{Field, Fields};
59use crate::etl::processor::filter::FilterProcessor;
60use crate::etl::processor::json_parse::JsonParseProcessor;
61use crate::etl::processor::select::SelectProcessor;
62use crate::etl::processor::simple_extract::SimpleExtractProcessor;
63use crate::etl::processor::vrl_processor::VrlProcessor;
64
// Configuration key names shared across the processor modules.
//
// Only `KEY_NAME` and `RENAME_TO_KEY` are referenced in this file (by the
// field-entry converter); the remaining constants are presumably consumed by
// the individual processor submodules — verify against those modules.

// Keys of the `key` / `rename_to` map form of a field entry.
const KEY_NAME: &str = "key";
const RENAME_TO_KEY: &str = "rename_to";

// Remaining keys, in alphabetical order of their identifiers.
const FIELD_NAME: &str = "field";
const FIELDS_NAME: &str = "fields";
const IGNORE_MISSING_NAME: &str = "ignore_missing";
const JSON_PATH_NAME: &str = "json_path";
const JSON_PATH_RESULT_INDEX_NAME: &str = "result_index";
const METHOD_NAME: &str = "method";
const PATTERN_NAME: &str = "pattern";
const PATTERNS_NAME: &str = "patterns";
const SEPARATOR_NAME: &str = "separator";
const TARGET_FIELDS_NAME: &str = "target_fields";
const TYPE_NAME: &str = "type";
78
/// Looks up `$key` in the YAML hash `$map` and yields the entry as a `&str`.
///
/// The expansion is a `Result`: when the key is absent, or its entry is not a
/// YAML string, the error is built from `InvalidFieldRenameSnafu` carrying a
/// clone of `$value`.
///
/// The expansion names `InvalidFieldRenameSnafu` and calls `with_context` on an
/// `Option` without qualifying either, so the call site must have both the
/// selector and `snafu::OptionExt` in scope.
#[macro_export]
macro_rules! yaml_map_get_str {
    ($map:expr, $key:expr, $value:expr) => {
        $map.get(&yaml_rust::Yaml::String($key.to_string()))
            .and_then(|entry| entry.as_str())
            .with_context(|| InvalidFieldRenameSnafu {
                value: $value.clone(),
            })
    };
}
90
91lazy_static::lazy_static! {
92 static ref STRING_FN: fn(&str, &yaml_rust::Yaml) -> Result<String> = |_, v| {
93 Ok(v.as_str().unwrap_or_default().into())
94 };
95
96 static ref STRING_OR_HASH_FN: fn(&str, &yaml_rust::Yaml) -> Result<Field> = |field, v| {
97 match v {
98 yaml_rust::Yaml::String(s) => Field::from_str(s),
99 yaml_rust::Yaml::Hash(m) => {
100 let key = yaml_map_get_str!(m, KEY_NAME, v)?;
101 let rename_to = yaml_map_get_str!(m, RENAME_TO_KEY, v)?;
102 Ok(Field::new(key, Some(rename_to.to_string())))
103 }
104 _ => FieldMustBeTypeSnafu {
105 field,
106 ty: "string or key-rename_to map",
107 }
108 .fail(),
109 }
110 };
111}
112
/// Interface shared by all processors.
///
/// The `#[enum_dispatch(ProcessorKind)]` attribute ties this trait to the
/// `ProcessorKind` enum so the enum can be used through the same methods.
#[enum_dispatch(ProcessorKind)]
pub trait Processor: std::fmt::Debug + Send + Sync + 'static {
    /// Returns the processor's kind as a string.
    fn kind(&self) -> &str;

    /// Returns the processor's `ignore_missing` setting.
    // NOTE(review): presumably decides whether absent input fields are
    // tolerated — confirm against the implementors.
    fn ignore_missing(&self) -> bool;

    /// Runs the processor on `val`, consuming it and returning the resulting
    /// value, or an error if processing fails.
    fn exec_mut(&self, val: VrlValue) -> Result<VrlValue>;
}
130
/// One variant per processor implementation known to this module.
///
/// Annotated with `#[enum_dispatch]` and paired with the `Processor` trait
/// (which carries `#[enum_dispatch(ProcessorKind)]`).
#[derive(Debug)]
#[enum_dispatch]
pub enum ProcessorKind {
    Cmcd(CmcdProcessor),
    Csv(CsvProcessor),
    Dissect(DissectProcessor),
    Gsub(GsubProcessor),
    Join(JoinProcessor),
    Letter(LetterProcessor),
    Regex(RegexProcessor),
    UrlEncoding(UrlEncodingProcessor),
    Epoch(EpochProcessor),
    Date(DateProcessor),
    JsonPath(JsonPathProcessor),
    JsonParse(JsonParseProcessor),
    // Note: the variant name differs from the wrapped type's name.
    SimpleJsonPath(SimpleExtractProcessor),
    Decolorize(DecolorizeProcessor),
    Digest(DigestProcessor),
    Select(SelectProcessor),
    Vrl(VrlProcessor),
    Filter(FilterProcessor),
}
153
/// A list of processors.
///
/// Dereferences to the inner `Vec<ProcessorKind>`, so the usual vector API is
/// available directly on this type.
#[derive(Debug, Default)]
pub struct Processors {
    /// The wrapped processors. When built through `TryFrom<&Vec<Yaml>>`, the
    /// order matches the order of the YAML documents.
    pub processors: Vec<ProcessorKind>,
}
161
162impl std::ops::Deref for Processors {
163 type Target = Vec<ProcessorKind>;
164
165 fn deref(&self) -> &Self::Target {
166 &self.processors
167 }
168}
169
170impl std::ops::DerefMut for Processors {
171 fn deref_mut(&mut self) -> &mut Self::Target {
172 &mut self.processors
173 }
174}
175
176impl TryFrom<&Vec<yaml_rust::Yaml>> for Processors {
177 type Error = Error;
178
179 fn try_from(vec: &Vec<yaml_rust::Yaml>) -> Result<Self> {
180 let mut processors_builders = vec![];
181 for doc in vec {
182 let processor = parse_processor(doc)?;
183 processors_builders.push(processor);
184 }
185 Ok(Processors {
186 processors: processors_builders,
187 })
188 }
189}
190
191fn parse_processor(doc: &yaml_rust::Yaml) -> Result<ProcessorKind> {
192 let map = doc.as_hash().context(ProcessorMustBeMapSnafu)?;
193
194 let key = map.keys().next().context(ProcessorMustHaveStringKeySnafu)?;
195
196 let value = map
197 .get(key)
198 .unwrap()
199 .as_hash()
200 .context(ProcessorMustBeMapSnafu)?;
201
202 let str_key = key.as_str().context(ProcessorKeyMustBeStringSnafu)?;
203
204 let processor = match str_key {
205 cmcd::PROCESSOR_CMCD => ProcessorKind::Cmcd(CmcdProcessor::try_from(value)?),
206 csv::PROCESSOR_CSV => ProcessorKind::Csv(CsvProcessor::try_from(value)?),
207 dissect::PROCESSOR_DISSECT => ProcessorKind::Dissect(DissectProcessor::try_from(value)?),
208 epoch::PROCESSOR_EPOCH => ProcessorKind::Epoch(EpochProcessor::try_from(value)?),
209 date::PROCESSOR_DATE => ProcessorKind::Date(DateProcessor::try_from(value)?),
210 gsub::PROCESSOR_GSUB => ProcessorKind::Gsub(GsubProcessor::try_from(value)?),
211 join::PROCESSOR_JOIN => ProcessorKind::Join(JoinProcessor::try_from(value)?),
212 letter::PROCESSOR_LETTER => ProcessorKind::Letter(LetterProcessor::try_from(value)?),
213 regex::PROCESSOR_REGEX => ProcessorKind::Regex(RegexProcessor::try_from(value)?),
214 urlencoding::PROCESSOR_URL_ENCODING => {
215 ProcessorKind::UrlEncoding(UrlEncodingProcessor::try_from(value)?)
216 }
217 json_path::PROCESSOR_JSON_PATH => {
218 ProcessorKind::JsonPath(json_path::JsonPathProcessor::try_from(value)?)
219 }
220 decolorize::PROCESSOR_DECOLORIZE => {
221 ProcessorKind::Decolorize(DecolorizeProcessor::try_from(value)?)
222 }
223 digest::PROCESSOR_DIGEST => ProcessorKind::Digest(DigestProcessor::try_from(value)?),
224 simple_extract::PROCESSOR_SIMPLE_EXTRACT => {
225 ProcessorKind::SimpleJsonPath(SimpleExtractProcessor::try_from(value)?)
226 }
227 json_parse::PROCESSOR_JSON_PARSE => {
228 ProcessorKind::JsonParse(JsonParseProcessor::try_from(value)?)
229 }
230 vrl_processor::PROCESSOR_VRL => ProcessorKind::Vrl(VrlProcessor::try_from(value)?),
231 select::PROCESSOR_SELECT => ProcessorKind::Select(SelectProcessor::try_from(value)?),
232 filter::PROCESSOR_FILTER => ProcessorKind::Filter(FilterProcessor::try_from(value)?),
233 _ => return UnsupportedProcessorSnafu { processor: str_key }.fail(),
234 };
235
236 Ok(processor)
237}
238
239pub(crate) fn yaml_string(v: &yaml_rust::Yaml, field: &str) -> Result<String> {
240 v.as_str()
241 .map(|s| s.to_string())
242 .context(FieldMustBeTypeSnafu {
243 field,
244 ty: "string",
245 })
246}
247
248pub(crate) fn yaml_strings(v: &yaml_rust::Yaml, field: &str) -> Result<Vec<String>> {
249 yaml_list(v, *STRING_FN, field)
250}
251
252pub(crate) fn yaml_list<T>(
253 v: &yaml_rust::Yaml,
254 conv_fn: impl Fn(&str, &yaml_rust::Yaml) -> Result<T>,
255 field: &str,
256) -> Result<Vec<T>> {
257 v.as_vec()
258 .context(FieldMustBeTypeSnafu { field, ty: "list" })?
259 .iter()
260 .map(|v| conv_fn(field, v))
261 .collect()
262}
263
264pub(crate) fn yaml_bool(v: &yaml_rust::Yaml, field: &str) -> Result<bool> {
265 v.as_bool().context(FieldMustBeTypeSnafu {
266 field,
267 ty: "boolean",
268 })
269}
270
271pub(crate) fn yaml_parse_string<T>(v: &yaml_rust::Yaml, field: &str) -> Result<T>
272where
273 T: std::str::FromStr,
274 T::Err: std::error::Error + Send + Sync + 'static,
275{
276 yaml_string(v, field)?
277 .parse::<T>()
278 .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)
279 .context(FailedParseFieldFromStringSnafu { field })
280}
281
282pub(crate) fn yaml_parse_strings<T>(v: &yaml_rust::Yaml, field: &str) -> Result<Vec<T>>
283where
284 T: std::str::FromStr,
285 T::Err: std::error::Error + Send + Sync + 'static,
286{
287 yaml_strings(v, field).and_then(|v| {
288 v.into_iter()
289 .map(|s| {
290 s.parse::<T>()
291 .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)
292 .context(FailedParseFieldFromStringSnafu { field })
293 })
294 .collect()
295 })
296}
297
298pub(crate) fn yaml_new_fields(v: &yaml_rust::Yaml, field: &str) -> Result<Fields> {
299 yaml_list(v, *STRING_OR_HASH_FN, field).map(Fields::new)
300}
301
302pub(crate) fn yaml_new_field(v: &yaml_rust::Yaml, field: &str) -> Result<Field> {
303 STRING_OR_HASH_FN(field, v)
304}