pipeline/etl/
transform.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub mod index;
16pub mod transformer;
17
18use api::v1::value::ValueData;
19use api::v1::ColumnDataType;
20use chrono::Utc;
21use snafu::{ensure, OptionExt};
22
23use crate::error::{
24    Error, KeyMustBeStringSnafu, Result, TransformElementMustBeMapSnafu,
25    TransformFieldMustBeSetSnafu, TransformOnFailureInvalidValueSnafu, TransformTypeMustBeSetSnafu,
26    UnsupportedTypeInPipelineSnafu,
27};
28use crate::etl::field::Fields;
29use crate::etl::processor::{yaml_bool, yaml_new_field, yaml_new_fields, yaml_string};
30use crate::etl::transform::index::Index;
31use crate::etl::value::{parse_str_type, parse_str_value};
32
// Keys recognised inside a single transform entry of the pipeline YAML.
// Any other key in the entry is ignored by `Transform::try_from`.

/// Key holding a single field definition (parsed with `yaml_new_field`).
const TRANSFORM_FIELD: &str = "field";
/// Key holding several field definitions (parsed with `yaml_new_fields`).
const TRANSFORM_FIELDS: &str = "fields";
/// Key holding the target column type as a string (parsed with `parse_str_type`).
const TRANSFORM_TYPE: &str = "type";
/// Key holding the index kind as a string (converted into `Index`).
const TRANSFORM_INDEX: &str = "index";
/// Key holding a boolean tag flag (parsed with `yaml_bool`).
const TRANSFORM_TAG: &str = "tag";
/// Key holding the default value; only scalar YAML values (real, integer, string, boolean) are used.
const TRANSFORM_DEFAULT: &str = "default";
/// Key holding the failure policy as a string (parsed into `OnFailure`).
const TRANSFORM_ON_FAILURE: &str = "on_failure";
40
41pub use transformer::greptime::GreptimeTransformer;
42
/// Behavior to apply when a transform fails.
///
/// The default policy is [`OnFailure::Ignore`].
#[derive(Debug, Clone, Default, Copy, PartialEq, Eq)]
pub enum OnFailure {
    /// Return `None` if the transform fails.
    #[default]
    Ignore,
    /// Return the default value of the field if the transform fails.
    ///
    /// The default value depends on the type of the field, or is explicitly set by the user.
    Default,
}
53
54impl std::str::FromStr for OnFailure {
55    type Err = Error;
56
57    fn from_str(s: &str) -> Result<Self> {
58        match s {
59            "ignore" => Ok(OnFailure::Ignore),
60            "default" => Ok(OnFailure::Default),
61            _ => TransformOnFailureInvalidValueSnafu { value: s }.fail(),
62        }
63    }
64}
65
/// A list of [`Transform`]s.
///
/// Dereferences to the inner `Vec<Transform>`, so `Vec` methods can be called on it directly.
#[derive(Debug, Default, Clone)]
pub struct Transforms {
    /// The wrapped transforms.
    pub(crate) transforms: Vec<Transform>,
}
70
71impl Transforms {
72    pub fn transforms(&self) -> &Vec<Transform> {
73        &self.transforms
74    }
75}
76
77impl std::ops::Deref for Transforms {
78    type Target = Vec<Transform>;
79
80    fn deref(&self) -> &Self::Target {
81        &self.transforms
82    }
83}
84
85impl std::ops::DerefMut for Transforms {
86    fn deref_mut(&mut self) -> &mut Self::Target {
87        &mut self.transforms
88    }
89}
90
91impl TryFrom<&Vec<yaml_rust::Yaml>> for Transforms {
92    type Error = Error;
93
94    fn try_from(docs: &Vec<yaml_rust::Yaml>) -> Result<Self> {
95        let mut transforms = Vec::with_capacity(32);
96        let mut all_output_keys: Vec<String> = Vec::with_capacity(32);
97        let mut all_required_keys = Vec::with_capacity(32);
98
99        for doc in docs {
100            let transform_builder: Transform = doc
101                .as_hash()
102                .context(TransformElementMustBeMapSnafu)?
103                .try_into()?;
104            let mut transform_output_keys = transform_builder
105                .fields
106                .iter()
107                .map(|f| f.target_or_input_field().to_string())
108                .collect();
109            all_output_keys.append(&mut transform_output_keys);
110
111            let mut transform_required_keys = transform_builder
112                .fields
113                .iter()
114                .map(|f| f.input_field().to_string())
115                .collect();
116            all_required_keys.append(&mut transform_required_keys);
117
118            transforms.push(transform_builder);
119        }
120
121        all_required_keys.sort();
122
123        Ok(Transforms { transforms })
124    }
125}
126
/// A single transform rule: which fields to convert and the column they become.
///
/// When built from YAML, both a field (`field` or `fields`) and a `type` are
/// required; every other setting is optional.
#[derive(Debug, Clone)]
pub struct Transform {
    /// Fields this transform applies to; must not be empty when parsed from YAML.
    pub fields: Fields,
    /// Target column data type.
    pub type_: ColumnDataType,
    /// User-supplied default value, already parsed according to `type_`.
    pub default: Option<ValueData>,
    /// Index kind of the column, if any.
    pub index: Option<Index>,
    /// Value of the `tag` setting; `false` when not specified.
    pub tag: bool,
    /// Failure policy; forced to `OnFailure::Default` when a default value is configured.
    pub on_failure: Option<OnFailure>,
}
137
138// valid types
139// ColumnDataType::Int8
140// ColumnDataType::Int16
141// ColumnDataType::Int32
142// ColumnDataType::Int64
143// ColumnDataType::Uint8
144// ColumnDataType::Uint16
145// ColumnDataType::Uint32
146// ColumnDataType::Uint64
147// ColumnDataType::Float32
148// ColumnDataType::Float64
149// ColumnDataType::Boolean
150// ColumnDataType::String
151// ColumnDataType::TimestampNanosecond
152// ColumnDataType::TimestampMicrosecond
153// ColumnDataType::TimestampMillisecond
154// ColumnDataType::TimestampSecond
155// ColumnDataType::Binary
156
157impl Transform {
158    pub(crate) fn get_default(&self) -> Option<&ValueData> {
159        self.default.as_ref()
160    }
161
162    pub(crate) fn get_type_matched_default_val(&self) -> Result<ValueData> {
163        get_default_for_type(&self.type_)
164    }
165
166    pub(crate) fn get_default_value_when_data_is_none(&self) -> Option<ValueData> {
167        if is_timestamp_type(&self.type_) && self.index.is_some_and(|i| i == Index::Time) {
168            let now = Utc::now();
169            match self.type_ {
170                ColumnDataType::TimestampSecond => {
171                    return Some(ValueData::TimestampSecondValue(now.timestamp()));
172                }
173                ColumnDataType::TimestampMillisecond => {
174                    return Some(ValueData::TimestampMillisecondValue(now.timestamp_millis()));
175                }
176                ColumnDataType::TimestampMicrosecond => {
177                    return Some(ValueData::TimestampMicrosecondValue(now.timestamp_micros()));
178                }
179                ColumnDataType::TimestampNanosecond => {
180                    return Some(ValueData::TimestampNanosecondValue(
181                        now.timestamp_nanos_opt()?,
182                    ));
183                }
184                _ => {}
185            }
186        }
187        None
188    }
189
190    pub(crate) fn is_timeindex(&self) -> bool {
191        self.index.is_some_and(|i| i == Index::Time)
192    }
193}
194
195fn is_timestamp_type(ty: &ColumnDataType) -> bool {
196    matches!(
197        ty,
198        ColumnDataType::TimestampSecond
199            | ColumnDataType::TimestampMillisecond
200            | ColumnDataType::TimestampMicrosecond
201            | ColumnDataType::TimestampNanosecond
202    )
203}
204
205fn get_default_for_type(ty: &ColumnDataType) -> Result<ValueData> {
206    let v = match ty {
207        ColumnDataType::Boolean => ValueData::BoolValue(false),
208        ColumnDataType::Int8 => ValueData::I8Value(0),
209        ColumnDataType::Int16 => ValueData::I16Value(0),
210        ColumnDataType::Int32 => ValueData::I32Value(0),
211        ColumnDataType::Int64 => ValueData::I64Value(0),
212        ColumnDataType::Uint8 => ValueData::U8Value(0),
213        ColumnDataType::Uint16 => ValueData::U16Value(0),
214        ColumnDataType::Uint32 => ValueData::U32Value(0),
215        ColumnDataType::Uint64 => ValueData::U64Value(0),
216        ColumnDataType::Float32 => ValueData::F32Value(0.0),
217        ColumnDataType::Float64 => ValueData::F64Value(0.0),
218        ColumnDataType::Binary => ValueData::BinaryValue(jsonb::Value::Null.to_vec()),
219        ColumnDataType::String => ValueData::StringValue(String::new()),
220
221        ColumnDataType::TimestampSecond => ValueData::TimestampSecondValue(0),
222        ColumnDataType::TimestampMillisecond => ValueData::TimestampMillisecondValue(0),
223        ColumnDataType::TimestampMicrosecond => ValueData::TimestampMicrosecondValue(0),
224        ColumnDataType::TimestampNanosecond => ValueData::TimestampNanosecondValue(0),
225
226        _ => UnsupportedTypeInPipelineSnafu {
227            ty: ty.as_str_name(),
228        }
229        .fail()?,
230    };
231    Ok(v)
232}
233
234impl TryFrom<&yaml_rust::yaml::Hash> for Transform {
235    type Error = Error;
236
237    fn try_from(hash: &yaml_rust::yaml::Hash) -> Result<Self> {
238        let mut fields = Fields::default();
239        let mut default = None;
240        let mut index = None;
241        let mut tag = false;
242        let mut on_failure = None;
243
244        let mut type_ = None;
245
246        for (k, v) in hash {
247            let key = k
248                .as_str()
249                .with_context(|| KeyMustBeStringSnafu { k: k.clone() })?;
250            match key {
251                TRANSFORM_FIELD => {
252                    fields = Fields::one(yaml_new_field(v, TRANSFORM_FIELD)?);
253                }
254
255                TRANSFORM_FIELDS => {
256                    fields = yaml_new_fields(v, TRANSFORM_FIELDS)?;
257                }
258
259                TRANSFORM_TYPE => {
260                    let t = yaml_string(v, TRANSFORM_TYPE)?;
261                    type_ = Some(parse_str_type(&t)?);
262                }
263
264                TRANSFORM_INDEX => {
265                    let index_str = yaml_string(v, TRANSFORM_INDEX)?;
266                    index = Some(index_str.try_into()?);
267                }
268
269                TRANSFORM_TAG => {
270                    tag = yaml_bool(v, TRANSFORM_TAG)?;
271                }
272
273                TRANSFORM_DEFAULT => {
274                    default = match v {
275                        yaml_rust::Yaml::Real(r) => Some(r.clone()),
276                        yaml_rust::Yaml::Integer(i) => Some(i.to_string()),
277                        yaml_rust::Yaml::String(s) => Some(s.clone()),
278                        yaml_rust::Yaml::Boolean(b) => Some(b.to_string()),
279                        yaml_rust::Yaml::Array(_)
280                        | yaml_rust::Yaml::Hash(_)
281                        | yaml_rust::Yaml::Alias(_)
282                        | yaml_rust::Yaml::Null
283                        | yaml_rust::Yaml::BadValue => None,
284                    };
285                }
286
287                TRANSFORM_ON_FAILURE => {
288                    let on_failure_str = yaml_string(v, TRANSFORM_ON_FAILURE)?;
289                    on_failure = Some(on_failure_str.parse()?);
290                }
291
292                _ => {}
293            }
294        }
295
296        // ensure fields and type
297        ensure!(!fields.is_empty(), TransformFieldMustBeSetSnafu);
298        let type_ = type_.context(TransformTypeMustBeSetSnafu {
299            fields: format!("{:?}", fields),
300        })?;
301
302        let final_default = if let Some(default_value) = default {
303            let target = parse_str_value(&type_, &default_value)?;
304            on_failure = Some(OnFailure::Default);
305            Some(target)
306        } else {
307            None
308        };
309
310        let builder = Transform {
311            fields,
312            type_,
313            default: final_default,
314            index,
315            on_failure,
316            tag,
317        };
318
319        Ok(builder)
320    }
321}