pipeline/etl/
transform.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub mod index;
16pub mod transformer;
17
18use snafu::OptionExt;
19
20use crate::error::{
21    Error, KeyMustBeStringSnafu, Result, TransformElementMustBeMapSnafu,
22    TransformOnFailureInvalidValueSnafu, TransformTypeMustBeSetSnafu,
23};
24use crate::etl::field::Fields;
25use crate::etl::processor::{yaml_bool, yaml_new_field, yaml_new_fields, yaml_string};
26use crate::etl::transform::index::Index;
27use crate::etl::value::{Timestamp, Value};
28
// Keys recognized in a single transform map; they are matched against the
// map's string keys when a `Transform` is built from YAML.

// Single field definition, read with `yaml_new_field`.
const TRANSFORM_FIELD: &str = "field";
// Multiple field definitions, read with `yaml_new_fields`.
const TRANSFORM_FIELDS: &str = "fields";
// Type name string, turned into a `Value` by `Value::parse_str_type`.
const TRANSFORM_TYPE: &str = "type";
// Index kind string, converted into an `Index`.
const TRANSFORM_INDEX: &str = "index";
// Boolean flag stored in `Transform::tag`.
const TRANSFORM_TAG: &str = "tag";
// Default value, later re-parsed against the declared type.
const TRANSFORM_DEFAULT: &str = "default";
// Failure policy string, parsed into an `OnFailure`.
const TRANSFORM_ON_FAILURE: &str = "on_failure";
36
37pub use transformer::greptime::GreptimeTransformer;
38
/// Behavior to apply when a transform fails.
///
/// The enum is a plain, field-less tag, so it is `Copy` and comparable with
/// `==`.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum OnFailure {
    /// Return `None` if the transform fails.
    #[default]
    Ignore,
    /// Return the default value of the field if the transform fails.
    ///
    /// The default value depends on the type of the field, or is explicitly
    /// set by the user.
    Default,
}
49
50impl std::str::FromStr for OnFailure {
51    type Err = Error;
52
53    fn from_str(s: &str) -> Result<Self> {
54        match s {
55            "ignore" => Ok(OnFailure::Ignore),
56            "default" => Ok(OnFailure::Default),
57            _ => TransformOnFailureInvalidValueSnafu { value: s }.fail(),
58        }
59    }
60}
61
62#[derive(Debug, Default, Clone)]
63pub struct Transforms {
64    pub(crate) transforms: Vec<Transform>,
65}
66
67impl Transforms {
68    pub fn transforms(&self) -> &Vec<Transform> {
69        &self.transforms
70    }
71}
72
73impl std::ops::Deref for Transforms {
74    type Target = Vec<Transform>;
75
76    fn deref(&self) -> &Self::Target {
77        &self.transforms
78    }
79}
80
81impl std::ops::DerefMut for Transforms {
82    fn deref_mut(&mut self) -> &mut Self::Target {
83        &mut self.transforms
84    }
85}
86
87impl TryFrom<&Vec<yaml_rust::Yaml>> for Transforms {
88    type Error = Error;
89
90    fn try_from(docs: &Vec<yaml_rust::Yaml>) -> Result<Self> {
91        let mut transforms = Vec::with_capacity(32);
92        let mut all_output_keys: Vec<String> = Vec::with_capacity(32);
93        let mut all_required_keys = Vec::with_capacity(32);
94
95        for doc in docs {
96            let transform_builder: Transform = doc
97                .as_hash()
98                .context(TransformElementMustBeMapSnafu)?
99                .try_into()?;
100            let mut transform_output_keys = transform_builder
101                .fields
102                .iter()
103                .map(|f| f.target_or_input_field().to_string())
104                .collect();
105            all_output_keys.append(&mut transform_output_keys);
106
107            let mut transform_required_keys = transform_builder
108                .fields
109                .iter()
110                .map(|f| f.input_field().to_string())
111                .collect();
112            all_required_keys.append(&mut transform_required_keys);
113
114            transforms.push(transform_builder);
115        }
116
117        all_required_keys.sort();
118
119        Ok(Transforms { transforms })
120    }
121}
122
123/// only field is required
124#[derive(Debug, Clone)]
125pub struct Transform {
126    pub fields: Fields,
127    pub type_: Value,
128    pub default: Option<Value>,
129    pub index: Option<Index>,
130    pub tag: bool,
131    pub on_failure: Option<OnFailure>,
132}
133
134impl Default for Transform {
135    fn default() -> Self {
136        Transform {
137            fields: Fields::default(),
138            type_: Value::Null,
139            default: None,
140            index: None,
141            tag: false,
142            on_failure: None,
143        }
144    }
145}
146
147impl Transform {
148    pub(crate) fn get_default(&self) -> Option<&Value> {
149        self.default.as_ref()
150    }
151
152    pub(crate) fn get_type_matched_default_val(&self) -> &Value {
153        &self.type_
154    }
155
156    pub(crate) fn get_default_value_when_data_is_none(&self) -> Option<Value> {
157        if matches!(self.type_, Value::Timestamp(_)) && self.index.is_some_and(|i| i == Index::Time)
158        {
159            return Some(Value::Timestamp(Timestamp::default()));
160        }
161        None
162    }
163}
164
165impl TryFrom<&yaml_rust::yaml::Hash> for Transform {
166    type Error = Error;
167
168    fn try_from(hash: &yaml_rust::yaml::Hash) -> Result<Self> {
169        let mut fields = Fields::default();
170        let mut type_ = Value::Null;
171        let mut default = None;
172        let mut index = None;
173        let mut tag = false;
174        let mut on_failure = None;
175
176        for (k, v) in hash {
177            let key = k
178                .as_str()
179                .with_context(|| KeyMustBeStringSnafu { k: k.clone() })?;
180            match key {
181                TRANSFORM_FIELD => {
182                    fields = Fields::one(yaml_new_field(v, TRANSFORM_FIELD)?);
183                }
184
185                TRANSFORM_FIELDS => {
186                    fields = yaml_new_fields(v, TRANSFORM_FIELDS)?;
187                }
188
189                TRANSFORM_TYPE => {
190                    let t = yaml_string(v, TRANSFORM_TYPE)?;
191                    type_ = Value::parse_str_type(&t)?;
192                }
193
194                TRANSFORM_INDEX => {
195                    let index_str = yaml_string(v, TRANSFORM_INDEX)?;
196                    index = Some(index_str.try_into()?);
197                }
198
199                TRANSFORM_TAG => {
200                    tag = yaml_bool(v, TRANSFORM_TAG)?;
201                }
202
203                TRANSFORM_DEFAULT => {
204                    default = Some(Value::try_from(v)?);
205                }
206
207                TRANSFORM_ON_FAILURE => {
208                    let on_failure_str = yaml_string(v, TRANSFORM_ON_FAILURE)?;
209                    on_failure = Some(on_failure_str.parse()?);
210                }
211
212                _ => {}
213            }
214        }
215        let mut final_default = None;
216
217        if let Some(default_value) = default {
218            match (&type_, &default_value) {
219                (Value::Null, _) => {
220                    return TransformTypeMustBeSetSnafu {
221                        fields: format!("{:?}", fields),
222                        default: default_value.to_string(),
223                    }
224                    .fail();
225                }
226                (_, Value::Null) => {} // if default is not set, then it will be regarded as default null
227                (_, _) => {
228                    let target = type_.parse_str_value(default_value.to_str_value().as_str())?;
229                    final_default = Some(target);
230                    on_failure = Some(OnFailure::Default);
231                }
232            }
233        }
234        let builder = Transform {
235            fields,
236            type_,
237            default: final_default,
238            index,
239            on_failure,
240            tag,
241        };
242
243        Ok(builder)
244    }
245}