pipeline/etl/transform/transformer/greptime/
coerce.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use api::v1::column_data_type_extension::TypeExt;
16use api::v1::column_def::{options_from_fulltext, options_from_inverted, options_from_skipping};
17use api::v1::{ColumnDataTypeExtension, ColumnOptions, JsonTypeExtension};
18use datatypes::schema::{FulltextOptions, SkippingIndexOptions};
19use greptime_proto::v1::value::ValueData;
20use greptime_proto::v1::{ColumnDataType, ColumnSchema, SemanticType};
21use snafu::ResultExt;
22
23use crate::error::{
24    CoerceIncompatibleTypesSnafu, CoerceJsonTypeToSnafu, CoerceStringToTypeSnafu,
25    CoerceTypeToJsonSnafu, CoerceUnsupportedEpochTypeSnafu, CoerceUnsupportedNullTypeSnafu,
26    CoerceUnsupportedNullTypeToSnafu, ColumnOptionsSnafu, Error, Result,
27};
28use crate::etl::transform::index::Index;
29use crate::etl::transform::{OnFailure, Transform};
30use crate::etl::value::{Timestamp, Value};
31
32impl TryFrom<Value> for ValueData {
33    type Error = Error;
34
35    fn try_from(value: Value) -> Result<Self> {
36        match value {
37            Value::Null => CoerceUnsupportedNullTypeSnafu.fail(),
38
39            Value::Int8(v) => Ok(ValueData::I32Value(v as i32)),
40            Value::Int16(v) => Ok(ValueData::I32Value(v as i32)),
41            Value::Int32(v) => Ok(ValueData::I32Value(v)),
42            Value::Int64(v) => Ok(ValueData::I64Value(v)),
43
44            Value::Uint8(v) => Ok(ValueData::U32Value(v as u32)),
45            Value::Uint16(v) => Ok(ValueData::U32Value(v as u32)),
46            Value::Uint32(v) => Ok(ValueData::U32Value(v)),
47            Value::Uint64(v) => Ok(ValueData::U64Value(v)),
48
49            Value::Float32(v) => Ok(ValueData::F32Value(v)),
50            Value::Float64(v) => Ok(ValueData::F64Value(v)),
51
52            Value::Boolean(v) => Ok(ValueData::BoolValue(v)),
53            Value::String(v) => Ok(ValueData::StringValue(v)),
54
55            Value::Timestamp(Timestamp::Nanosecond(ns)) => {
56                Ok(ValueData::TimestampNanosecondValue(ns))
57            }
58            Value::Timestamp(Timestamp::Microsecond(us)) => {
59                Ok(ValueData::TimestampMicrosecondValue(us))
60            }
61            Value::Timestamp(Timestamp::Millisecond(ms)) => {
62                Ok(ValueData::TimestampMillisecondValue(ms))
63            }
64            Value::Timestamp(Timestamp::Second(s)) => Ok(ValueData::TimestampSecondValue(s)),
65
66            Value::Array(_) | Value::Map(_) => {
67                let data: jsonb::Value = value.into();
68                Ok(ValueData::BinaryValue(data.to_vec()))
69            }
70        }
71    }
72}
73
74pub(crate) fn coerce_columns(transform: &Transform) -> Result<Vec<ColumnSchema>> {
75    let mut columns = Vec::new();
76
77    for field in transform.fields.iter() {
78        let column_name = field.target_or_input_field().to_string();
79
80        let (datatype, datatype_extension) = coerce_type(transform)?;
81
82        let semantic_type = coerce_semantic_type(transform) as i32;
83
84        let column = ColumnSchema {
85            column_name,
86            datatype: datatype as i32,
87            semantic_type,
88            datatype_extension,
89            options: coerce_options(transform)?,
90        };
91        columns.push(column);
92    }
93
94    Ok(columns)
95}
96
97fn coerce_semantic_type(transform: &Transform) -> SemanticType {
98    if transform.tag {
99        return SemanticType::Tag;
100    }
101
102    match transform.index {
103        Some(Index::Tag) => SemanticType::Tag,
104        Some(Index::Time) => SemanticType::Timestamp,
105        Some(Index::Fulltext) | Some(Index::Skipping) | Some(Index::Inverted) | None => {
106            SemanticType::Field
107        }
108    }
109}
110
111fn coerce_options(transform: &Transform) -> Result<Option<ColumnOptions>> {
112    match transform.index {
113        Some(Index::Fulltext) => options_from_fulltext(&FulltextOptions {
114            enable: true,
115            ..Default::default()
116        })
117        .context(ColumnOptionsSnafu),
118        Some(Index::Skipping) => {
119            options_from_skipping(&SkippingIndexOptions::default()).context(ColumnOptionsSnafu)
120        }
121        Some(Index::Inverted) => Ok(Some(options_from_inverted())),
122        _ => Ok(None),
123    }
124}
125
126fn coerce_type(transform: &Transform) -> Result<(ColumnDataType, Option<ColumnDataTypeExtension>)> {
127    match transform.type_ {
128        Value::Int8(_) => Ok((ColumnDataType::Int8, None)),
129        Value::Int16(_) => Ok((ColumnDataType::Int16, None)),
130        Value::Int32(_) => Ok((ColumnDataType::Int32, None)),
131        Value::Int64(_) => Ok((ColumnDataType::Int64, None)),
132
133        Value::Uint8(_) => Ok((ColumnDataType::Uint8, None)),
134        Value::Uint16(_) => Ok((ColumnDataType::Uint16, None)),
135        Value::Uint32(_) => Ok((ColumnDataType::Uint32, None)),
136        Value::Uint64(_) => Ok((ColumnDataType::Uint64, None)),
137
138        Value::Float32(_) => Ok((ColumnDataType::Float32, None)),
139        Value::Float64(_) => Ok((ColumnDataType::Float64, None)),
140
141        Value::Boolean(_) => Ok((ColumnDataType::Boolean, None)),
142        Value::String(_) => Ok((ColumnDataType::String, None)),
143
144        Value::Timestamp(Timestamp::Nanosecond(_)) => {
145            Ok((ColumnDataType::TimestampNanosecond, None))
146        }
147        Value::Timestamp(Timestamp::Microsecond(_)) => {
148            Ok((ColumnDataType::TimestampMicrosecond, None))
149        }
150        Value::Timestamp(Timestamp::Millisecond(_)) => {
151            Ok((ColumnDataType::TimestampMillisecond, None))
152        }
153        Value::Timestamp(Timestamp::Second(_)) => Ok((ColumnDataType::TimestampSecond, None)),
154
155        Value::Array(_) | Value::Map(_) => Ok((
156            ColumnDataType::Binary,
157            Some(ColumnDataTypeExtension {
158                type_ext: Some(TypeExt::JsonType(JsonTypeExtension::JsonBinary.into())),
159            }),
160        )),
161
162        Value::Null => CoerceUnsupportedNullTypeToSnafu {
163            ty: transform.type_.to_str_type(),
164        }
165        .fail(),
166    }
167}
168
169pub(crate) fn coerce_value(val: &Value, transform: &Transform) -> Result<Option<ValueData>> {
170    match val {
171        Value::Null => Ok(None),
172
173        Value::Int8(n) => coerce_i64_value(*n as i64, transform),
174        Value::Int16(n) => coerce_i64_value(*n as i64, transform),
175        Value::Int32(n) => coerce_i64_value(*n as i64, transform),
176        Value::Int64(n) => coerce_i64_value(*n, transform),
177
178        Value::Uint8(n) => coerce_u64_value(*n as u64, transform),
179        Value::Uint16(n) => coerce_u64_value(*n as u64, transform),
180        Value::Uint32(n) => coerce_u64_value(*n as u64, transform),
181        Value::Uint64(n) => coerce_u64_value(*n, transform),
182
183        Value::Float32(n) => coerce_f64_value(*n as f64, transform),
184        Value::Float64(n) => coerce_f64_value(*n, transform),
185
186        Value::Boolean(b) => coerce_bool_value(*b, transform),
187        Value::String(s) => coerce_string_value(s, transform),
188
189        Value::Timestamp(input_timestamp) => match &transform.type_ {
190            Value::Timestamp(target_timestamp) => match target_timestamp {
191                Timestamp::Nanosecond(_) => Ok(Some(ValueData::TimestampNanosecondValue(
192                    input_timestamp.timestamp_nanos(),
193                ))),
194                Timestamp::Microsecond(_) => Ok(Some(ValueData::TimestampMicrosecondValue(
195                    input_timestamp.timestamp_micros(),
196                ))),
197                Timestamp::Millisecond(_) => Ok(Some(ValueData::TimestampMillisecondValue(
198                    input_timestamp.timestamp_millis(),
199                ))),
200                Timestamp::Second(_) => Ok(Some(ValueData::TimestampSecondValue(
201                    input_timestamp.timestamp(),
202                ))),
203            },
204            _ => CoerceIncompatibleTypesSnafu {
205                msg: "Timestamp can only be coerced to another type",
206            }
207            .fail(),
208        },
209
210        Value::Array(_) | Value::Map(_) => coerce_json_value(val, transform),
211    }
212}
213
214fn coerce_bool_value(b: bool, transform: &Transform) -> Result<Option<ValueData>> {
215    let val = match transform.type_ {
216        Value::Int8(_) => ValueData::I8Value(b as i32),
217        Value::Int16(_) => ValueData::I16Value(b as i32),
218        Value::Int32(_) => ValueData::I32Value(b as i32),
219        Value::Int64(_) => ValueData::I64Value(b as i64),
220
221        Value::Uint8(_) => ValueData::U8Value(b as u32),
222        Value::Uint16(_) => ValueData::U16Value(b as u32),
223        Value::Uint32(_) => ValueData::U32Value(b as u32),
224        Value::Uint64(_) => ValueData::U64Value(b as u64),
225
226        Value::Float32(_) => ValueData::F32Value(if b { 1.0 } else { 0.0 }),
227        Value::Float64(_) => ValueData::F64Value(if b { 1.0 } else { 0.0 }),
228
229        Value::Boolean(_) => ValueData::BoolValue(b),
230        Value::String(_) => ValueData::StringValue(b.to_string()),
231
232        Value::Timestamp(_) => match transform.on_failure {
233            Some(OnFailure::Ignore) => return Ok(None),
234            Some(OnFailure::Default) => {
235                return CoerceUnsupportedEpochTypeSnafu { ty: "Default" }.fail();
236            }
237            None => {
238                return CoerceUnsupportedEpochTypeSnafu { ty: "Boolean" }.fail();
239            }
240        },
241
242        Value::Array(_) | Value::Map(_) => {
243            return CoerceJsonTypeToSnafu {
244                ty: transform.type_.to_str_type(),
245            }
246            .fail()
247        }
248
249        Value::Null => return Ok(None),
250    };
251
252    Ok(Some(val))
253}
254
255fn coerce_i64_value(n: i64, transform: &Transform) -> Result<Option<ValueData>> {
256    let val = match &transform.type_ {
257        Value::Int8(_) => ValueData::I8Value(n as i32),
258        Value::Int16(_) => ValueData::I16Value(n as i32),
259        Value::Int32(_) => ValueData::I32Value(n as i32),
260        Value::Int64(_) => ValueData::I64Value(n),
261
262        Value::Uint8(_) => ValueData::U8Value(n as u32),
263        Value::Uint16(_) => ValueData::U16Value(n as u32),
264        Value::Uint32(_) => ValueData::U32Value(n as u32),
265        Value::Uint64(_) => ValueData::U64Value(n as u64),
266
267        Value::Float32(_) => ValueData::F32Value(n as f32),
268        Value::Float64(_) => ValueData::F64Value(n as f64),
269
270        Value::Boolean(_) => ValueData::BoolValue(n != 0),
271        Value::String(_) => ValueData::StringValue(n.to_string()),
272
273        Value::Timestamp(unit) => match unit {
274            Timestamp::Nanosecond(_) => ValueData::TimestampNanosecondValue(n),
275            Timestamp::Microsecond(_) => ValueData::TimestampMicrosecondValue(n),
276            Timestamp::Millisecond(_) => ValueData::TimestampMillisecondValue(n),
277            Timestamp::Second(_) => ValueData::TimestampSecondValue(n),
278        },
279
280        Value::Array(_) | Value::Map(_) => {
281            return CoerceJsonTypeToSnafu {
282                ty: transform.type_.to_str_type(),
283            }
284            .fail()
285        }
286
287        Value::Null => return Ok(None),
288    };
289
290    Ok(Some(val))
291}
292
293fn coerce_u64_value(n: u64, transform: &Transform) -> Result<Option<ValueData>> {
294    let val = match &transform.type_ {
295        Value::Int8(_) => ValueData::I8Value(n as i32),
296        Value::Int16(_) => ValueData::I16Value(n as i32),
297        Value::Int32(_) => ValueData::I32Value(n as i32),
298        Value::Int64(_) => ValueData::I64Value(n as i64),
299
300        Value::Uint8(_) => ValueData::U8Value(n as u32),
301        Value::Uint16(_) => ValueData::U16Value(n as u32),
302        Value::Uint32(_) => ValueData::U32Value(n as u32),
303        Value::Uint64(_) => ValueData::U64Value(n),
304
305        Value::Float32(_) => ValueData::F32Value(n as f32),
306        Value::Float64(_) => ValueData::F64Value(n as f64),
307
308        Value::Boolean(_) => ValueData::BoolValue(n != 0),
309        Value::String(_) => ValueData::StringValue(n.to_string()),
310
311        Value::Timestamp(unit) => match unit {
312            Timestamp::Nanosecond(_) => ValueData::TimestampNanosecondValue(n as i64),
313            Timestamp::Microsecond(_) => ValueData::TimestampMicrosecondValue(n as i64),
314            Timestamp::Millisecond(_) => ValueData::TimestampMillisecondValue(n as i64),
315            Timestamp::Second(_) => ValueData::TimestampSecondValue(n as i64),
316        },
317
318        Value::Array(_) | Value::Map(_) => {
319            return CoerceJsonTypeToSnafu {
320                ty: transform.type_.to_str_type(),
321            }
322            .fail()
323        }
324
325        Value::Null => return Ok(None),
326    };
327
328    Ok(Some(val))
329}
330
331fn coerce_f64_value(n: f64, transform: &Transform) -> Result<Option<ValueData>> {
332    let val = match transform.type_ {
333        Value::Int8(_) => ValueData::I8Value(n as i32),
334        Value::Int16(_) => ValueData::I16Value(n as i32),
335        Value::Int32(_) => ValueData::I32Value(n as i32),
336        Value::Int64(_) => ValueData::I64Value(n as i64),
337
338        Value::Uint8(_) => ValueData::U8Value(n as u32),
339        Value::Uint16(_) => ValueData::U16Value(n as u32),
340        Value::Uint32(_) => ValueData::U32Value(n as u32),
341        Value::Uint64(_) => ValueData::U64Value(n as u64),
342
343        Value::Float32(_) => ValueData::F32Value(n as f32),
344        Value::Float64(_) => ValueData::F64Value(n),
345
346        Value::Boolean(_) => ValueData::BoolValue(n != 0.0),
347        Value::String(_) => ValueData::StringValue(n.to_string()),
348
349        Value::Timestamp(_) => match transform.on_failure {
350            Some(OnFailure::Ignore) => return Ok(None),
351            Some(OnFailure::Default) => {
352                return CoerceUnsupportedEpochTypeSnafu { ty: "Default" }.fail();
353            }
354            None => {
355                return CoerceUnsupportedEpochTypeSnafu { ty: "Float" }.fail();
356            }
357        },
358
359        Value::Array(_) | Value::Map(_) => {
360            return CoerceJsonTypeToSnafu {
361                ty: transform.type_.to_str_type(),
362            }
363            .fail()
364        }
365
366        Value::Null => return Ok(None),
367    };
368
369    Ok(Some(val))
370}
371
/// Parses the string `$s` as `$type` and wraps the result in
/// `ValueData::$parse`.
///
/// When parsing fails the outcome is driven by `$transform.on_failure`:
///
/// * `Ignore` - evaluates to `Ok(None)`.
/// * `Default` - coerces the transform's explicit default if
///   `get_default()` returns one, otherwise the value returned by
///   `get_type_matched_default_val()`.
/// * unset - evaluates to a `CoerceStringToType` error carrying the input
///   string and the target type name.
macro_rules! coerce_string_value {
    ($s:expr, $transform:expr, $type:ident, $parse:ident) => {
        if let Ok(parsed) = $s.parse::<$type>() {
            Ok(Some(ValueData::$parse(parsed)))
        } else {
            match $transform.on_failure {
                Some(OnFailure::Ignore) => Ok(None),
                Some(OnFailure::Default) => {
                    if let Some(explicit_default) = $transform.get_default() {
                        coerce_value(explicit_default, $transform)
                    } else {
                        coerce_value($transform.get_type_matched_default_val(), $transform)
                    }
                }
                None => CoerceStringToTypeSnafu {
                    s: $s,
                    ty: $transform.type_.to_str_type(),
                }
                .fail(),
            }
        }
    };
}
391
392fn coerce_string_value(s: &String, transform: &Transform) -> Result<Option<ValueData>> {
393    match transform.type_ {
394        Value::Int8(_) => {
395            coerce_string_value!(s, transform, i32, I8Value)
396        }
397        Value::Int16(_) => {
398            coerce_string_value!(s, transform, i32, I16Value)
399        }
400        Value::Int32(_) => {
401            coerce_string_value!(s, transform, i32, I32Value)
402        }
403        Value::Int64(_) => {
404            coerce_string_value!(s, transform, i64, I64Value)
405        }
406
407        Value::Uint8(_) => {
408            coerce_string_value!(s, transform, u32, U8Value)
409        }
410        Value::Uint16(_) => {
411            coerce_string_value!(s, transform, u32, U16Value)
412        }
413        Value::Uint32(_) => {
414            coerce_string_value!(s, transform, u32, U32Value)
415        }
416        Value::Uint64(_) => {
417            coerce_string_value!(s, transform, u64, U64Value)
418        }
419
420        Value::Float32(_) => {
421            coerce_string_value!(s, transform, f32, F32Value)
422        }
423        Value::Float64(_) => {
424            coerce_string_value!(s, transform, f64, F64Value)
425        }
426
427        Value::Boolean(_) => {
428            coerce_string_value!(s, transform, bool, BoolValue)
429        }
430
431        Value::String(_) => Ok(Some(ValueData::StringValue(s.to_string()))),
432
433        Value::Timestamp(_) => match transform.on_failure {
434            Some(OnFailure::Ignore) => Ok(None),
435            Some(OnFailure::Default) => CoerceUnsupportedEpochTypeSnafu { ty: "Default" }.fail(),
436            None => CoerceUnsupportedEpochTypeSnafu { ty: "String" }.fail(),
437        },
438
439        Value::Array(_) | Value::Map(_) => CoerceStringToTypeSnafu {
440            s,
441            ty: transform.type_.to_str_type(),
442        }
443        .fail(),
444
445        Value::Null => Ok(None),
446    }
447}
448
449fn coerce_json_value(v: &Value, transform: &Transform) -> Result<Option<ValueData>> {
450    match &transform.type_ {
451        Value::Array(_) | Value::Map(_) => (),
452        t => {
453            return CoerceTypeToJsonSnafu {
454                ty: t.to_str_type(),
455            }
456            .fail();
457        }
458    }
459    match v {
460        Value::Map(_) => {
461            let data: jsonb::Value = v.into();
462            Ok(Some(ValueData::BinaryValue(data.to_vec())))
463        }
464        Value::Array(_) => {
465            let data: jsonb::Value = v.into();
466            Ok(Some(ValueData::BinaryValue(data.to_vec())))
467        }
468        _ => CoerceTypeToJsonSnafu {
469            ty: v.to_str_type(),
470        }
471        .fail(),
472    }
473}
474
#[cfg(test)]
mod tests {

    use super::*;
    use crate::etl::field::Fields;

    /// Builds an `Int32` transform with no fields, default, or index, varying
    /// only the failure policy.
    fn int32_transform(on_failure: Option<OnFailure>) -> Transform {
        Transform {
            fields: Fields::default(),
            type_: Value::Int32(0),
            default: None,
            index: None,
            on_failure,
            tag: false,
        }
    }

    /// Runs `coerce_value` on a string input.
    fn coerce_str(input: &str, transform: &Transform) -> Result<Option<ValueData>> {
        coerce_value(&Value::String(input.to_string()), transform)
    }

    #[test]
    fn test_coerce_string_without_on_failure() {
        let transform = int32_transform(None);

        // A parsable string is converted to the target integer type.
        let parsed = coerce_str("123", &transform).unwrap();
        assert_eq!(parsed, Some(ValueData::I32Value(123)));

        // An unparsable string is an error when no policy is configured.
        let failed = coerce_str("hello", &transform);
        assert!(failed.is_err());
    }

    #[test]
    fn test_coerce_string_with_on_failure_ignore() {
        let transform = int32_transform(Some(OnFailure::Ignore));

        let ignored = coerce_str("hello", &transform).unwrap();
        assert_eq!(ignored, None);
    }

    #[test]
    fn test_coerce_string_with_on_failure_default() {
        let mut transform = int32_transform(Some(OnFailure::Default));

        // Without an explicit default the type-matched default is used.
        let implicit = coerce_str("hello", &transform).unwrap();
        assert_eq!(implicit, Some(ValueData::I32Value(0)));

        // An explicit default takes precedence.
        transform.default = Some(Value::Int32(42));
        let explicit = coerce_str("hello", &transform).unwrap();
        assert_eq!(explicit, Some(ValueData::I32Value(42)));
    }
}