pipeline/etl/
transform.rs1pub mod index;
16pub mod transformer;
17
18use api::v1::value::ValueData;
19use api::v1::ColumnDataType;
20use chrono::Utc;
21use snafu::{ensure, OptionExt};
22
23use crate::error::{
24 Error, KeyMustBeStringSnafu, Result, TransformElementMustBeMapSnafu,
25 TransformFieldMustBeSetSnafu, TransformOnFailureInvalidValueSnafu, TransformTypeMustBeSetSnafu,
26 UnsupportedTypeInPipelineSnafu,
27};
28use crate::etl::field::Fields;
29use crate::etl::processor::{yaml_bool, yaml_new_field, yaml_new_fields, yaml_string};
30use crate::etl::transform::index::Index;
31use crate::etl::value::{parse_str_type, parse_str_value};
32
// YAML keys recognised inside a single transform mapping.

/// Key naming the single input field of a transform.
const TRANSFORM_FIELD: &str = "field";
/// Key naming a list of input fields (alternative to `field`).
const TRANSFORM_FIELDS: &str = "fields";
/// Key naming the target column type; a transform without it is rejected.
const TRANSFORM_TYPE: &str = "type";
/// Key naming the column's index kind.
const TRANSFORM_INDEX: &str = "index";
/// Key holding a boolean tag flag.
const TRANSFORM_TAG: &str = "tag";
/// Key holding a default value, parsed against the declared `type`.
const TRANSFORM_DEFAULT: &str = "default";
/// Key selecting the failure policy (`ignore` or `default`).
const TRANSFORM_ON_FAILURE: &str = "on_failure";
40
41pub use transformer::greptime::GreptimeTransformer;
42
/// Policy a transform applies when a value cannot be converted.
///
/// Parsed from the `on_failure` key of a transform (`"ignore"` / `"default"`).
#[derive(Clone, Copy, Debug, Default)]
pub enum OnFailure {
    /// Selected by `"ignore"`; also the value of `OnFailure::default()`.
    #[default]
    Ignore,
    /// Selected by `"default"`; a transform that declares a `default` value
    /// is switched to this policy when it is built from YAML.
    Default,
}
53
54impl std::str::FromStr for OnFailure {
55 type Err = Error;
56
57 fn from_str(s: &str) -> Result<Self> {
58 match s {
59 "ignore" => Ok(OnFailure::Ignore),
60 "default" => Ok(OnFailure::Default),
61 _ => TransformOnFailureInvalidValueSnafu { value: s }.fail(),
62 }
63 }
64}
65
66#[derive(Debug, Default, Clone)]
67pub struct Transforms {
68 pub(crate) transforms: Vec<Transform>,
69}
70
71impl Transforms {
72 pub fn transforms(&self) -> &Vec<Transform> {
73 &self.transforms
74 }
75}
76
77impl std::ops::Deref for Transforms {
78 type Target = Vec<Transform>;
79
80 fn deref(&self) -> &Self::Target {
81 &self.transforms
82 }
83}
84
85impl std::ops::DerefMut for Transforms {
86 fn deref_mut(&mut self) -> &mut Self::Target {
87 &mut self.transforms
88 }
89}
90
91impl TryFrom<&Vec<yaml_rust::Yaml>> for Transforms {
92 type Error = Error;
93
94 fn try_from(docs: &Vec<yaml_rust::Yaml>) -> Result<Self> {
95 let mut transforms = Vec::with_capacity(32);
96 let mut all_output_keys: Vec<String> = Vec::with_capacity(32);
97 let mut all_required_keys = Vec::with_capacity(32);
98
99 for doc in docs {
100 let transform_builder: Transform = doc
101 .as_hash()
102 .context(TransformElementMustBeMapSnafu)?
103 .try_into()?;
104 let mut transform_output_keys = transform_builder
105 .fields
106 .iter()
107 .map(|f| f.target_or_input_field().to_string())
108 .collect();
109 all_output_keys.append(&mut transform_output_keys);
110
111 let mut transform_required_keys = transform_builder
112 .fields
113 .iter()
114 .map(|f| f.input_field().to_string())
115 .collect();
116 all_required_keys.append(&mut transform_required_keys);
117
118 transforms.push(transform_builder);
119 }
120
121 all_required_keys.sort();
122
123 Ok(Transforms { transforms })
124 }
125}
126
/// A single transform rule: which input field(s) to read and how to store
/// them as a column.
#[derive(Debug, Clone)]
pub struct Transform {
    /// Input field(s), from the `field` or `fields` key; non-empty when built from YAML.
    pub fields: Fields,
    /// Target column type, from the required `type` key.
    pub type_: ColumnDataType,
    /// Value from the `default` key, already parsed against `type_`.
    pub default: Option<ValueData>,
    /// Index kind from the `index` key, if present.
    pub index: Option<Index>,
    /// Boolean from the `tag` key; `false` when absent.
    pub tag: bool,
    /// Policy from `on_failure`. When built from YAML with a `default`, this is
    /// set to `OnFailure::Default` regardless of the `on_failure` key.
    pub on_failure: Option<OnFailure>,
}
137
138impl Transform {
158 pub(crate) fn get_default(&self) -> Option<&ValueData> {
159 self.default.as_ref()
160 }
161
162 pub(crate) fn get_type_matched_default_val(&self) -> Result<ValueData> {
163 get_default_for_type(&self.type_)
164 }
165
166 pub(crate) fn get_default_value_when_data_is_none(&self) -> Option<ValueData> {
167 if is_timestamp_type(&self.type_) && self.index.is_some_and(|i| i == Index::Time) {
168 let now = Utc::now();
169 match self.type_ {
170 ColumnDataType::TimestampSecond => {
171 return Some(ValueData::TimestampSecondValue(now.timestamp()));
172 }
173 ColumnDataType::TimestampMillisecond => {
174 return Some(ValueData::TimestampMillisecondValue(now.timestamp_millis()));
175 }
176 ColumnDataType::TimestampMicrosecond => {
177 return Some(ValueData::TimestampMicrosecondValue(now.timestamp_micros()));
178 }
179 ColumnDataType::TimestampNanosecond => {
180 return Some(ValueData::TimestampNanosecondValue(
181 now.timestamp_nanos_opt()?,
182 ));
183 }
184 _ => {}
185 }
186 }
187 None
188 }
189
190 pub(crate) fn is_timeindex(&self) -> bool {
191 self.index.is_some_and(|i| i == Index::Time)
192 }
193}
194
195fn is_timestamp_type(ty: &ColumnDataType) -> bool {
196 matches!(
197 ty,
198 ColumnDataType::TimestampSecond
199 | ColumnDataType::TimestampMillisecond
200 | ColumnDataType::TimestampMicrosecond
201 | ColumnDataType::TimestampNanosecond
202 )
203}
204
205fn get_default_for_type(ty: &ColumnDataType) -> Result<ValueData> {
206 let v = match ty {
207 ColumnDataType::Boolean => ValueData::BoolValue(false),
208 ColumnDataType::Int8 => ValueData::I8Value(0),
209 ColumnDataType::Int16 => ValueData::I16Value(0),
210 ColumnDataType::Int32 => ValueData::I32Value(0),
211 ColumnDataType::Int64 => ValueData::I64Value(0),
212 ColumnDataType::Uint8 => ValueData::U8Value(0),
213 ColumnDataType::Uint16 => ValueData::U16Value(0),
214 ColumnDataType::Uint32 => ValueData::U32Value(0),
215 ColumnDataType::Uint64 => ValueData::U64Value(0),
216 ColumnDataType::Float32 => ValueData::F32Value(0.0),
217 ColumnDataType::Float64 => ValueData::F64Value(0.0),
218 ColumnDataType::Binary => ValueData::BinaryValue(jsonb::Value::Null.to_vec()),
219 ColumnDataType::String => ValueData::StringValue(String::new()),
220
221 ColumnDataType::TimestampSecond => ValueData::TimestampSecondValue(0),
222 ColumnDataType::TimestampMillisecond => ValueData::TimestampMillisecondValue(0),
223 ColumnDataType::TimestampMicrosecond => ValueData::TimestampMicrosecondValue(0),
224 ColumnDataType::TimestampNanosecond => ValueData::TimestampNanosecondValue(0),
225
226 _ => UnsupportedTypeInPipelineSnafu {
227 ty: ty.as_str_name(),
228 }
229 .fail()?,
230 };
231 Ok(v)
232}
233
234impl TryFrom<&yaml_rust::yaml::Hash> for Transform {
235 type Error = Error;
236
237 fn try_from(hash: &yaml_rust::yaml::Hash) -> Result<Self> {
238 let mut fields = Fields::default();
239 let mut default = None;
240 let mut index = None;
241 let mut tag = false;
242 let mut on_failure = None;
243
244 let mut type_ = None;
245
246 for (k, v) in hash {
247 let key = k
248 .as_str()
249 .with_context(|| KeyMustBeStringSnafu { k: k.clone() })?;
250 match key {
251 TRANSFORM_FIELD => {
252 fields = Fields::one(yaml_new_field(v, TRANSFORM_FIELD)?);
253 }
254
255 TRANSFORM_FIELDS => {
256 fields = yaml_new_fields(v, TRANSFORM_FIELDS)?;
257 }
258
259 TRANSFORM_TYPE => {
260 let t = yaml_string(v, TRANSFORM_TYPE)?;
261 type_ = Some(parse_str_type(&t)?);
262 }
263
264 TRANSFORM_INDEX => {
265 let index_str = yaml_string(v, TRANSFORM_INDEX)?;
266 index = Some(index_str.try_into()?);
267 }
268
269 TRANSFORM_TAG => {
270 tag = yaml_bool(v, TRANSFORM_TAG)?;
271 }
272
273 TRANSFORM_DEFAULT => {
274 default = match v {
275 yaml_rust::Yaml::Real(r) => Some(r.clone()),
276 yaml_rust::Yaml::Integer(i) => Some(i.to_string()),
277 yaml_rust::Yaml::String(s) => Some(s.clone()),
278 yaml_rust::Yaml::Boolean(b) => Some(b.to_string()),
279 yaml_rust::Yaml::Array(_)
280 | yaml_rust::Yaml::Hash(_)
281 | yaml_rust::Yaml::Alias(_)
282 | yaml_rust::Yaml::Null
283 | yaml_rust::Yaml::BadValue => None,
284 };
285 }
286
287 TRANSFORM_ON_FAILURE => {
288 let on_failure_str = yaml_string(v, TRANSFORM_ON_FAILURE)?;
289 on_failure = Some(on_failure_str.parse()?);
290 }
291
292 _ => {}
293 }
294 }
295
296 ensure!(!fields.is_empty(), TransformFieldMustBeSetSnafu);
298 let type_ = type_.context(TransformTypeMustBeSetSnafu {
299 fields: format!("{:?}", fields),
300 })?;
301
302 let final_default = if let Some(default_value) = default {
303 let target = parse_str_value(&type_, &default_value)?;
304 on_failure = Some(OnFailure::Default);
305 Some(target)
306 } else {
307 None
308 };
309
310 let builder = Transform {
311 fields,
312 type_,
313 default: final_default,
314 index,
315 on_failure,
316 tag,
317 };
318
319 Ok(builder)
320 }
321}