Skip to main content

pipeline/etl/transform/transformer/greptime/
coerce.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use api::v1::column_data_type_extension::TypeExt;
16use api::v1::column_def::{options_from_fulltext, options_from_inverted, options_from_skipping};
17use api::v1::{ColumnDataTypeExtension, ColumnOptions, JsonTypeExtension};
18use datatypes::schema::{FulltextOptions, SkippingIndexOptions};
19use greptime_proto::v1::value::ValueData;
20use greptime_proto::v1::{ColumnDataType, ColumnSchema, SemanticType};
21use snafu::{OptionExt, ResultExt, ensure};
22use vrl::value::Value as VrlValue;
23
24use crate::error::{
25    CoerceIncompatibleTypesSnafu, CoerceJsonTypeToSnafu, CoerceStringToTypeSnafu,
26    CoerceTypeToJsonSnafu, CoerceUnsupportedEpochTypeSnafu, ColumnOptionsSnafu,
27    InvalidTimestampSnafu, Result, TransformIndexStateMismatchSnafu,
28    UnsupportedTypeInPipelineSnafu, VrlRegexValueSnafu,
29};
30use crate::etl::transform::index::Index;
31use crate::etl::transform::transformer::greptime::vrl_value_to_jsonb_value;
32use crate::etl::transform::{OnFailure, Transform, TransformIndexOptions};
33
34pub(crate) fn coerce_columns(transform: &Transform) -> Result<Vec<ColumnSchema>> {
35    let mut columns = Vec::new();
36
37    for field in transform.fields.iter() {
38        let column_name = field.target_or_input_field().to_string();
39
40        let ext = if matches!(transform.type_, ColumnDataType::Binary) {
41            Some(ColumnDataTypeExtension {
42                type_ext: Some(TypeExt::JsonType(JsonTypeExtension::JsonBinary.into())),
43            })
44        } else {
45            None
46        };
47
48        let semantic_type = coerce_semantic_type(transform) as i32;
49
50        let column = ColumnSchema {
51            column_name,
52            datatype: transform.type_ as i32,
53            semantic_type,
54            datatype_extension: ext,
55            options: coerce_options(transform)?,
56        };
57        columns.push(column);
58    }
59
60    Ok(columns)
61}
62
63fn coerce_semantic_type(transform: &Transform) -> SemanticType {
64    if transform.tag {
65        return SemanticType::Tag;
66    }
67
68    match transform.index {
69        Some(Index::Tag) => SemanticType::Tag,
70        Some(Index::Time) => SemanticType::Timestamp,
71        Some(Index::Fulltext) | Some(Index::Skipping) | Some(Index::Inverted) | None => {
72            SemanticType::Field
73        }
74    }
75}
76
77fn transform_index_label(index: Option<Index>) -> String {
78    index
79        .map(|index| index.to_string())
80        .unwrap_or_else(|| "none".to_string())
81}
82
83fn validate_transform_index_state(transform: &Transform) -> Result<()> {
84    let Some(index_options) = transform.index_options.as_ref() else {
85        return Ok(());
86    };
87
88    let options_index = index_options.index();
89    let index = transform.index;
90    ensure!(
91        index == Some(options_index),
92        TransformIndexStateMismatchSnafu {
93            index: transform_index_label(index),
94            options: options_index.to_string(),
95        }
96    );
97
98    Ok(())
99}
100
101fn build_fulltext_index_options(transform: &Transform) -> Result<FulltextOptions> {
102    match transform.index_options.as_ref() {
103        None => Ok(FulltextOptions {
104            enable: true,
105            ..Default::default()
106        }),
107        Some(TransformIndexOptions::Fulltext(options)) => Ok(options.clone()),
108        Some(options) => TransformIndexStateMismatchSnafu {
109            index: Index::Fulltext.to_string(),
110            options: options.index().to_string(),
111        }
112        .fail(),
113    }
114}
115
116fn build_skipping_index_options(transform: &Transform) -> Result<SkippingIndexOptions> {
117    match transform.index_options.as_ref() {
118        None => Ok(SkippingIndexOptions::default()),
119        Some(TransformIndexOptions::Skipping(options)) => Ok(options.clone()),
120        Some(options) => TransformIndexStateMismatchSnafu {
121            index: Index::Skipping.to_string(),
122            options: options.index().to_string(),
123        }
124        .fail(),
125    }
126}
127
128fn coerce_options(transform: &Transform) -> Result<Option<ColumnOptions>> {
129    validate_transform_index_state(transform)?;
130
131    match transform.index {
132        Some(Index::Fulltext) => {
133            let options = build_fulltext_index_options(transform)?;
134            options_from_fulltext(&options).context(ColumnOptionsSnafu)
135        }
136        Some(Index::Skipping) => {
137            let options = build_skipping_index_options(transform)?;
138            options_from_skipping(&options).context(ColumnOptionsSnafu)
139        }
140        Some(Index::Inverted) => Ok(Some(options_from_inverted())),
141        _ => Ok(None),
142    }
143}
144
145pub(crate) fn coerce_value(val: &VrlValue, transform: &Transform) -> Result<Option<ValueData>> {
146    match val {
147        VrlValue::Null => Ok(None),
148        VrlValue::Integer(n) => coerce_i64_value(*n, transform),
149        VrlValue::Float(n) => coerce_f64_value(n.into_inner(), transform),
150        VrlValue::Boolean(b) => coerce_bool_value(*b, transform),
151        VrlValue::Bytes(b) => coerce_string_value(String::from_utf8_lossy(b).as_ref(), transform),
152        VrlValue::Timestamp(ts) => match transform.type_ {
153            ColumnDataType::TimestampNanosecond => Ok(Some(ValueData::TimestampNanosecondValue(
154                ts.timestamp_nanos_opt().context(InvalidTimestampSnafu {
155                    input: ts.to_rfc3339(),
156                })?,
157            ))),
158            ColumnDataType::TimestampMicrosecond => Ok(Some(ValueData::TimestampMicrosecondValue(
159                ts.timestamp_micros(),
160            ))),
161            ColumnDataType::TimestampMillisecond => Ok(Some(ValueData::TimestampMillisecondValue(
162                ts.timestamp_millis(),
163            ))),
164            ColumnDataType::TimestampSecond => {
165                Ok(Some(ValueData::TimestampSecondValue(ts.timestamp())))
166            }
167            _ => CoerceIncompatibleTypesSnafu {
168                msg: "Timestamp can only be coerced to another type",
169            }
170            .fail(),
171        },
172        VrlValue::Array(_) | VrlValue::Object(_) => coerce_json_value(val, transform),
173        VrlValue::Regex(_) => VrlRegexValueSnafu.fail(),
174    }
175}
176
177fn coerce_bool_value(b: bool, transform: &Transform) -> Result<Option<ValueData>> {
178    let val = match transform.type_ {
179        ColumnDataType::Int8 => ValueData::I8Value(b as i32),
180        ColumnDataType::Int16 => ValueData::I16Value(b as i32),
181        ColumnDataType::Int32 => ValueData::I32Value(b as i32),
182        ColumnDataType::Int64 => ValueData::I64Value(b as i64),
183
184        ColumnDataType::Uint8 => ValueData::U8Value(b as u32),
185        ColumnDataType::Uint16 => ValueData::U16Value(b as u32),
186        ColumnDataType::Uint32 => ValueData::U32Value(b as u32),
187        ColumnDataType::Uint64 => ValueData::U64Value(b as u64),
188
189        ColumnDataType::Float32 => ValueData::F32Value(if b { 1.0 } else { 0.0 }),
190        ColumnDataType::Float64 => ValueData::F64Value(if b { 1.0 } else { 0.0 }),
191
192        ColumnDataType::Boolean => ValueData::BoolValue(b),
193        ColumnDataType::String => ValueData::StringValue(b.to_string()),
194
195        ColumnDataType::TimestampNanosecond
196        | ColumnDataType::TimestampMicrosecond
197        | ColumnDataType::TimestampMillisecond
198        | ColumnDataType::TimestampSecond => match transform.on_failure {
199            Some(OnFailure::Ignore) => return Ok(None),
200            Some(OnFailure::Default) => {
201                return CoerceUnsupportedEpochTypeSnafu { ty: "Default" }.fail();
202            }
203            None => {
204                return CoerceUnsupportedEpochTypeSnafu { ty: "Boolean" }.fail();
205            }
206        },
207
208        ColumnDataType::Binary => {
209            return CoerceJsonTypeToSnafu {
210                ty: transform.type_.as_str_name(),
211            }
212            .fail();
213        }
214
215        _ => {
216            return UnsupportedTypeInPipelineSnafu {
217                ty: transform.type_.as_str_name(),
218            }
219            .fail();
220        }
221    };
222
223    Ok(Some(val))
224}
225
226fn coerce_i64_value(n: i64, transform: &Transform) -> Result<Option<ValueData>> {
227    let val = match &transform.type_ {
228        ColumnDataType::Int8 => ValueData::I8Value(n as i32),
229        ColumnDataType::Int16 => ValueData::I16Value(n as i32),
230        ColumnDataType::Int32 => ValueData::I32Value(n as i32),
231        ColumnDataType::Int64 => ValueData::I64Value(n),
232
233        ColumnDataType::Uint8 => ValueData::U8Value(n as u32),
234        ColumnDataType::Uint16 => ValueData::U16Value(n as u32),
235        ColumnDataType::Uint32 => ValueData::U32Value(n as u32),
236        ColumnDataType::Uint64 => ValueData::U64Value(n as u64),
237
238        ColumnDataType::Float32 => ValueData::F32Value(n as f32),
239        ColumnDataType::Float64 => ValueData::F64Value(n as f64),
240
241        ColumnDataType::Boolean => ValueData::BoolValue(n != 0),
242        ColumnDataType::String => ValueData::StringValue(n.to_string()),
243
244        ColumnDataType::TimestampNanosecond => ValueData::TimestampNanosecondValue(n),
245        ColumnDataType::TimestampMicrosecond => ValueData::TimestampMicrosecondValue(n),
246        ColumnDataType::TimestampMillisecond => ValueData::TimestampMillisecondValue(n),
247        ColumnDataType::TimestampSecond => ValueData::TimestampSecondValue(n),
248
249        ColumnDataType::Binary => {
250            return CoerceJsonTypeToSnafu {
251                ty: transform.type_.as_str_name(),
252            }
253            .fail();
254        }
255
256        _ => return Ok(None),
257    };
258
259    Ok(Some(val))
260}
261
262fn coerce_u64_value(n: u64, transform: &Transform) -> Result<Option<ValueData>> {
263    let val = match &transform.type_ {
264        ColumnDataType::Int8 => ValueData::I8Value(n as i32),
265        ColumnDataType::Int16 => ValueData::I16Value(n as i32),
266        ColumnDataType::Int32 => ValueData::I32Value(n as i32),
267        ColumnDataType::Int64 => ValueData::I64Value(n as i64),
268
269        ColumnDataType::Uint8 => ValueData::U8Value(n as u32),
270        ColumnDataType::Uint16 => ValueData::U16Value(n as u32),
271        ColumnDataType::Uint32 => ValueData::U32Value(n as u32),
272        ColumnDataType::Uint64 => ValueData::U64Value(n),
273
274        ColumnDataType::Float32 => ValueData::F32Value(n as f32),
275        ColumnDataType::Float64 => ValueData::F64Value(n as f64),
276
277        ColumnDataType::Boolean => ValueData::BoolValue(n != 0),
278        ColumnDataType::String => ValueData::StringValue(n.to_string()),
279
280        ColumnDataType::TimestampNanosecond => ValueData::TimestampNanosecondValue(n as i64),
281        ColumnDataType::TimestampMicrosecond => ValueData::TimestampMicrosecondValue(n as i64),
282        ColumnDataType::TimestampMillisecond => ValueData::TimestampMillisecondValue(n as i64),
283        ColumnDataType::TimestampSecond => ValueData::TimestampSecondValue(n as i64),
284
285        ColumnDataType::Binary => {
286            return CoerceJsonTypeToSnafu {
287                ty: transform.type_.as_str_name(),
288            }
289            .fail();
290        }
291
292        _ => return Ok(None),
293    };
294
295    Ok(Some(val))
296}
297
298fn coerce_f64_value(n: f64, transform: &Transform) -> Result<Option<ValueData>> {
299    let val = match transform.type_ {
300        ColumnDataType::Int8 => ValueData::I8Value(n as i32),
301        ColumnDataType::Int16 => ValueData::I16Value(n as i32),
302        ColumnDataType::Int32 => ValueData::I32Value(n as i32),
303        ColumnDataType::Int64 => ValueData::I64Value(n as i64),
304
305        ColumnDataType::Uint8 => ValueData::U8Value(n as u32),
306        ColumnDataType::Uint16 => ValueData::U16Value(n as u32),
307        ColumnDataType::Uint32 => ValueData::U32Value(n as u32),
308        ColumnDataType::Uint64 => ValueData::U64Value(n as u64),
309
310        ColumnDataType::Float32 => ValueData::F32Value(n as f32),
311        ColumnDataType::Float64 => ValueData::F64Value(n),
312
313        ColumnDataType::Boolean => ValueData::BoolValue(n != 0.0),
314        ColumnDataType::String => ValueData::StringValue(n.to_string()),
315
316        ColumnDataType::TimestampNanosecond
317        | ColumnDataType::TimestampMicrosecond
318        | ColumnDataType::TimestampMillisecond
319        | ColumnDataType::TimestampSecond => match transform.on_failure {
320            Some(OnFailure::Ignore) => return Ok(None),
321            Some(OnFailure::Default) => {
322                return CoerceUnsupportedEpochTypeSnafu { ty: "Default" }.fail();
323            }
324            None => {
325                return CoerceUnsupportedEpochTypeSnafu { ty: "Float" }.fail();
326            }
327        },
328
329        ColumnDataType::Binary => {
330            return CoerceJsonTypeToSnafu {
331                ty: transform.type_.as_str_name(),
332            }
333            .fail();
334        }
335
336        _ => return Ok(None),
337    };
338
339    Ok(Some(val))
340}
341
/// Parses `$s` as `$type` and wraps the result in `ValueData::$parse`.
///
/// When parsing fails the transform's `on_failure` policy decides the outcome:
/// - `Ignore` yields `Ok(None)`.
/// - `Default` yields the transform's explicit default if one is set, otherwise
///   the result of `get_type_matched_default_val()`.
/// - no policy yields a `CoerceStringToType` error carrying the input and the
///   target type name.
macro_rules! coerce_string_value {
    ($s:expr, $transform:expr, $type:ident, $parse:ident) => {
        if let Ok(parsed) = $s.parse::<$type>() {
            Ok(Some(ValueData::$parse(parsed)))
        } else {
            match $transform.on_failure {
                Some(OnFailure::Ignore) => Ok(None),
                Some(OnFailure::Default) => match $transform.get_default() {
                    Some(default) => Ok(Some(default.clone())),
                    None => $transform.get_type_matched_default_val().map(Some),
                },
                None => CoerceStringToTypeSnafu {
                    s: $s,
                    ty: $transform.type_.as_str_name(),
                }
                .fail(),
            }
        }
    };
}
361
362fn coerce_string_value(s: &str, transform: &Transform) -> Result<Option<ValueData>> {
363    match transform.type_ {
364        ColumnDataType::Int8 => {
365            coerce_string_value!(s, transform, i32, I8Value)
366        }
367        ColumnDataType::Int16 => {
368            coerce_string_value!(s, transform, i32, I16Value)
369        }
370        ColumnDataType::Int32 => {
371            coerce_string_value!(s, transform, i32, I32Value)
372        }
373        ColumnDataType::Int64 => {
374            coerce_string_value!(s, transform, i64, I64Value)
375        }
376
377        ColumnDataType::Uint8 => {
378            coerce_string_value!(s, transform, u32, U8Value)
379        }
380        ColumnDataType::Uint16 => {
381            coerce_string_value!(s, transform, u32, U16Value)
382        }
383        ColumnDataType::Uint32 => {
384            coerce_string_value!(s, transform, u32, U32Value)
385        }
386        ColumnDataType::Uint64 => {
387            coerce_string_value!(s, transform, u64, U64Value)
388        }
389
390        ColumnDataType::Float32 => {
391            coerce_string_value!(s, transform, f32, F32Value)
392        }
393        ColumnDataType::Float64 => {
394            coerce_string_value!(s, transform, f64, F64Value)
395        }
396
397        ColumnDataType::Boolean => {
398            coerce_string_value!(s, transform, bool, BoolValue)
399        }
400
401        ColumnDataType::String => Ok(Some(ValueData::StringValue(s.to_string()))),
402
403        ColumnDataType::TimestampNanosecond
404        | ColumnDataType::TimestampMicrosecond
405        | ColumnDataType::TimestampMillisecond
406        | ColumnDataType::TimestampSecond => match transform.on_failure {
407            Some(OnFailure::Ignore) => Ok(None),
408            Some(OnFailure::Default) => CoerceUnsupportedEpochTypeSnafu { ty: "Default" }.fail(),
409            None => CoerceUnsupportedEpochTypeSnafu { ty: "String" }.fail(),
410        },
411
412        ColumnDataType::Binary => CoerceStringToTypeSnafu {
413            s,
414            ty: transform.type_.as_str_name(),
415        }
416        .fail(),
417
418        _ => Ok(None),
419    }
420}
421
422fn coerce_json_value(v: &VrlValue, transform: &Transform) -> Result<Option<ValueData>> {
423    match &transform.type_ {
424        ColumnDataType::Binary => (),
425        t => {
426            return CoerceTypeToJsonSnafu {
427                ty: t.as_str_name(),
428            }
429            .fail();
430        }
431    }
432    let data: jsonb::Value = vrl_value_to_jsonb_value(v);
433    Ok(Some(ValueData::BinaryValue(data.to_vec())))
434}
435
#[cfg(test)]
mod tests {

    use datatypes::schema::{FulltextAnalyzer, FulltextBackend, SkippingIndexType};
    use vrl::prelude::Bytes;

    use super::*;
    use crate::etl::field::Fields;

    /// Builds a field-less, non-tag transform with no explicit default value.
    fn transform_of(
        type_: ColumnDataType,
        index: Option<Index>,
        index_options: Option<TransformIndexOptions>,
        on_failure: Option<OnFailure>,
    ) -> Transform {
        Transform {
            fields: Fields::default(),
            type_,
            default: None,
            index,
            index_options,
            on_failure,
            tag: false,
        }
    }

    /// Custom fulltext options shared by the fulltext-related tests.
    fn custom_fulltext_options() -> TransformIndexOptions {
        TransformIndexOptions::Fulltext(FulltextOptions::new_unchecked(
            true,
            FulltextAnalyzer::Chinese,
            true,
            FulltextBackend::Tantivy,
            10240,
            0.01,
        ))
    }

    /// Custom skipping-index options shared by the skipping-related tests.
    fn custom_skipping_options() -> TransformIndexOptions {
        TransformIndexOptions::Skipping(SkippingIndexOptions::new_unchecked(
            2048,
            0.02,
            SkippingIndexType::BloomFilter,
        ))
    }

    #[test]
    fn test_coerce_string_without_on_failure() {
        let transform = transform_of(ColumnDataType::Int32, None, None, None);

        // valid string: must go through the string parsing path
        {
            let val = VrlValue::Bytes(Bytes::from("123"));
            let result = coerce_value(&val, &transform).unwrap();
            assert_eq!(result, Some(ValueData::I32Value(123)));
        }

        // integer input is coerced to the same target type
        {
            let val = VrlValue::Integer(123);
            let result = coerce_value(&val, &transform).unwrap();
            assert_eq!(result, Some(ValueData::I32Value(123)));
        }

        // invalid string
        {
            let val = VrlValue::Bytes(Bytes::from("hello"));
            let result = coerce_value(&val, &transform);
            assert!(result.is_err());
        }
    }

    #[test]
    fn test_coerce_string_with_on_failure_ignore() {
        let transform = transform_of(
            ColumnDataType::Int32,
            None,
            None,
            Some(OnFailure::Ignore),
        );

        let val = VrlValue::Bytes(Bytes::from("hello"));
        let result = coerce_value(&val, &transform).unwrap();
        assert_eq!(result, None);
    }

    #[test]
    fn test_coerce_string_with_on_failure_default() {
        let mut transform = transform_of(
            ColumnDataType::Int32,
            None,
            None,
            Some(OnFailure::Default),
        );

        // with no explicit default value
        {
            let val = VrlValue::Bytes(Bytes::from("hello"));
            let result = coerce_value(&val, &transform).unwrap();
            assert_eq!(result, Some(ValueData::I32Value(0)));
        }

        // with explicit default value
        {
            transform.default = Some(ValueData::I32Value(42));
            let val = VrlValue::Bytes(Bytes::from("hello"));
            let result = coerce_value(&val, &transform).unwrap();
            assert_eq!(result, Some(ValueData::I32Value(42)));
        }
    }

    #[test]
    fn test_coerce_fulltext_options_with_custom_values() {
        let transform = transform_of(
            ColumnDataType::String,
            Some(Index::Fulltext),
            Some(custom_fulltext_options()),
            None,
        );

        let options = coerce_options(&transform).unwrap().unwrap();
        let fulltext: FulltextOptions =
            serde_json::from_str(options.options.get("fulltext").unwrap()).unwrap();

        assert!(fulltext.enable);
        assert_eq!(fulltext.analyzer.to_string(), "Chinese");
        assert!(fulltext.case_sensitive);
        assert_eq!(fulltext.backend.to_string(), "tantivy");
    }

    #[test]
    fn test_coerce_skipping_options_with_custom_values() {
        let transform = transform_of(
            ColumnDataType::Int64,
            Some(Index::Skipping),
            Some(custom_skipping_options()),
            None,
        );

        let options = coerce_options(&transform).unwrap().unwrap();
        let skipping: SkippingIndexOptions =
            serde_json::from_str(options.options.get("skipping_index").unwrap()).unwrap();

        assert_eq!(skipping.granularity, 2048);
        assert_eq!(skipping.false_positive_rate(), 0.02);
        assert_eq!(skipping.index_type.to_string(), "BLOOM");
    }

    #[test]
    fn test_coerce_rejects_mismatched_index_options() {
        // Fulltext index declared, but the options belong to a skipping index.
        let transform = transform_of(
            ColumnDataType::String,
            Some(Index::Fulltext),
            Some(custom_skipping_options()),
            None,
        );

        assert!(coerce_options(&transform).is_err());
    }

    #[test]
    fn test_coerce_rejects_index_options_without_index() {
        // Options are configured although no index is declared at all.
        let transform = transform_of(
            ColumnDataType::String,
            None,
            Some(custom_fulltext_options()),
            None,
        );

        assert!(coerce_options(&transform).is_err());
    }
}