pipeline/etl/processor/
simple_extract.rs1use snafu::OptionExt as _;
16
17use crate::error::{Error, KeyMustBeStringSnafu, ProcessorMissingFieldSnafu, Result};
18use crate::etl::field::Fields;
19use crate::etl::processor::{
20 yaml_bool, yaml_new_field, yaml_new_fields, yaml_string, FIELDS_NAME, FIELD_NAME,
21 IGNORE_MISSING_NAME, KEY_NAME,
22};
23use crate::{PipelineMap, Processor, Value};
24
25pub(crate) const PROCESSOR_SIMPLE_EXTRACT: &str = "simple_extract";
26
27#[derive(Debug, Default)]
28pub struct SimpleExtractProcessor {
29 fields: Fields,
30 key: Vec<String>,
33 ignore_missing: bool,
34}
35
36impl TryFrom<&yaml_rust::yaml::Hash> for SimpleExtractProcessor {
37 type Error = Error;
38
39 fn try_from(value: &yaml_rust::yaml::Hash) -> std::result::Result<Self, Self::Error> {
40 let mut fields = Fields::default();
41 let mut ignore_missing = false;
42 let mut keys = vec![];
43
44 for (k, v) in value.iter() {
45 let key = k
46 .as_str()
47 .with_context(|| KeyMustBeStringSnafu { k: k.clone() })?;
48 match key {
49 FIELD_NAME => {
50 fields = Fields::one(yaml_new_field(v, FIELD_NAME)?);
51 }
52 FIELDS_NAME => {
53 fields = yaml_new_fields(v, FIELDS_NAME)?;
54 }
55 IGNORE_MISSING_NAME => {
56 ignore_missing = yaml_bool(v, IGNORE_MISSING_NAME)?;
57 }
58 KEY_NAME => {
59 let key_str = yaml_string(v, KEY_NAME)?;
60 keys.extend(key_str.split(".").map(|s| s.to_string()));
61 }
62 _ => {}
63 }
64 }
65
66 let processor = SimpleExtractProcessor {
67 fields,
68 key: keys,
69 ignore_missing,
70 };
71
72 Ok(processor)
73 }
74}
75
76impl SimpleExtractProcessor {
77 fn process_field(&self, val: &Value) -> Result<Value> {
78 let mut current = val;
79 for key in self.key.iter() {
80 let Value::Map(map) = current else {
81 return Ok(Value::Null);
82 };
83 let Some(v) = map.get(key) else {
84 return Ok(Value::Null);
85 };
86 current = v;
87 }
88 Ok(current.clone())
89 }
90}
91
92impl Processor for SimpleExtractProcessor {
93 fn kind(&self) -> &str {
94 PROCESSOR_SIMPLE_EXTRACT
95 }
96
97 fn ignore_missing(&self) -> bool {
98 self.ignore_missing
99 }
100
101 fn exec_mut(&self, val: &mut PipelineMap) -> Result<()> {
102 for field in self.fields.iter() {
103 let index = field.input_field();
104 match val.get(index) {
105 Some(v) => {
106 let processed = self.process_field(v)?;
107 let output_index = field.target_or_input_field();
108 val.insert(output_index.to_string(), processed);
109 }
110 None => {
111 if !self.ignore_missing {
112 return ProcessorMissingFieldSnafu {
113 processor: self.kind(),
114 field: field.input_field(),
115 }
116 .fail();
117 }
118 }
119 }
120 }
121 Ok(())
122 }
123}
124
#[cfg(test)]
mod test {
    use super::*;
    use crate::{Map, Value};

    /// A single-segment path returns the value stored under that key.
    #[test]
    fn test_simple_extract() {
        let input = Value::Map(Map::one("hello", Value::String("world".to_string())));
        let processor = SimpleExtractProcessor {
            key: vec!["hello".to_string()],
            ..Default::default()
        };

        let extracted = processor.process_field(&input).unwrap();

        assert_eq!(extracted, Value::String("world".to_string()));
    }
}