1pub mod cmcd;
16pub mod csv;
17pub mod date;
18pub mod decolorize;
19pub mod digest;
20pub mod dissect;
21pub mod epoch;
22pub mod gsub;
23pub mod join;
24pub mod json_parse;
25pub mod json_path;
26pub mod letter;
27pub mod regex;
28pub mod select;
29pub mod simple_extract;
30pub mod timestamp;
31pub mod urlencoding;
32pub mod vrl;
33
34use std::str::FromStr;
35
36use cmcd::CmcdProcessor;
37use csv::CsvProcessor;
38use date::DateProcessor;
39use decolorize::DecolorizeProcessor;
40use digest::DigestProcessor;
41use dissect::DissectProcessor;
42use enum_dispatch::enum_dispatch;
43use epoch::EpochProcessor;
44use gsub::GsubProcessor;
45use join::JoinProcessor;
46use json_path::JsonPathProcessor;
47use letter::LetterProcessor;
48use regex::RegexProcessor;
49use snafu::{OptionExt, ResultExt};
50use timestamp::TimestampProcessor;
51use urlencoding::UrlEncodingProcessor;
52
53use crate::error::{
54 Error, FailedParseFieldFromStringSnafu, FieldMustBeTypeSnafu, InvalidFieldRenameSnafu,
55 ProcessorKeyMustBeStringSnafu, ProcessorMustBeMapSnafu, ProcessorMustHaveStringKeySnafu,
56 Result, UnsupportedProcessorSnafu,
57};
58use crate::etl::field::{Field, Fields};
59use crate::etl::processor::json_parse::JsonParseProcessor;
60use crate::etl::processor::select::SelectProcessor;
61use crate::etl::processor::simple_extract::SimpleExtractProcessor;
62use crate::etl::processor::vrl::VrlProcessor;
63use crate::etl::PipelineMap;
64
// YAML configuration keys shared by the processor parsers.

/// Key naming the single input field of a processor.
const FIELD_NAME: &'static str = "field";
/// Key naming a list of input fields.
const FIELDS_NAME: &'static str = "fields";
/// Key holding the `ignore_missing` flag.
const IGNORE_MISSING_NAME: &'static str = "ignore_missing";
/// Key selecting a processor-specific method.
const METHOD_NAME: &'static str = "method";
/// Key holding a single pattern.
const PATTERN_NAME: &'static str = "pattern";
/// Key holding a list of patterns.
const PATTERNS_NAME: &'static str = "patterns";
/// Key holding a separator string.
const SEPARATOR_NAME: &'static str = "separator";
/// Key holding the list of output field names.
const TARGET_FIELDS_NAME: &'static str = "target_fields";
/// Key holding a JSON path expression.
const JSON_PATH_NAME: &'static str = "json_path";
/// Key holding the result index used together with `json_path`.
const JSON_PATH_RESULT_INDEX_NAME: &'static str = "result_index";
/// Key of the source-field name inside a `{key, rename_to}` field map.
const KEY_NAME: &'static str = "key";
/// Key holding a type name.
const TYPE_NAME: &'static str = "type";
/// Key of the destination-field name inside a `{key, rename_to}` field map.
const RENAME_TO_KEY: &'static str = "rename_to";
78
/// Looks up the string stored under `$key` in the YAML hash `$map`.
///
/// Evaluates to a `Result<&str, _>`. It is `Ok` when `$key` is present and its value is a
/// YAML string. Otherwise it is an `InvalidFieldRename` error whose `value` field holds a
/// clone of `$value`.
///
/// Every external name is written as a fully qualified path (`$crate::…`, `::snafu::…`,
/// `::yaml_rust::…`). Because the macro is exported, this lets it expand correctly at any
/// call site without the caller importing the error selector or the `snafu::OptionExt` trait.
#[macro_export]
macro_rules! yaml_map_get_str {
    ($map:expr, $key:expr, $value:expr) => {
        ::snafu::OptionExt::with_context(
            $map.get(&::yaml_rust::Yaml::String($key.to_string()))
                .and_then(|v| v.as_str()),
            || $crate::error::InvalidFieldRenameSnafu {
                value: $value.clone(),
            },
        )
    };
}
90
91lazy_static::lazy_static! {
92 static ref STRING_FN: fn(&str, &yaml_rust::Yaml) -> Result<String> = |_, v| {
93 Ok(v.as_str().unwrap_or_default().into())
94 };
95
96 static ref STRING_OR_HASH_FN: fn(&str, &yaml_rust::Yaml) -> Result<Field> = |field, v| {
97 match v {
98 yaml_rust::Yaml::String(s) => Field::from_str(s),
99 yaml_rust::Yaml::Hash(m) => {
100 let key = yaml_map_get_str!(m, KEY_NAME, v)?;
101 let rename_to = yaml_map_get_str!(m, RENAME_TO_KEY, v)?;
102 Ok(Field::new(key, Some(rename_to.to_string())))
103 }
104 _ => FieldMustBeTypeSnafu {
105 field,
106 ty: "string or key-rename_to map",
107 }
108 .fail(),
109 }
110 };
111}
112
113#[enum_dispatch(ProcessorKind)]
120pub trait Processor: std::fmt::Debug + Send + Sync + 'static {
121 fn kind(&self) -> &str;
123
124 fn ignore_missing(&self) -> bool;
126
127 fn exec_mut(&self, val: PipelineMap) -> Result<PipelineMap>;
129}
130
131#[derive(Debug)]
132#[enum_dispatch]
133pub enum ProcessorKind {
134 Cmcd(CmcdProcessor),
135 Csv(CsvProcessor),
136 Dissect(DissectProcessor),
137 Gsub(GsubProcessor),
138 Join(JoinProcessor),
139 Letter(LetterProcessor),
140 Regex(RegexProcessor),
141 Timestamp(TimestampProcessor),
142 UrlEncoding(UrlEncodingProcessor),
143 Epoch(EpochProcessor),
144 Date(DateProcessor),
145 JsonPath(JsonPathProcessor),
146 JsonParse(JsonParseProcessor),
147 SimpleJsonPath(SimpleExtractProcessor),
148 Decolorize(DecolorizeProcessor),
149 Digest(DigestProcessor),
150 Select(SelectProcessor),
151 Vrl(VrlProcessor),
152}
153
154#[derive(Debug, Default)]
155pub struct Processors {
156 pub processors: Vec<ProcessorKind>,
160}
161
162impl std::ops::Deref for Processors {
163 type Target = Vec<ProcessorKind>;
164
165 fn deref(&self) -> &Self::Target {
166 &self.processors
167 }
168}
169
170impl std::ops::DerefMut for Processors {
171 fn deref_mut(&mut self) -> &mut Self::Target {
172 &mut self.processors
173 }
174}
175
176impl TryFrom<&Vec<yaml_rust::Yaml>> for Processors {
177 type Error = Error;
178
179 fn try_from(vec: &Vec<yaml_rust::Yaml>) -> Result<Self> {
180 let mut processors_builders = vec![];
181 for doc in vec {
182 let processor = parse_processor(doc)?;
183 processors_builders.push(processor);
184 }
185 Ok(Processors {
186 processors: processors_builders,
187 })
188 }
189}
190
191fn parse_processor(doc: &yaml_rust::Yaml) -> Result<ProcessorKind> {
192 let map = doc.as_hash().context(ProcessorMustBeMapSnafu)?;
193
194 let key = map.keys().next().context(ProcessorMustHaveStringKeySnafu)?;
195
196 let value = map
197 .get(key)
198 .unwrap()
199 .as_hash()
200 .context(ProcessorMustBeMapSnafu)?;
201
202 let str_key = key.as_str().context(ProcessorKeyMustBeStringSnafu)?;
203
204 let processor = match str_key {
205 cmcd::PROCESSOR_CMCD => ProcessorKind::Cmcd(CmcdProcessor::try_from(value)?),
206 csv::PROCESSOR_CSV => ProcessorKind::Csv(CsvProcessor::try_from(value)?),
207 dissect::PROCESSOR_DISSECT => ProcessorKind::Dissect(DissectProcessor::try_from(value)?),
208 epoch::PROCESSOR_EPOCH => ProcessorKind::Epoch(EpochProcessor::try_from(value)?),
209 date::PROCESSOR_DATE => ProcessorKind::Date(DateProcessor::try_from(value)?),
210 gsub::PROCESSOR_GSUB => ProcessorKind::Gsub(GsubProcessor::try_from(value)?),
211 join::PROCESSOR_JOIN => ProcessorKind::Join(JoinProcessor::try_from(value)?),
212 letter::PROCESSOR_LETTER => ProcessorKind::Letter(LetterProcessor::try_from(value)?),
213 regex::PROCESSOR_REGEX => ProcessorKind::Regex(RegexProcessor::try_from(value)?),
214 timestamp::PROCESSOR_TIMESTAMP => {
215 ProcessorKind::Timestamp(TimestampProcessor::try_from(value)?)
216 }
217 urlencoding::PROCESSOR_URL_ENCODING => {
218 ProcessorKind::UrlEncoding(UrlEncodingProcessor::try_from(value)?)
219 }
220 json_path::PROCESSOR_JSON_PATH => {
221 ProcessorKind::JsonPath(json_path::JsonPathProcessor::try_from(value)?)
222 }
223 decolorize::PROCESSOR_DECOLORIZE => {
224 ProcessorKind::Decolorize(DecolorizeProcessor::try_from(value)?)
225 }
226 digest::PROCESSOR_DIGEST => ProcessorKind::Digest(DigestProcessor::try_from(value)?),
227 simple_extract::PROCESSOR_SIMPLE_EXTRACT => {
228 ProcessorKind::SimpleJsonPath(SimpleExtractProcessor::try_from(value)?)
229 }
230 json_parse::PROCESSOR_JSON_PARSE => {
231 ProcessorKind::JsonParse(JsonParseProcessor::try_from(value)?)
232 }
233 vrl::PROCESSOR_VRL => ProcessorKind::Vrl(VrlProcessor::try_from(value)?),
234 select::PROCESSOR_SELECT => ProcessorKind::Select(SelectProcessor::try_from(value)?),
235 _ => return UnsupportedProcessorSnafu { processor: str_key }.fail(),
236 };
237
238 Ok(processor)
239}
240
241pub(crate) fn yaml_string(v: &yaml_rust::Yaml, field: &str) -> Result<String> {
242 v.as_str()
243 .map(|s| s.to_string())
244 .context(FieldMustBeTypeSnafu {
245 field,
246 ty: "string",
247 })
248}
249
250pub(crate) fn yaml_strings(v: &yaml_rust::Yaml, field: &str) -> Result<Vec<String>> {
251 yaml_list(v, *STRING_FN, field)
252}
253
254pub(crate) fn yaml_list<T>(
255 v: &yaml_rust::Yaml,
256 conv_fn: impl Fn(&str, &yaml_rust::Yaml) -> Result<T>,
257 field: &str,
258) -> Result<Vec<T>> {
259 v.as_vec()
260 .context(FieldMustBeTypeSnafu { field, ty: "list" })?
261 .iter()
262 .map(|v| conv_fn(field, v))
263 .collect()
264}
265
266pub(crate) fn yaml_bool(v: &yaml_rust::Yaml, field: &str) -> Result<bool> {
267 v.as_bool().context(FieldMustBeTypeSnafu {
268 field,
269 ty: "boolean",
270 })
271}
272
273pub(crate) fn yaml_parse_string<T>(v: &yaml_rust::Yaml, field: &str) -> Result<T>
274where
275 T: std::str::FromStr,
276 T::Err: std::error::Error + Send + Sync + 'static,
277{
278 yaml_string(v, field)?
279 .parse::<T>()
280 .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)
281 .context(FailedParseFieldFromStringSnafu { field })
282}
283
284pub(crate) fn yaml_parse_strings<T>(v: &yaml_rust::Yaml, field: &str) -> Result<Vec<T>>
285where
286 T: std::str::FromStr,
287 T::Err: std::error::Error + Send + Sync + 'static,
288{
289 yaml_strings(v, field).and_then(|v| {
290 v.into_iter()
291 .map(|s| {
292 s.parse::<T>()
293 .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)
294 .context(FailedParseFieldFromStringSnafu { field })
295 })
296 .collect()
297 })
298}
299
300pub(crate) fn yaml_new_fields(v: &yaml_rust::Yaml, field: &str) -> Result<Fields> {
301 yaml_list(v, *STRING_OR_HASH_FN, field).map(Fields::new)
302}
303
304pub(crate) fn yaml_new_field(v: &yaml_rust::Yaml, field: &str) -> Result<Field> {
305 STRING_OR_HASH_FN(field, v)
306}