Skip to main content

pipeline/etl/
transform.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub mod index;
16pub mod transformer;
17
18use std::collections::HashMap;
19
20use api::v1::ColumnDataType;
21use api::v1::value::ValueData;
22use chrono::Utc;
23use datatypes::schema::{FulltextOptions, SkippingIndexOptions};
24use snafu::{OptionExt, ResultExt, ensure};
25use sql::parsers::utils::{
26    validate_column_fulltext_create_option, validate_column_skipping_index_create_option,
27};
28
29use crate::error::{
30    Error, FieldMustBeTypeSnafu, KeyMustBeStringSnafu, Result, TransformElementMustBeMapSnafu,
31    TransformFieldMustBeSetSnafu, TransformIndexOptionMustBeScalarSnafu, TransformIndexOptionSnafu,
32    TransformIndexOptionUnsupportedSnafu, TransformIndexOptionsUnsupportedSnafu,
33    TransformIndexTypeMismatchSnafu, TransformIndexTypeMustBeSetSnafu,
34    TransformIndexUnsupportedFieldSnafu, TransformOnFailureInvalidValueSnafu,
35    TransformTypeMustBeSetSnafu, UnsupportedTypeInPipelineSnafu,
36};
37use crate::etl::field::Fields;
38use crate::etl::processor::{yaml_bool, yaml_new_field, yaml_new_fields, yaml_string};
39use crate::etl::transform::index::Index;
40use crate::etl::value::{parse_str_type, parse_str_value};
41
42const TRANSFORM_FIELD: &str = "field";
43const TRANSFORM_FIELDS: &str = "fields";
44const TRANSFORM_TYPE: &str = "type";
45const TRANSFORM_INDEX: &str = "index";
46const TRANSFORM_INDEX_TYPE_FIELD: &str = "index.type";
47const TRANSFORM_INDEX_OPTIONS: &str = "options";
48const TRANSFORM_INDEX_OPTIONS_FIELD: &str = "index.options";
49const TRANSFORM_TAG: &str = "tag";
50const TRANSFORM_DEFAULT: &str = "default";
51const TRANSFORM_ON_FAILURE: &str = "on_failure";
52
53pub use transformer::greptime::GreptimeTransformer;
54
/// Behavior applied when a transform fails.
///
/// The default is [`OnFailure::Ignore`].
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum OnFailure {
    /// Return `None` if the transform fails.
    #[default]
    Ignore,
    /// Return the default value of the field if the transform fails.
    ///
    /// The default value depends on the type of the field, or is explicitly
    /// set by the user.
    Default,
}
65
66impl std::str::FromStr for OnFailure {
67    type Err = Error;
68
69    fn from_str(s: &str) -> Result<Self> {
70        match s {
71            "ignore" => Ok(OnFailure::Ignore),
72            "default" => Ok(OnFailure::Default),
73            _ => TransformOnFailureInvalidValueSnafu { value: s }.fail(),
74        }
75    }
76}
77
78#[derive(Debug, Default, Clone)]
79pub struct Transforms {
80    pub(crate) transforms: Vec<Transform>,
81}
82
83impl Transforms {
84    pub fn transforms(&self) -> &Vec<Transform> {
85        &self.transforms
86    }
87}
88
89impl std::ops::Deref for Transforms {
90    type Target = Vec<Transform>;
91
92    fn deref(&self) -> &Self::Target {
93        &self.transforms
94    }
95}
96
97impl std::ops::DerefMut for Transforms {
98    fn deref_mut(&mut self) -> &mut Self::Target {
99        &mut self.transforms
100    }
101}
102
103impl TryFrom<&Vec<yaml_rust::Yaml>> for Transforms {
104    type Error = Error;
105
106    fn try_from(docs: &Vec<yaml_rust::Yaml>) -> Result<Self> {
107        let mut transforms = Vec::with_capacity(32);
108        let mut all_output_keys: Vec<String> = Vec::with_capacity(32);
109        let mut all_required_keys = Vec::with_capacity(32);
110
111        for doc in docs {
112            let transform_builder: Transform = doc
113                .as_hash()
114                .context(TransformElementMustBeMapSnafu)?
115                .try_into()?;
116            let mut transform_output_keys = transform_builder
117                .fields
118                .iter()
119                .map(|f| f.target_or_input_field().to_string())
120                .collect();
121            all_output_keys.append(&mut transform_output_keys);
122
123            let mut transform_required_keys = transform_builder
124                .fields
125                .iter()
126                .map(|f| f.input_field().to_string())
127                .collect();
128            all_required_keys.append(&mut transform_required_keys);
129
130            transforms.push(transform_builder);
131        }
132
133        all_required_keys.sort();
134
135        Ok(Transforms { transforms })
136    }
137}
138
139/// only field is required
140#[derive(Debug, Clone)]
141pub struct Transform {
142    pub fields: Fields,
143    pub type_: ColumnDataType,
144    pub default: Option<ValueData>,
145    pub index: Option<Index>,
146    pub index_options: Option<TransformIndexOptions>,
147    pub tag: bool,
148    pub on_failure: Option<OnFailure>,
149}
150
151#[derive(Debug, Clone, PartialEq, Eq)]
152pub enum TransformIndexOptions {
153    Fulltext(FulltextOptions),
154    Skipping(SkippingIndexOptions),
155}
156
157impl TransformIndexOptions {
158    pub(crate) fn index(&self) -> Index {
159        match self {
160            TransformIndexOptions::Fulltext(_) => Index::Fulltext,
161            TransformIndexOptions::Skipping(_) => Index::Skipping,
162        }
163    }
164
165    #[cfg(test)]
166    pub(crate) fn as_fulltext(&self) -> Option<&FulltextOptions> {
167        match self {
168            TransformIndexOptions::Fulltext(options) => Some(options),
169            TransformIndexOptions::Skipping(_) => None,
170        }
171    }
172
173    #[cfg(test)]
174    pub(crate) fn as_skipping(&self) -> Option<&SkippingIndexOptions> {
175        match self {
176            TransformIndexOptions::Skipping(options) => Some(options),
177            TransformIndexOptions::Fulltext(_) => None,
178        }
179    }
180}
181
182// valid types
183// ColumnDataType::Int8
184// ColumnDataType::Int16
185// ColumnDataType::Int32
186// ColumnDataType::Int64
187// ColumnDataType::Uint8
188// ColumnDataType::Uint16
189// ColumnDataType::Uint32
190// ColumnDataType::Uint64
191// ColumnDataType::Float32
192// ColumnDataType::Float64
193// ColumnDataType::Boolean
194// ColumnDataType::String
195// ColumnDataType::TimestampNanosecond
196// ColumnDataType::TimestampMicrosecond
197// ColumnDataType::TimestampMillisecond
198// ColumnDataType::TimestampSecond
199// ColumnDataType::Binary
200
201impl Transform {
202    pub(crate) fn get_default(&self) -> Option<&ValueData> {
203        self.default.as_ref()
204    }
205
206    pub(crate) fn get_type_matched_default_val(&self) -> Result<ValueData> {
207        get_default_for_type(&self.type_)
208    }
209
210    pub(crate) fn get_default_value_when_data_is_none(&self) -> Option<ValueData> {
211        if is_timestamp_type(&self.type_) && self.index.is_some_and(|i| i == Index::Time) {
212            let now = Utc::now();
213            match self.type_ {
214                ColumnDataType::TimestampSecond => {
215                    return Some(ValueData::TimestampSecondValue(now.timestamp()));
216                }
217                ColumnDataType::TimestampMillisecond => {
218                    return Some(ValueData::TimestampMillisecondValue(now.timestamp_millis()));
219                }
220                ColumnDataType::TimestampMicrosecond => {
221                    return Some(ValueData::TimestampMicrosecondValue(now.timestamp_micros()));
222                }
223                ColumnDataType::TimestampNanosecond => {
224                    return Some(ValueData::TimestampNanosecondValue(
225                        now.timestamp_nanos_opt()?,
226                    ));
227                }
228                _ => {}
229            }
230        }
231        None
232    }
233
234    pub(crate) fn is_timeindex(&self) -> bool {
235        self.index.is_some_and(|i| i == Index::Time)
236    }
237}
238
239fn is_timestamp_type(ty: &ColumnDataType) -> bool {
240    matches!(
241        ty,
242        ColumnDataType::TimestampSecond
243            | ColumnDataType::TimestampMillisecond
244            | ColumnDataType::TimestampMicrosecond
245            | ColumnDataType::TimestampNanosecond
246    )
247}
248
249fn get_default_for_type(ty: &ColumnDataType) -> Result<ValueData> {
250    let v = match ty {
251        ColumnDataType::Boolean => ValueData::BoolValue(false),
252        ColumnDataType::Int8 => ValueData::I8Value(0),
253        ColumnDataType::Int16 => ValueData::I16Value(0),
254        ColumnDataType::Int32 => ValueData::I32Value(0),
255        ColumnDataType::Int64 => ValueData::I64Value(0),
256        ColumnDataType::Uint8 => ValueData::U8Value(0),
257        ColumnDataType::Uint16 => ValueData::U16Value(0),
258        ColumnDataType::Uint32 => ValueData::U32Value(0),
259        ColumnDataType::Uint64 => ValueData::U64Value(0),
260        ColumnDataType::Float32 => ValueData::F32Value(0.0),
261        ColumnDataType::Float64 => ValueData::F64Value(0.0),
262        ColumnDataType::Binary => ValueData::BinaryValue(jsonb::Value::Null.to_vec()),
263        ColumnDataType::String => ValueData::StringValue(String::new()),
264
265        ColumnDataType::TimestampSecond => ValueData::TimestampSecondValue(0),
266        ColumnDataType::TimestampMillisecond => ValueData::TimestampMillisecondValue(0),
267        ColumnDataType::TimestampMicrosecond => ValueData::TimestampMicrosecondValue(0),
268        ColumnDataType::TimestampNanosecond => ValueData::TimestampNanosecondValue(0),
269
270        _ => UnsupportedTypeInPipelineSnafu {
271            ty: ty.as_str_name(),
272        }
273        .fail()?,
274    };
275    Ok(v)
276}
277
278fn parse_transform_index(
279    value: &yaml_rust::Yaml,
280) -> Result<(Index, Option<HashMap<String, String>>)> {
281    match value {
282        yaml_rust::Yaml::String(_) => {
283            let index_str = yaml_string(value, TRANSFORM_INDEX)?;
284            Ok((index_str.try_into()?, None))
285        }
286        yaml_rust::Yaml::Hash(hash) => {
287            let mut index = None;
288            let mut index_options = None;
289
290            for (k, v) in hash {
291                let key = k
292                    .as_str()
293                    .with_context(|| KeyMustBeStringSnafu { k: k.clone() })?;
294                match key {
295                    TRANSFORM_TYPE => {
296                        let index_str = yaml_string(v, TRANSFORM_INDEX_TYPE_FIELD)?;
297                        index = Some(index_str.try_into()?);
298                    }
299                    TRANSFORM_INDEX_OPTIONS => {
300                        index_options = Some(parse_transform_index_options(v)?);
301                    }
302                    _ => {
303                        return TransformIndexUnsupportedFieldSnafu {
304                            field: key.to_string(),
305                        }
306                        .fail();
307                    }
308                }
309            }
310
311            Ok((
312                index.context(TransformIndexTypeMustBeSetSnafu)?,
313                index_options,
314            ))
315        }
316        _ => FieldMustBeTypeSnafu {
317            field: TRANSFORM_INDEX,
318            ty: "string or map",
319        }
320        .fail(),
321    }
322}
323
324fn parse_transform_index_options(value: &yaml_rust::Yaml) -> Result<HashMap<String, String>> {
325    let hash = value.as_hash().context(FieldMustBeTypeSnafu {
326        field: TRANSFORM_INDEX_OPTIONS_FIELD,
327        ty: "map",
328    })?;
329    let mut options = HashMap::with_capacity(hash.len());
330
331    for (k, v) in hash {
332        let key = k
333            .as_str()
334            .with_context(|| KeyMustBeStringSnafu { k: k.clone() })?;
335
336        let field = format!("{TRANSFORM_INDEX_OPTIONS_FIELD}.{key}");
337        let value = match v {
338            yaml_rust::Yaml::String(v) => v.clone(),
339            yaml_rust::Yaml::Boolean(v) => v.to_string(),
340            yaml_rust::Yaml::Integer(v) => v.to_string(),
341            yaml_rust::Yaml::Real(v) => v.clone(),
342            _ => {
343                return TransformIndexOptionMustBeScalarSnafu { field }.fail();
344            }
345        };
346        options.insert(key.to_string(), value);
347    }
348
349    Ok(options)
350}
351
352fn lower_typed_transform_index_options<T>(
353    index: Index,
354    index_options: Option<HashMap<String, String>>,
355    validate: fn(&str) -> bool,
356    wrap: fn(T) -> TransformIndexOptions,
357) -> Result<Option<TransformIndexOptions>>
358where
359    T: TryFrom<HashMap<String, String>, Error = datatypes::error::Error>,
360{
361    index_options
362        .map(|opts| {
363            for key in opts.keys() {
364                ensure!(
365                    validate(key),
366                    TransformIndexOptionUnsupportedSnafu {
367                        index: index.to_string(),
368                        key: key.clone(),
369                    }
370                );
371            }
372
373            let options = opts.try_into().context(TransformIndexOptionSnafu {
374                index: index.to_string(),
375            })?;
376
377            Ok(wrap(options))
378        })
379        .transpose()
380}
381
382fn lower_transform_index_options(
383    index: Index,
384    column_type: &ColumnDataType,
385    index_options: Option<HashMap<String, String>>,
386) -> Result<Option<TransformIndexOptions>> {
387    match index {
388        Index::Fulltext => {
389            ensure!(
390                *column_type == ColumnDataType::String,
391                TransformIndexTypeMismatchSnafu {
392                    index: index.to_string(),
393                    expected: ColumnDataType::String.as_str_name().to_string(),
394                    actual: column_type.as_str_name().to_string(),
395                }
396            );
397
398            lower_typed_transform_index_options(
399                index,
400                index_options,
401                validate_column_fulltext_create_option,
402                TransformIndexOptions::Fulltext,
403            )
404        }
405        Index::Skipping => lower_typed_transform_index_options(
406            index,
407            index_options,
408            validate_column_skipping_index_create_option,
409            TransformIndexOptions::Skipping,
410        ),
411        Index::Inverted | Index::Time | Index::Tag => {
412            ensure!(
413                index_options.is_none(),
414                TransformIndexOptionsUnsupportedSnafu {
415                    index: index.to_string(),
416                }
417            );
418            Ok(None)
419        }
420    }
421}
422impl TryFrom<&yaml_rust::yaml::Hash> for Transform {
423    type Error = Error;
424
425    fn try_from(hash: &yaml_rust::yaml::Hash) -> Result<Self> {
426        let mut fields = Fields::default();
427        let mut default = None;
428        let mut index_value = None;
429        let mut tag = false;
430        let mut on_failure = None;
431
432        let mut type_ = None;
433
434        for (k, v) in hash {
435            let key = k
436                .as_str()
437                .with_context(|| KeyMustBeStringSnafu { k: k.clone() })?;
438            match key {
439                TRANSFORM_FIELD => {
440                    fields = Fields::one(yaml_new_field(v, TRANSFORM_FIELD)?);
441                }
442
443                TRANSFORM_FIELDS => {
444                    fields = yaml_new_fields(v, TRANSFORM_FIELDS)?;
445                }
446
447                TRANSFORM_TYPE => {
448                    let t = yaml_string(v, TRANSFORM_TYPE)?;
449                    type_ = Some(parse_str_type(&t)?);
450                }
451
452                TRANSFORM_INDEX => {
453                    index_value = Some(v);
454                }
455
456                TRANSFORM_TAG => {
457                    tag = yaml_bool(v, TRANSFORM_TAG)?;
458                }
459
460                TRANSFORM_DEFAULT => {
461                    default = match v {
462                        yaml_rust::Yaml::Real(r) => Some(r.clone()),
463                        yaml_rust::Yaml::Integer(i) => Some(i.to_string()),
464                        yaml_rust::Yaml::String(s) => Some(s.clone()),
465                        yaml_rust::Yaml::Boolean(b) => Some(b.to_string()),
466                        yaml_rust::Yaml::Array(_)
467                        | yaml_rust::Yaml::Hash(_)
468                        | yaml_rust::Yaml::Alias(_)
469                        | yaml_rust::Yaml::Null
470                        | yaml_rust::Yaml::BadValue => None,
471                    };
472                }
473
474                TRANSFORM_ON_FAILURE => {
475                    let on_failure_str = yaml_string(v, TRANSFORM_ON_FAILURE)?;
476                    on_failure = Some(on_failure_str.parse()?);
477                }
478
479                _ => {}
480            }
481        }
482
483        // ensure fields and type
484        ensure!(!fields.is_empty(), TransformFieldMustBeSetSnafu);
485        let type_ = type_.context(TransformTypeMustBeSetSnafu {
486            fields: format!("{:?}", fields),
487        })?;
488
489        let (index, index_options) = match index_value {
490            Some(value) => {
491                let (index, raw_index_options) = parse_transform_index(value)?;
492                let index_options =
493                    lower_transform_index_options(index, &type_, raw_index_options)?;
494                (Some(index), index_options)
495            }
496            None => (None, None),
497        };
498
499        let final_default = if let Some(default_value) = default {
500            let target = parse_str_value(&type_, &default_value)?;
501            on_failure = Some(OnFailure::Default);
502            Some(target)
503        } else {
504            None
505        };
506
507        let builder = Transform {
508            fields,
509            type_,
510            default: final_default,
511            index,
512            index_options,
513            on_failure,
514            tag,
515        };
516
517        Ok(builder)
518    }
519}
520
#[cfg(test)]
mod tests {
    use yaml_rust::YamlLoader;

    use super::*;

    /// Loads `yaml` and converts its first document into a [`Transform`].
    fn parse_transform(yaml: &str) -> Result<Transform> {
        let documents = YamlLoader::load_from_str(yaml).unwrap();
        let root = documents[0].as_hash().unwrap();
        Transform::try_from(root)
    }

    /// Asserts that `yaml` is rejected by the transform parser.
    #[track_caller]
    fn assert_rejected(yaml: &str) {
        assert!(parse_transform(yaml).is_err());
    }

    #[test]
    fn test_transform_parses_legacy_string_index() {
        let parsed = parse_transform(
            r#"
field: message
type: string
index: fulltext
"#,
        )
        .unwrap();

        assert_eq!(parsed.index, Some(Index::Fulltext));
        assert!(parsed.index_options.is_none());
    }

    #[test]
    fn test_transform_parses_index_object_without_options() {
        let parsed = parse_transform(
            r#"
field: message
type: string
index:
  type: inverted
"#,
        )
        .unwrap();

        assert_eq!(parsed.index, Some(Index::Inverted));
        assert!(parsed.index_options.is_none());
    }

    #[test]
    fn test_transform_parses_index_object_with_scalar_options() {
        let parsed = parse_transform(
            r#"
field: message
type: string
index:
  type: fulltext
  options:
    analyzer: English
    case_sensitive: false
    granularity: 2048
    false_positive_rate: 0.02
"#,
        )
        .unwrap();

        assert_eq!(parsed.index, Some(Index::Fulltext));
        let fulltext = parsed
            .index_options
            .as_ref()
            .unwrap()
            .as_fulltext()
            .unwrap();
        assert!(fulltext.enable);
        assert_eq!(fulltext.analyzer.to_string(), "English");
        assert!(!fulltext.case_sensitive);
        assert_eq!(fulltext.granularity, 2048);
        assert_eq!(fulltext.false_positive_rate(), 0.02);
    }

    #[test]
    fn test_transform_rejects_invalid_index_options_type() {
        assert_rejected(
            r#"
field: message
type: string
index:
  type: fulltext
  options: invalid
"#,
        );
    }

    #[test]
    fn test_transform_rejects_non_scalar_index_option_value() {
        assert_rejected(
            r#"
field: message
type: string
index:
  type: fulltext
  options:
    analyzer:
      kind: English
"#,
        );
    }

    #[test]
    fn test_transform_rejects_unknown_index_field() {
        assert_rejected(
            r#"
field: message
type: string
index:
  type: fulltext
  config: {}
"#,
        );
    }

    #[test]
    fn test_transform_rejects_unsupported_fulltext_option_key() {
        assert_rejected(
            r#"
field: message
type: string
index:
  type: fulltext
  options:
    tokenizer: english
"#,
        );
    }

    #[test]
    fn test_transform_rejects_options_for_inverted_index() {
        assert_rejected(
            r#"
field: message
type: string
index:
  type: inverted
  options:
    backend: bloom
"#,
        );
    }

    #[test]
    fn test_transform_rejects_empty_options_for_unsupported_indexes() {
        // Even an empty `options` map must be refused for these index kinds.
        for index in ["inverted", "time", "tag"] {
            let yaml = format!(
                r#"
field: message
type: string
index:
  type: {index}
  options: {{}}
"#
            );

            let outcome = parse_transform(&yaml);
            assert!(
                outcome.is_err(),
                "expected `{index}` to reject empty options"
            );
        }
    }

    #[test]
    fn test_transform_rejects_fulltext_index_on_non_string_column() {
        assert_rejected(
            r#"
field: count
type: int64
index: fulltext
"#,
        );
    }

    #[test]
    fn test_transform_allows_skipping_index_on_numeric_column() {
        let parsed = parse_transform(
            r#"
field: count
type: int64
index:
  type: skipping
  options:
    granularity: 2048
    false_positive_rate: 0.02
    type: BLOOM
"#,
        )
        .unwrap();

        assert_eq!(parsed.index, Some(Index::Skipping));
        let options = parsed.index_options.as_ref().unwrap();
        let skipping = options.as_skipping().unwrap();
        assert_eq!(skipping.granularity, 2048);
        assert_eq!(skipping.false_positive_rate(), 0.02);
        assert_eq!(skipping.index_type.to_string(), "BLOOM");
    }
}