pipeline/etl/
transform.rs1pub mod index;
16pub mod transformer;
17
18use snafu::{ensure, OptionExt};
19
20use crate::error::{
21 Error, KeyMustBeStringSnafu, Result, TransformElementMustBeMapSnafu,
22 TransformFieldMustBeSetSnafu, TransformOnFailureInvalidValueSnafu, TransformTypeMustBeSetSnafu,
23};
24use crate::etl::field::Fields;
25use crate::etl::processor::{yaml_bool, yaml_new_field, yaml_new_fields, yaml_string};
26use crate::etl::transform::index::Index;
27use crate::etl::value::{Timestamp, Value};
28
// YAML keys recognised inside a single entry of a pipeline's `transform` list.
// Any other key in such an entry is silently ignored by the parser.
const TRANSFORM_FIELD: &'static str = "field";
const TRANSFORM_FIELDS: &'static str = "fields";
const TRANSFORM_TYPE: &'static str = "type";
const TRANSFORM_INDEX: &'static str = "index";
const TRANSFORM_TAG: &'static str = "tag";
const TRANSFORM_DEFAULT: &'static str = "default";
const TRANSFORM_ON_FAILURE: &'static str = "on_failure";
36
37pub use transformer::greptime::GreptimeTransformer;
38
/// Policy selected by a transform's `on_failure` option.
///
/// Parsed from the strings `"ignore"` and `"default"`; when the option is
/// absent the derived [`Default`] is [`OnFailure::Ignore`].
#[derive(Clone, Copy, Debug, Default)]
pub enum OnFailure {
    /// Selected by `on_failure: ignore`; also the default policy.
    #[default]
    Ignore,
    /// Selected by `on_failure: default`; the YAML parser also forces this
    /// policy whenever a transform declares a non-null `default` value.
    Default,
}
49
50impl std::str::FromStr for OnFailure {
51 type Err = Error;
52
53 fn from_str(s: &str) -> Result<Self> {
54 match s {
55 "ignore" => Ok(OnFailure::Ignore),
56 "default" => Ok(OnFailure::Default),
57 _ => TransformOnFailureInvalidValueSnafu { value: s }.fail(),
58 }
59 }
60}
61
62#[derive(Debug, Default, Clone)]
63pub struct Transforms {
64 pub(crate) transforms: Vec<Transform>,
65}
66
67impl Transforms {
68 pub fn transforms(&self) -> &Vec<Transform> {
69 &self.transforms
70 }
71}
72
73impl std::ops::Deref for Transforms {
74 type Target = Vec<Transform>;
75
76 fn deref(&self) -> &Self::Target {
77 &self.transforms
78 }
79}
80
81impl std::ops::DerefMut for Transforms {
82 fn deref_mut(&mut self) -> &mut Self::Target {
83 &mut self.transforms
84 }
85}
86
87impl TryFrom<&Vec<yaml_rust::Yaml>> for Transforms {
88 type Error = Error;
89
90 fn try_from(docs: &Vec<yaml_rust::Yaml>) -> Result<Self> {
91 let mut transforms = Vec::with_capacity(32);
92 let mut all_output_keys: Vec<String> = Vec::with_capacity(32);
93 let mut all_required_keys = Vec::with_capacity(32);
94
95 for doc in docs {
96 let transform_builder: Transform = doc
97 .as_hash()
98 .context(TransformElementMustBeMapSnafu)?
99 .try_into()?;
100 let mut transform_output_keys = transform_builder
101 .fields
102 .iter()
103 .map(|f| f.target_or_input_field().to_string())
104 .collect();
105 all_output_keys.append(&mut transform_output_keys);
106
107 let mut transform_required_keys = transform_builder
108 .fields
109 .iter()
110 .map(|f| f.input_field().to_string())
111 .collect();
112 all_required_keys.append(&mut transform_required_keys);
113
114 transforms.push(transform_builder);
115 }
116
117 all_required_keys.sort();
118
119 Ok(Transforms { transforms })
120 }
121}
122
123#[derive(Debug, Clone)]
125pub struct Transform {
126 pub fields: Fields,
127 pub type_: Value,
128 pub default: Option<Value>,
129 pub index: Option<Index>,
130 pub tag: bool,
131 pub on_failure: Option<OnFailure>,
132}
133
134impl Default for Transform {
135 fn default() -> Self {
136 Transform {
137 fields: Fields::default(),
138 type_: Value::Null,
139 default: None,
140 index: None,
141 tag: false,
142 on_failure: None,
143 }
144 }
145}
146
147impl Transform {
148 pub(crate) fn get_default(&self) -> Option<&Value> {
149 self.default.as_ref()
150 }
151
152 pub(crate) fn get_type_matched_default_val(&self) -> &Value {
153 &self.type_
154 }
155
156 pub(crate) fn get_default_value_when_data_is_none(&self) -> Option<Value> {
157 if matches!(self.type_, Value::Timestamp(_)) && self.index.is_some_and(|i| i == Index::Time)
158 {
159 return Some(Value::Timestamp(Timestamp::default()));
160 }
161 None
162 }
163
164 pub(crate) fn is_timeindex(&self) -> bool {
165 self.index.is_some_and(|i| i == Index::Time)
166 }
167}
168
169impl TryFrom<&yaml_rust::yaml::Hash> for Transform {
170 type Error = Error;
171
172 fn try_from(hash: &yaml_rust::yaml::Hash) -> Result<Self> {
173 let mut fields = Fields::default();
174 let mut type_ = Value::Null;
175 let mut default = None;
176 let mut index = None;
177 let mut tag = false;
178 let mut on_failure = None;
179
180 for (k, v) in hash {
181 let key = k
182 .as_str()
183 .with_context(|| KeyMustBeStringSnafu { k: k.clone() })?;
184 match key {
185 TRANSFORM_FIELD => {
186 fields = Fields::one(yaml_new_field(v, TRANSFORM_FIELD)?);
187 }
188
189 TRANSFORM_FIELDS => {
190 fields = yaml_new_fields(v, TRANSFORM_FIELDS)?;
191 }
192
193 TRANSFORM_TYPE => {
194 let t = yaml_string(v, TRANSFORM_TYPE)?;
195 type_ = Value::parse_str_type(&t)?;
196 }
197
198 TRANSFORM_INDEX => {
199 let index_str = yaml_string(v, TRANSFORM_INDEX)?;
200 index = Some(index_str.try_into()?);
201 }
202
203 TRANSFORM_TAG => {
204 tag = yaml_bool(v, TRANSFORM_TAG)?;
205 }
206
207 TRANSFORM_DEFAULT => {
208 default = Some(Value::try_from(v)?);
209 }
210
211 TRANSFORM_ON_FAILURE => {
212 let on_failure_str = yaml_string(v, TRANSFORM_ON_FAILURE)?;
213 on_failure = Some(on_failure_str.parse()?);
214 }
215
216 _ => {}
217 }
218 }
219
220 ensure!(!fields.is_empty(), TransformFieldMustBeSetSnafu);
222 ensure!(
223 type_ != Value::Null,
224 TransformTypeMustBeSetSnafu {
225 fields: format!("{:?}", fields)
226 }
227 );
228
229 let final_default = if let Some(default_value) = default {
230 match default_value {
231 Value::Null => None,
233 _ => {
234 let target = type_.parse_str_value(default_value.to_str_value().as_str())?;
235 on_failure = Some(OnFailure::Default);
236 Some(target)
237 }
238 }
239 } else {
240 None
241 };
242
243 let builder = Transform {
244 fields,
245 type_,
246 default: final_default,
247 index,
248 on_failure,
249 tag,
250 };
251
252 Ok(builder)
253 }
254}