1pub mod cmcd;
16pub mod csv;
17pub mod date;
18pub mod decolorize;
19pub mod digest;
20pub mod dissect;
21pub mod epoch;
22pub mod gsub;
23pub mod join;
24pub mod json_parse;
25pub mod json_path;
26pub mod letter;
27pub mod regex;
28pub mod select;
29pub mod simple_extract;
30pub mod urlencoding;
31pub mod vrl;
32
33use std::str::FromStr;
34
35use cmcd::CmcdProcessor;
36use csv::CsvProcessor;
37use date::DateProcessor;
38use decolorize::DecolorizeProcessor;
39use digest::DigestProcessor;
40use dissect::DissectProcessor;
41use enum_dispatch::enum_dispatch;
42use epoch::EpochProcessor;
43use gsub::GsubProcessor;
44use join::JoinProcessor;
45use json_path::JsonPathProcessor;
46use letter::LetterProcessor;
47use regex::RegexProcessor;
48use snafu::{OptionExt, ResultExt};
49use urlencoding::UrlEncodingProcessor;
50
51use crate::error::{
52 Error, FailedParseFieldFromStringSnafu, FieldMustBeTypeSnafu, InvalidFieldRenameSnafu,
53 ProcessorKeyMustBeStringSnafu, ProcessorMustBeMapSnafu, ProcessorMustHaveStringKeySnafu,
54 Result, UnsupportedProcessorSnafu,
55};
56use crate::etl::field::{Field, Fields};
57use crate::etl::processor::json_parse::JsonParseProcessor;
58use crate::etl::processor::select::SelectProcessor;
59use crate::etl::processor::simple_extract::SimpleExtractProcessor;
60use crate::etl::processor::vrl::VrlProcessor;
61use crate::Value;
62
// String constants naming configuration keys. Within this part of the file only
// `KEY_NAME` and `RENAME_TO_KEY` are read (as YAML map keys by the field-parsing
// closure `STRING_OR_HASH_FN`); the remaining constants are presumably consumed
// by the processor submodules declared above — TODO confirm against them.
const FIELD_NAME: &str = "field";
const FIELDS_NAME: &str = "fields";
const IGNORE_MISSING_NAME: &str = "ignore_missing";
const METHOD_NAME: &str = "method";
const PATTERN_NAME: &str = "pattern";
const PATTERNS_NAME: &str = "patterns";
const SEPARATOR_NAME: &str = "separator";
const TARGET_FIELDS_NAME: &str = "target_fields";
const JSON_PATH_NAME: &str = "json_path";
const JSON_PATH_RESULT_INDEX_NAME: &str = "result_index";
// Key holding the source field name in a `{ key, rename_to }` field map.
const KEY_NAME: &str = "key";
const TYPE_NAME: &str = "type";
// Key holding the replacement name in a `{ key, rename_to }` field map.
const RENAME_TO_KEY: &str = "rename_to";
76
/// Looks up `$key` in the YAML hash `$map` and yields the entry as a `&str`.
///
/// The expansion evaluates to a `Result`: when the key is absent, or its value
/// is not a YAML string, the lookup fails with the `InvalidFieldRenameSnafu`
/// context, which records a clone of `$value` (the YAML node being parsed).
///
/// The expansion names `InvalidFieldRenameSnafu` and calls `with_context` on an
/// `Option`, so the call site must have that selector and snafu's `OptionExt`
/// in scope.
#[macro_export]
macro_rules! yaml_map_get_str {
    ($map:expr, $key:expr, $value:expr) => {
        // The hash is keyed by `Yaml` nodes, so the key is wrapped in `Yaml::String`.
        $map.get(&yaml_rust::Yaml::String($key.to_string()))
            .and_then(|v| v.as_str())
            .with_context(|| InvalidFieldRenameSnafu {
                value: $value.clone(),
            })
    };
}
88
89lazy_static::lazy_static! {
90 static ref STRING_FN: fn(&str, &yaml_rust::Yaml) -> Result<String> = |_, v| {
91 Ok(v.as_str().unwrap_or_default().into())
92 };
93
94 static ref STRING_OR_HASH_FN: fn(&str, &yaml_rust::Yaml) -> Result<Field> = |field, v| {
95 match v {
96 yaml_rust::Yaml::String(s) => Field::from_str(s),
97 yaml_rust::Yaml::Hash(m) => {
98 let key = yaml_map_get_str!(m, KEY_NAME, v)?;
99 let rename_to = yaml_map_get_str!(m, RENAME_TO_KEY, v)?;
100 Ok(Field::new(key, Some(rename_to.to_string())))
101 }
102 _ => FieldMustBeTypeSnafu {
103 field,
104 ty: "string or key-rename_to map",
105 }
106 .fail(),
107 }
108 };
109}
110
/// Common interface implemented by every processor.
///
/// The `#[enum_dispatch(ProcessorKind)]` attribute associates this trait with
/// the [`ProcessorKind`] enum so that its methods can be invoked on the enum
/// and forwarded to whichever processor the variant wraps.
#[enum_dispatch(ProcessorKind)]
pub trait Processor: std::fmt::Debug + Send + Sync + 'static {
    /// Returns a string identifying the kind of this processor.
    fn kind(&self) -> &str;

    /// Returns the processor's `ignore_missing` flag.
    ///
    /// NOTE(review): presumably decides whether an absent input field is
    /// tolerated instead of reported as an error — confirm against implementors.
    fn ignore_missing(&self) -> bool;

    /// Runs the processor on `val`, taking ownership of it, and returns the
    /// resulting value or an error.
    fn exec_mut(&self, val: Value) -> Result<Value>;
}
128
/// Closed set of available processors, one variant per processor type.
///
/// Marked with `#[enum_dispatch]` and paired with the [`Processor`] trait
/// (declared with `#[enum_dispatch(ProcessorKind)]`), so trait calls on this
/// enum are dispatched to the wrapped processor.
#[derive(Debug)]
#[enum_dispatch]
pub enum ProcessorKind {
    Cmcd(CmcdProcessor),
    Csv(CsvProcessor),
    Dissect(DissectProcessor),
    Gsub(GsubProcessor),
    Join(JoinProcessor),
    Letter(LetterProcessor),
    Regex(RegexProcessor),
    UrlEncoding(UrlEncodingProcessor),
    Epoch(EpochProcessor),
    Date(DateProcessor),
    JsonPath(JsonPathProcessor),
    JsonParse(JsonParseProcessor),
    // Note: the variant name differs from the wrapped type, `SimpleExtractProcessor`.
    SimpleJsonPath(SimpleExtractProcessor),
    Decolorize(DecolorizeProcessor),
    Digest(DigestProcessor),
    Select(SelectProcessor),
    Vrl(VrlProcessor),
}
150
/// An ordered collection of parsed processors.
///
/// The `Default` value is an empty list. The type dereferences to the inner
/// `Vec<ProcessorKind>`, so vector methods are available on it directly.
#[derive(Debug, Default)]
pub struct Processors {
    /// The processors, in the order they were parsed.
    pub processors: Vec<ProcessorKind>,
}
158
159impl std::ops::Deref for Processors {
160 type Target = Vec<ProcessorKind>;
161
162 fn deref(&self) -> &Self::Target {
163 &self.processors
164 }
165}
166
167impl std::ops::DerefMut for Processors {
168 fn deref_mut(&mut self) -> &mut Self::Target {
169 &mut self.processors
170 }
171}
172
173impl TryFrom<&Vec<yaml_rust::Yaml>> for Processors {
174 type Error = Error;
175
176 fn try_from(vec: &Vec<yaml_rust::Yaml>) -> Result<Self> {
177 let mut processors_builders = vec![];
178 for doc in vec {
179 let processor = parse_processor(doc)?;
180 processors_builders.push(processor);
181 }
182 Ok(Processors {
183 processors: processors_builders,
184 })
185 }
186}
187
188fn parse_processor(doc: &yaml_rust::Yaml) -> Result<ProcessorKind> {
189 let map = doc.as_hash().context(ProcessorMustBeMapSnafu)?;
190
191 let key = map.keys().next().context(ProcessorMustHaveStringKeySnafu)?;
192
193 let value = map
194 .get(key)
195 .unwrap()
196 .as_hash()
197 .context(ProcessorMustBeMapSnafu)?;
198
199 let str_key = key.as_str().context(ProcessorKeyMustBeStringSnafu)?;
200
201 let processor = match str_key {
202 cmcd::PROCESSOR_CMCD => ProcessorKind::Cmcd(CmcdProcessor::try_from(value)?),
203 csv::PROCESSOR_CSV => ProcessorKind::Csv(CsvProcessor::try_from(value)?),
204 dissect::PROCESSOR_DISSECT => ProcessorKind::Dissect(DissectProcessor::try_from(value)?),
205 epoch::PROCESSOR_EPOCH => ProcessorKind::Epoch(EpochProcessor::try_from(value)?),
206 date::PROCESSOR_DATE => ProcessorKind::Date(DateProcessor::try_from(value)?),
207 gsub::PROCESSOR_GSUB => ProcessorKind::Gsub(GsubProcessor::try_from(value)?),
208 join::PROCESSOR_JOIN => ProcessorKind::Join(JoinProcessor::try_from(value)?),
209 letter::PROCESSOR_LETTER => ProcessorKind::Letter(LetterProcessor::try_from(value)?),
210 regex::PROCESSOR_REGEX => ProcessorKind::Regex(RegexProcessor::try_from(value)?),
211 urlencoding::PROCESSOR_URL_ENCODING => {
212 ProcessorKind::UrlEncoding(UrlEncodingProcessor::try_from(value)?)
213 }
214 json_path::PROCESSOR_JSON_PATH => {
215 ProcessorKind::JsonPath(json_path::JsonPathProcessor::try_from(value)?)
216 }
217 decolorize::PROCESSOR_DECOLORIZE => {
218 ProcessorKind::Decolorize(DecolorizeProcessor::try_from(value)?)
219 }
220 digest::PROCESSOR_DIGEST => ProcessorKind::Digest(DigestProcessor::try_from(value)?),
221 simple_extract::PROCESSOR_SIMPLE_EXTRACT => {
222 ProcessorKind::SimpleJsonPath(SimpleExtractProcessor::try_from(value)?)
223 }
224 json_parse::PROCESSOR_JSON_PARSE => {
225 ProcessorKind::JsonParse(JsonParseProcessor::try_from(value)?)
226 }
227 vrl::PROCESSOR_VRL => ProcessorKind::Vrl(VrlProcessor::try_from(value)?),
228 select::PROCESSOR_SELECT => ProcessorKind::Select(SelectProcessor::try_from(value)?),
229 _ => return UnsupportedProcessorSnafu { processor: str_key }.fail(),
230 };
231
232 Ok(processor)
233}
234
235pub(crate) fn yaml_string(v: &yaml_rust::Yaml, field: &str) -> Result<String> {
236 v.as_str()
237 .map(|s| s.to_string())
238 .context(FieldMustBeTypeSnafu {
239 field,
240 ty: "string",
241 })
242}
243
244pub(crate) fn yaml_strings(v: &yaml_rust::Yaml, field: &str) -> Result<Vec<String>> {
245 yaml_list(v, *STRING_FN, field)
246}
247
248pub(crate) fn yaml_list<T>(
249 v: &yaml_rust::Yaml,
250 conv_fn: impl Fn(&str, &yaml_rust::Yaml) -> Result<T>,
251 field: &str,
252) -> Result<Vec<T>> {
253 v.as_vec()
254 .context(FieldMustBeTypeSnafu { field, ty: "list" })?
255 .iter()
256 .map(|v| conv_fn(field, v))
257 .collect()
258}
259
260pub(crate) fn yaml_bool(v: &yaml_rust::Yaml, field: &str) -> Result<bool> {
261 v.as_bool().context(FieldMustBeTypeSnafu {
262 field,
263 ty: "boolean",
264 })
265}
266
267pub(crate) fn yaml_parse_string<T>(v: &yaml_rust::Yaml, field: &str) -> Result<T>
268where
269 T: std::str::FromStr,
270 T::Err: std::error::Error + Send + Sync + 'static,
271{
272 yaml_string(v, field)?
273 .parse::<T>()
274 .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)
275 .context(FailedParseFieldFromStringSnafu { field })
276}
277
278pub(crate) fn yaml_parse_strings<T>(v: &yaml_rust::Yaml, field: &str) -> Result<Vec<T>>
279where
280 T: std::str::FromStr,
281 T::Err: std::error::Error + Send + Sync + 'static,
282{
283 yaml_strings(v, field).and_then(|v| {
284 v.into_iter()
285 .map(|s| {
286 s.parse::<T>()
287 .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)
288 .context(FailedParseFieldFromStringSnafu { field })
289 })
290 .collect()
291 })
292}
293
294pub(crate) fn yaml_new_fields(v: &yaml_rust::Yaml, field: &str) -> Result<Fields> {
295 yaml_list(v, *STRING_OR_HASH_FN, field).map(Fields::new)
296}
297
298pub(crate) fn yaml_new_field(v: &yaml_rust::Yaml, field: &str) -> Result<Field> {
299 STRING_OR_HASH_FN(field, v)
300}