pipeline/etl/
transform.rs1pub mod index;
16pub mod transformer;
17
18use snafu::OptionExt;
19
20use crate::error::{
21 Error, KeyMustBeStringSnafu, Result, TransformElementMustBeMapSnafu,
22 TransformOnFailureInvalidValueSnafu, TransformTypeMustBeSetSnafu,
23};
24use crate::etl::field::Fields;
25use crate::etl::processor::{yaml_bool, yaml_new_field, yaml_new_fields, yaml_string};
26use crate::etl::transform::index::Index;
27use crate::etl::value::{Timestamp, Value};
28
29const TRANSFORM_FIELD: &str = "field";
30const TRANSFORM_FIELDS: &str = "fields";
31const TRANSFORM_TYPE: &str = "type";
32const TRANSFORM_INDEX: &str = "index";
33const TRANSFORM_TAG: &str = "tag";
34const TRANSFORM_DEFAULT: &str = "default";
35const TRANSFORM_ON_FAILURE: &str = "on_failure";
36
37pub use transformer::greptime::GreptimeTransformer;
38
39#[derive(Debug, Clone, Default, Copy)]
41pub enum OnFailure {
42 #[default]
44 Ignore,
45 Default,
48}
49
50impl std::str::FromStr for OnFailure {
51 type Err = Error;
52
53 fn from_str(s: &str) -> Result<Self> {
54 match s {
55 "ignore" => Ok(OnFailure::Ignore),
56 "default" => Ok(OnFailure::Default),
57 _ => TransformOnFailureInvalidValueSnafu { value: s }.fail(),
58 }
59 }
60}
61
62#[derive(Debug, Default, Clone)]
63pub struct Transforms {
64 pub(crate) transforms: Vec<Transform>,
65}
66
67impl Transforms {
68 pub fn transforms(&self) -> &Vec<Transform> {
69 &self.transforms
70 }
71}
72
73impl std::ops::Deref for Transforms {
74 type Target = Vec<Transform>;
75
76 fn deref(&self) -> &Self::Target {
77 &self.transforms
78 }
79}
80
81impl std::ops::DerefMut for Transforms {
82 fn deref_mut(&mut self) -> &mut Self::Target {
83 &mut self.transforms
84 }
85}
86
87impl TryFrom<&Vec<yaml_rust::Yaml>> for Transforms {
88 type Error = Error;
89
90 fn try_from(docs: &Vec<yaml_rust::Yaml>) -> Result<Self> {
91 let mut transforms = Vec::with_capacity(32);
92 let mut all_output_keys: Vec<String> = Vec::with_capacity(32);
93 let mut all_required_keys = Vec::with_capacity(32);
94
95 for doc in docs {
96 let transform_builder: Transform = doc
97 .as_hash()
98 .context(TransformElementMustBeMapSnafu)?
99 .try_into()?;
100 let mut transform_output_keys = transform_builder
101 .fields
102 .iter()
103 .map(|f| f.target_or_input_field().to_string())
104 .collect();
105 all_output_keys.append(&mut transform_output_keys);
106
107 let mut transform_required_keys = transform_builder
108 .fields
109 .iter()
110 .map(|f| f.input_field().to_string())
111 .collect();
112 all_required_keys.append(&mut transform_required_keys);
113
114 transforms.push(transform_builder);
115 }
116
117 all_required_keys.sort();
118
119 Ok(Transforms { transforms })
120 }
121}
122
123#[derive(Debug, Clone)]
125pub struct Transform {
126 pub fields: Fields,
127 pub type_: Value,
128 pub default: Option<Value>,
129 pub index: Option<Index>,
130 pub tag: bool,
131 pub on_failure: Option<OnFailure>,
132}
133
134impl Default for Transform {
135 fn default() -> Self {
136 Transform {
137 fields: Fields::default(),
138 type_: Value::Null,
139 default: None,
140 index: None,
141 tag: false,
142 on_failure: None,
143 }
144 }
145}
146
147impl Transform {
148 pub(crate) fn get_default(&self) -> Option<&Value> {
149 self.default.as_ref()
150 }
151
152 pub(crate) fn get_type_matched_default_val(&self) -> &Value {
153 &self.type_
154 }
155
156 pub(crate) fn get_default_value_when_data_is_none(&self) -> Option<Value> {
157 if matches!(self.type_, Value::Timestamp(_)) && self.index.is_some_and(|i| i == Index::Time)
158 {
159 return Some(Value::Timestamp(Timestamp::default()));
160 }
161 None
162 }
163}
164
165impl TryFrom<&yaml_rust::yaml::Hash> for Transform {
166 type Error = Error;
167
168 fn try_from(hash: &yaml_rust::yaml::Hash) -> Result<Self> {
169 let mut fields = Fields::default();
170 let mut type_ = Value::Null;
171 let mut default = None;
172 let mut index = None;
173 let mut tag = false;
174 let mut on_failure = None;
175
176 for (k, v) in hash {
177 let key = k
178 .as_str()
179 .with_context(|| KeyMustBeStringSnafu { k: k.clone() })?;
180 match key {
181 TRANSFORM_FIELD => {
182 fields = Fields::one(yaml_new_field(v, TRANSFORM_FIELD)?);
183 }
184
185 TRANSFORM_FIELDS => {
186 fields = yaml_new_fields(v, TRANSFORM_FIELDS)?;
187 }
188
189 TRANSFORM_TYPE => {
190 let t = yaml_string(v, TRANSFORM_TYPE)?;
191 type_ = Value::parse_str_type(&t)?;
192 }
193
194 TRANSFORM_INDEX => {
195 let index_str = yaml_string(v, TRANSFORM_INDEX)?;
196 index = Some(index_str.try_into()?);
197 }
198
199 TRANSFORM_TAG => {
200 tag = yaml_bool(v, TRANSFORM_TAG)?;
201 }
202
203 TRANSFORM_DEFAULT => {
204 default = Some(Value::try_from(v)?);
205 }
206
207 TRANSFORM_ON_FAILURE => {
208 let on_failure_str = yaml_string(v, TRANSFORM_ON_FAILURE)?;
209 on_failure = Some(on_failure_str.parse()?);
210 }
211
212 _ => {}
213 }
214 }
215 let mut final_default = None;
216
217 if let Some(default_value) = default {
218 match (&type_, &default_value) {
219 (Value::Null, _) => {
220 return TransformTypeMustBeSetSnafu {
221 fields: format!("{:?}", fields),
222 default: default_value.to_string(),
223 }
224 .fail();
225 }
226 (_, Value::Null) => {} (_, _) => {
228 let target = type_.parse_str_value(default_value.to_str_value().as_str())?;
229 final_default = Some(target);
230 on_failure = Some(OnFailure::Default);
231 }
232 }
233 }
234 let builder = Transform {
235 fields,
236 type_,
237 default: final_default,
238 index,
239 on_failure,
240 tag,
241 };
242
243 Ok(builder)
244 }
245}