1pub mod cmcd;
16pub mod csv;
17pub mod date;
18pub mod decolorize;
19pub mod digest;
20pub mod dissect;
21pub mod epoch;
22pub mod gsub;
23pub mod join;
24pub mod json_parse;
25pub mod json_path;
26pub mod letter;
27pub mod regex;
28pub mod select;
29pub mod simple_extract;
30pub mod timestamp;
31pub mod urlencoding;
32
33use std::str::FromStr;
34
35use cmcd::CmcdProcessor;
36use csv::CsvProcessor;
37use date::DateProcessor;
38use decolorize::DecolorizeProcessor;
39use digest::DigestProcessor;
40use dissect::DissectProcessor;
41use enum_dispatch::enum_dispatch;
42use epoch::EpochProcessor;
43use gsub::GsubProcessor;
44use join::JoinProcessor;
45use json_path::JsonPathProcessor;
46use letter::LetterProcessor;
47use regex::RegexProcessor;
48use snafu::{OptionExt, ResultExt};
49use timestamp::TimestampProcessor;
50use urlencoding::UrlEncodingProcessor;
51
52use crate::error::{
53 Error, FailedParseFieldFromStringSnafu, FieldMustBeTypeSnafu, InvalidFieldRenameSnafu,
54 ProcessorKeyMustBeStringSnafu, ProcessorMustBeMapSnafu, ProcessorMustHaveStringKeySnafu,
55 Result, UnsupportedProcessorSnafu,
56};
57use crate::etl::field::{Field, Fields};
58use crate::etl::processor::json_parse::JsonParseProcessor;
59use crate::etl::processor::select::SelectProcessor;
60use crate::etl::processor::simple_extract::SimpleExtractProcessor;
61use crate::etl::PipelineMap;
62
// String constants naming configuration keys.
// NOTE(review): within this part of the file only `KEY_NAME` and
// `RENAME_TO_KEY` are referenced; the remaining names are presumably read by
// the processor submodules declared above — confirm against their parsers.
const FIELD_NAME: &str = "field";
const FIELDS_NAME: &str = "fields";
const IGNORE_MISSING_NAME: &str = "ignore_missing";
const METHOD_NAME: &str = "method";
const PATTERN_NAME: &str = "pattern";
const PATTERNS_NAME: &str = "patterns";
const SEPARATOR_NAME: &str = "separator";
const TARGET_FIELDS_NAME: &str = "target_fields";
const JSON_PATH_NAME: &str = "json_path";
const JSON_PATH_RESULT_INDEX_NAME: &str = "result_index";
// Keys of the map form of a field entry: `key` holds the field name and
// `rename_to` the name it is renamed to (both read by `STRING_OR_HASH_FN`).
const KEY_NAME: &str = "key";
const TYPE_NAME: &str = "type";
const RENAME_TO_KEY: &str = "rename_to";
76
/// Looks up `$key` in the YAML hash `$map` and yields the entry as a `&str`.
///
/// The lookup key is built as `yaml_rust::Yaml::String($key.to_string())`.
/// The expansion is a `Result`: when the entry is absent or is not a YAML
/// string, the error is produced from `InvalidFieldRenameSnafu`, carrying a
/// clone of `$value`.
///
/// `InvalidFieldRenameSnafu` and the `with_context` extension method are
/// referenced unqualified, so both must be in scope wherever the macro is
/// invoked.
#[macro_export]
macro_rules! yaml_map_get_str {
    ($map:expr, $key:expr, $value:expr) => {
        $map.get(&yaml_rust::Yaml::String($key.to_string()))
            .and_then(|v| v.as_str())
            // `$value` is only cloned on the failure path.
            .with_context(|| InvalidFieldRenameSnafu {
                value: $value.clone(),
            })
    };
}
88
89lazy_static::lazy_static! {
90 static ref STRING_FN: fn(&str, &yaml_rust::Yaml) -> Result<String> = |_, v| {
91 Ok(v.as_str().unwrap_or_default().into())
92 };
93
94 static ref STRING_OR_HASH_FN: fn(&str, &yaml_rust::Yaml) -> Result<Field> = |field, v| {
95 match v {
96 yaml_rust::Yaml::String(s) => Field::from_str(s),
97 yaml_rust::Yaml::Hash(m) => {
98 let key = yaml_map_get_str!(m, KEY_NAME, v)?;
99 let rename_to = yaml_map_get_str!(m, RENAME_TO_KEY, v)?;
100 Ok(Field::new(key, Some(rename_to.to_string())))
101 }
102 _ => FieldMustBeTypeSnafu {
103 field,
104 ty: "string or key-rename_to map",
105 }
106 .fail(),
107 }
108 };
109}
110
#[enum_dispatch(ProcessorKind)]
/// Common interface implemented by every pipeline processor.
///
/// The `enum_dispatch` attribute ties this trait to [`ProcessorKind`], so the
/// enum implements it by forwarding each call to the wrapped processor.
pub trait Processor: std::fmt::Debug + Send + Sync + 'static {
    /// Returns the name identifying the processor type.
    // NOTE(review): presumably the same string as the module's `PROCESSOR_*`
    // constant matched in `parse_processor` — confirm in the implementations.
    fn kind(&self) -> &str;

    /// Returns the processor's `ignore_missing` setting.
    // NOTE(review): presumably controls whether an absent input field is
    // skipped instead of reported as an error — confirm in the implementations.
    fn ignore_missing(&self) -> bool;

    /// Runs the processor against `val`, which it may modify in place.
    ///
    /// Returns `Err` when processing fails.
    fn exec_mut(&self, val: &mut PipelineMap) -> Result<()>;
}
128
/// Closed set of all supported processors, one variant per implementation.
///
/// `#[enum_dispatch]` pairs this enum with the [`Processor`] trait so that
/// trait calls on a `ProcessorKind` are forwarded to the wrapped value.
/// Instances are created by `parse_processor` from the YAML configuration.
#[derive(Debug)]
#[enum_dispatch]
pub enum ProcessorKind {
    Cmcd(CmcdProcessor),
    Csv(CsvProcessor),
    Dissect(DissectProcessor),
    Gsub(GsubProcessor),
    Join(JoinProcessor),
    Letter(LetterProcessor),
    Regex(RegexProcessor),
    Timestamp(TimestampProcessor),
    UrlEncoding(UrlEncodingProcessor),
    Epoch(EpochProcessor),
    Date(DateProcessor),
    JsonPath(JsonPathProcessor),
    JsonParse(JsonParseProcessor),
    // Note the naming mismatch: this variant wraps `SimpleExtractProcessor`
    // and is selected by `simple_extract::PROCESSOR_SIMPLE_EXTRACT`.
    SimpleJsonPath(SimpleExtractProcessor),
    Decolorize(DecolorizeProcessor),
    Digest(DigestProcessor),
    Select(SelectProcessor),
}
150
/// List of processors in the order they were declared in the configuration.
///
/// Built from a slice of YAML documents through `TryFrom<&Vec<yaml_rust::Yaml>>`
/// and dereferences to the inner `Vec<ProcessorKind>`.
#[derive(Debug, Default)]
pub struct Processors {
    /// The parsed processors, in declaration order.
    pub processors: Vec<ProcessorKind>,
}
158
159impl std::ops::Deref for Processors {
160 type Target = Vec<ProcessorKind>;
161
162 fn deref(&self) -> &Self::Target {
163 &self.processors
164 }
165}
166
167impl std::ops::DerefMut for Processors {
168 fn deref_mut(&mut self) -> &mut Self::Target {
169 &mut self.processors
170 }
171}
172
173impl TryFrom<&Vec<yaml_rust::Yaml>> for Processors {
174 type Error = Error;
175
176 fn try_from(vec: &Vec<yaml_rust::Yaml>) -> Result<Self> {
177 let mut processors_builders = vec![];
178 for doc in vec {
179 let processor = parse_processor(doc)?;
180 processors_builders.push(processor);
181 }
182 Ok(Processors {
183 processors: processors_builders,
184 })
185 }
186}
187
188fn parse_processor(doc: &yaml_rust::Yaml) -> Result<ProcessorKind> {
189 let map = doc.as_hash().context(ProcessorMustBeMapSnafu)?;
190
191 let key = map.keys().next().context(ProcessorMustHaveStringKeySnafu)?;
192
193 let value = map
194 .get(key)
195 .unwrap()
196 .as_hash()
197 .context(ProcessorMustBeMapSnafu)?;
198
199 let str_key = key.as_str().context(ProcessorKeyMustBeStringSnafu)?;
200
201 let processor = match str_key {
202 cmcd::PROCESSOR_CMCD => ProcessorKind::Cmcd(CmcdProcessor::try_from(value)?),
203 csv::PROCESSOR_CSV => ProcessorKind::Csv(CsvProcessor::try_from(value)?),
204 dissect::PROCESSOR_DISSECT => ProcessorKind::Dissect(DissectProcessor::try_from(value)?),
205 epoch::PROCESSOR_EPOCH => ProcessorKind::Epoch(EpochProcessor::try_from(value)?),
206 date::PROCESSOR_DATE => ProcessorKind::Date(DateProcessor::try_from(value)?),
207 gsub::PROCESSOR_GSUB => ProcessorKind::Gsub(GsubProcessor::try_from(value)?),
208 join::PROCESSOR_JOIN => ProcessorKind::Join(JoinProcessor::try_from(value)?),
209 letter::PROCESSOR_LETTER => ProcessorKind::Letter(LetterProcessor::try_from(value)?),
210 regex::PROCESSOR_REGEX => ProcessorKind::Regex(RegexProcessor::try_from(value)?),
211 timestamp::PROCESSOR_TIMESTAMP => {
212 ProcessorKind::Timestamp(TimestampProcessor::try_from(value)?)
213 }
214 urlencoding::PROCESSOR_URL_ENCODING => {
215 ProcessorKind::UrlEncoding(UrlEncodingProcessor::try_from(value)?)
216 }
217 json_path::PROCESSOR_JSON_PATH => {
218 ProcessorKind::JsonPath(json_path::JsonPathProcessor::try_from(value)?)
219 }
220 decolorize::PROCESSOR_DECOLORIZE => {
221 ProcessorKind::Decolorize(DecolorizeProcessor::try_from(value)?)
222 }
223 digest::PROCESSOR_DIGEST => ProcessorKind::Digest(DigestProcessor::try_from(value)?),
224 simple_extract::PROCESSOR_SIMPLE_EXTRACT => {
225 ProcessorKind::SimpleJsonPath(SimpleExtractProcessor::try_from(value)?)
226 }
227 json_parse::PROCESSOR_JSON_PARSE => {
228 ProcessorKind::JsonParse(JsonParseProcessor::try_from(value)?)
229 }
230 select::PROCESSOR_SELECT => ProcessorKind::Select(SelectProcessor::try_from(value)?),
231 _ => return UnsupportedProcessorSnafu { processor: str_key }.fail(),
232 };
233
234 Ok(processor)
235}
236
237pub(crate) fn yaml_string(v: &yaml_rust::Yaml, field: &str) -> Result<String> {
238 v.as_str()
239 .map(|s| s.to_string())
240 .context(FieldMustBeTypeSnafu {
241 field,
242 ty: "string",
243 })
244}
245
246pub(crate) fn yaml_strings(v: &yaml_rust::Yaml, field: &str) -> Result<Vec<String>> {
247 yaml_list(v, *STRING_FN, field)
248}
249
250pub(crate) fn yaml_list<T>(
251 v: &yaml_rust::Yaml,
252 conv_fn: impl Fn(&str, &yaml_rust::Yaml) -> Result<T>,
253 field: &str,
254) -> Result<Vec<T>> {
255 v.as_vec()
256 .context(FieldMustBeTypeSnafu { field, ty: "list" })?
257 .iter()
258 .map(|v| conv_fn(field, v))
259 .collect()
260}
261
262pub(crate) fn yaml_bool(v: &yaml_rust::Yaml, field: &str) -> Result<bool> {
263 v.as_bool().context(FieldMustBeTypeSnafu {
264 field,
265 ty: "boolean",
266 })
267}
268
269pub(crate) fn yaml_parse_string<T>(v: &yaml_rust::Yaml, field: &str) -> Result<T>
270where
271 T: std::str::FromStr,
272 T::Err: std::error::Error + Send + Sync + 'static,
273{
274 yaml_string(v, field)?
275 .parse::<T>()
276 .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)
277 .context(FailedParseFieldFromStringSnafu { field })
278}
279
280pub(crate) fn yaml_parse_strings<T>(v: &yaml_rust::Yaml, field: &str) -> Result<Vec<T>>
281where
282 T: std::str::FromStr,
283 T::Err: std::error::Error + Send + Sync + 'static,
284{
285 yaml_strings(v, field).and_then(|v| {
286 v.into_iter()
287 .map(|s| {
288 s.parse::<T>()
289 .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)
290 .context(FailedParseFieldFromStringSnafu { field })
291 })
292 .collect()
293 })
294}
295
296pub(crate) fn yaml_new_fields(v: &yaml_rust::Yaml, field: &str) -> Result<Fields> {
297 yaml_list(v, *STRING_OR_HASH_FN, field).map(Fields::new)
298}
299
300pub(crate) fn yaml_new_field(v: &yaml_rust::Yaml, field: &str) -> Result<Field> {
301 STRING_OR_HASH_FN(field, v)
302}