pipeline/etl/transform/transformer/greptime/
coerce.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use api::v1::column_data_type_extension::TypeExt;
16use api::v1::column_def::{options_from_fulltext, options_from_inverted, options_from_skipping};
17use api::v1::{ColumnDataTypeExtension, ColumnOptions, JsonTypeExtension};
18use datatypes::schema::{FulltextOptions, SkippingIndexOptions};
19use greptime_proto::v1::value::ValueData;
20use greptime_proto::v1::{ColumnDataType, ColumnSchema, SemanticType};
21use snafu::{OptionExt, ResultExt};
22use vrl::value::Value as VrlValue;
23
24use crate::error::{
25    CoerceIncompatibleTypesSnafu, CoerceJsonTypeToSnafu, CoerceStringToTypeSnafu,
26    CoerceTypeToJsonSnafu, CoerceUnsupportedEpochTypeSnafu, ColumnOptionsSnafu,
27    InvalidTimestampSnafu, Result, UnsupportedTypeInPipelineSnafu, VrlRegexValueSnafu,
28};
29use crate::etl::transform::index::Index;
30use crate::etl::transform::transformer::greptime::vrl_value_to_jsonb_value;
31use crate::etl::transform::{OnFailure, Transform};
32
33pub(crate) fn coerce_columns(transform: &Transform) -> Result<Vec<ColumnSchema>> {
34    let mut columns = Vec::new();
35
36    for field in transform.fields.iter() {
37        let column_name = field.target_or_input_field().to_string();
38
39        let ext = if matches!(transform.type_, ColumnDataType::Binary) {
40            Some(ColumnDataTypeExtension {
41                type_ext: Some(TypeExt::JsonType(JsonTypeExtension::JsonBinary.into())),
42            })
43        } else {
44            None
45        };
46
47        let semantic_type = coerce_semantic_type(transform) as i32;
48
49        let column = ColumnSchema {
50            column_name,
51            datatype: transform.type_ as i32,
52            semantic_type,
53            datatype_extension: ext,
54            options: coerce_options(transform)?,
55        };
56        columns.push(column);
57    }
58
59    Ok(columns)
60}
61
62fn coerce_semantic_type(transform: &Transform) -> SemanticType {
63    if transform.tag {
64        return SemanticType::Tag;
65    }
66
67    match transform.index {
68        Some(Index::Tag) => SemanticType::Tag,
69        Some(Index::Time) => SemanticType::Timestamp,
70        Some(Index::Fulltext) | Some(Index::Skipping) | Some(Index::Inverted) | None => {
71            SemanticType::Field
72        }
73    }
74}
75
76fn coerce_options(transform: &Transform) -> Result<Option<ColumnOptions>> {
77    match transform.index {
78        Some(Index::Fulltext) => options_from_fulltext(&FulltextOptions {
79            enable: true,
80            ..Default::default()
81        })
82        .context(ColumnOptionsSnafu),
83        Some(Index::Skipping) => {
84            options_from_skipping(&SkippingIndexOptions::default()).context(ColumnOptionsSnafu)
85        }
86        Some(Index::Inverted) => Ok(Some(options_from_inverted())),
87        _ => Ok(None),
88    }
89}
90
91pub(crate) fn coerce_value(val: &VrlValue, transform: &Transform) -> Result<Option<ValueData>> {
92    match val {
93        VrlValue::Null => Ok(None),
94        VrlValue::Integer(n) => coerce_i64_value(*n, transform),
95        VrlValue::Float(n) => coerce_f64_value(n.into_inner(), transform),
96        VrlValue::Boolean(b) => coerce_bool_value(*b, transform),
97        VrlValue::Bytes(b) => coerce_string_value(String::from_utf8_lossy(b).as_ref(), transform),
98        VrlValue::Timestamp(ts) => match transform.type_ {
99            ColumnDataType::TimestampNanosecond => Ok(Some(ValueData::TimestampNanosecondValue(
100                ts.timestamp_nanos_opt().context(InvalidTimestampSnafu {
101                    input: ts.to_rfc3339(),
102                })?,
103            ))),
104            ColumnDataType::TimestampMicrosecond => Ok(Some(ValueData::TimestampMicrosecondValue(
105                ts.timestamp_micros(),
106            ))),
107            ColumnDataType::TimestampMillisecond => Ok(Some(ValueData::TimestampMillisecondValue(
108                ts.timestamp_millis(),
109            ))),
110            ColumnDataType::TimestampSecond => {
111                Ok(Some(ValueData::TimestampSecondValue(ts.timestamp())))
112            }
113            _ => CoerceIncompatibleTypesSnafu {
114                msg: "Timestamp can only be coerced to another type",
115            }
116            .fail(),
117        },
118        VrlValue::Array(_) | VrlValue::Object(_) => coerce_json_value(val, transform),
119        VrlValue::Regex(_) => VrlRegexValueSnafu.fail(),
120    }
121}
122
123fn coerce_bool_value(b: bool, transform: &Transform) -> Result<Option<ValueData>> {
124    let val = match transform.type_ {
125        ColumnDataType::Int8 => ValueData::I8Value(b as i32),
126        ColumnDataType::Int16 => ValueData::I16Value(b as i32),
127        ColumnDataType::Int32 => ValueData::I32Value(b as i32),
128        ColumnDataType::Int64 => ValueData::I64Value(b as i64),
129
130        ColumnDataType::Uint8 => ValueData::U8Value(b as u32),
131        ColumnDataType::Uint16 => ValueData::U16Value(b as u32),
132        ColumnDataType::Uint32 => ValueData::U32Value(b as u32),
133        ColumnDataType::Uint64 => ValueData::U64Value(b as u64),
134
135        ColumnDataType::Float32 => ValueData::F32Value(if b { 1.0 } else { 0.0 }),
136        ColumnDataType::Float64 => ValueData::F64Value(if b { 1.0 } else { 0.0 }),
137
138        ColumnDataType::Boolean => ValueData::BoolValue(b),
139        ColumnDataType::String => ValueData::StringValue(b.to_string()),
140
141        ColumnDataType::TimestampNanosecond
142        | ColumnDataType::TimestampMicrosecond
143        | ColumnDataType::TimestampMillisecond
144        | ColumnDataType::TimestampSecond => match transform.on_failure {
145            Some(OnFailure::Ignore) => return Ok(None),
146            Some(OnFailure::Default) => {
147                return CoerceUnsupportedEpochTypeSnafu { ty: "Default" }.fail();
148            }
149            None => {
150                return CoerceUnsupportedEpochTypeSnafu { ty: "Boolean" }.fail();
151            }
152        },
153
154        ColumnDataType::Binary => {
155            return CoerceJsonTypeToSnafu {
156                ty: transform.type_.as_str_name(),
157            }
158            .fail()
159        }
160
161        _ => {
162            return UnsupportedTypeInPipelineSnafu {
163                ty: transform.type_.as_str_name(),
164            }
165            .fail()
166        }
167    };
168
169    Ok(Some(val))
170}
171
172fn coerce_i64_value(n: i64, transform: &Transform) -> Result<Option<ValueData>> {
173    let val = match &transform.type_ {
174        ColumnDataType::Int8 => ValueData::I8Value(n as i32),
175        ColumnDataType::Int16 => ValueData::I16Value(n as i32),
176        ColumnDataType::Int32 => ValueData::I32Value(n as i32),
177        ColumnDataType::Int64 => ValueData::I64Value(n),
178
179        ColumnDataType::Uint8 => ValueData::U8Value(n as u32),
180        ColumnDataType::Uint16 => ValueData::U16Value(n as u32),
181        ColumnDataType::Uint32 => ValueData::U32Value(n as u32),
182        ColumnDataType::Uint64 => ValueData::U64Value(n as u64),
183
184        ColumnDataType::Float32 => ValueData::F32Value(n as f32),
185        ColumnDataType::Float64 => ValueData::F64Value(n as f64),
186
187        ColumnDataType::Boolean => ValueData::BoolValue(n != 0),
188        ColumnDataType::String => ValueData::StringValue(n.to_string()),
189
190        ColumnDataType::TimestampNanosecond => ValueData::TimestampNanosecondValue(n),
191        ColumnDataType::TimestampMicrosecond => ValueData::TimestampMicrosecondValue(n),
192        ColumnDataType::TimestampMillisecond => ValueData::TimestampMillisecondValue(n),
193        ColumnDataType::TimestampSecond => ValueData::TimestampSecondValue(n),
194
195        ColumnDataType::Binary => {
196            return CoerceJsonTypeToSnafu {
197                ty: transform.type_.as_str_name(),
198            }
199            .fail()
200        }
201
202        _ => return Ok(None),
203    };
204
205    Ok(Some(val))
206}
207
208fn coerce_u64_value(n: u64, transform: &Transform) -> Result<Option<ValueData>> {
209    let val = match &transform.type_ {
210        ColumnDataType::Int8 => ValueData::I8Value(n as i32),
211        ColumnDataType::Int16 => ValueData::I16Value(n as i32),
212        ColumnDataType::Int32 => ValueData::I32Value(n as i32),
213        ColumnDataType::Int64 => ValueData::I64Value(n as i64),
214
215        ColumnDataType::Uint8 => ValueData::U8Value(n as u32),
216        ColumnDataType::Uint16 => ValueData::U16Value(n as u32),
217        ColumnDataType::Uint32 => ValueData::U32Value(n as u32),
218        ColumnDataType::Uint64 => ValueData::U64Value(n),
219
220        ColumnDataType::Float32 => ValueData::F32Value(n as f32),
221        ColumnDataType::Float64 => ValueData::F64Value(n as f64),
222
223        ColumnDataType::Boolean => ValueData::BoolValue(n != 0),
224        ColumnDataType::String => ValueData::StringValue(n.to_string()),
225
226        ColumnDataType::TimestampNanosecond => ValueData::TimestampNanosecondValue(n as i64),
227        ColumnDataType::TimestampMicrosecond => ValueData::TimestampMicrosecondValue(n as i64),
228        ColumnDataType::TimestampMillisecond => ValueData::TimestampMillisecondValue(n as i64),
229        ColumnDataType::TimestampSecond => ValueData::TimestampSecondValue(n as i64),
230
231        ColumnDataType::Binary => {
232            return CoerceJsonTypeToSnafu {
233                ty: transform.type_.as_str_name(),
234            }
235            .fail()
236        }
237
238        _ => return Ok(None),
239    };
240
241    Ok(Some(val))
242}
243
244fn coerce_f64_value(n: f64, transform: &Transform) -> Result<Option<ValueData>> {
245    let val = match transform.type_ {
246        ColumnDataType::Int8 => ValueData::I8Value(n as i32),
247        ColumnDataType::Int16 => ValueData::I16Value(n as i32),
248        ColumnDataType::Int32 => ValueData::I32Value(n as i32),
249        ColumnDataType::Int64 => ValueData::I64Value(n as i64),
250
251        ColumnDataType::Uint8 => ValueData::U8Value(n as u32),
252        ColumnDataType::Uint16 => ValueData::U16Value(n as u32),
253        ColumnDataType::Uint32 => ValueData::U32Value(n as u32),
254        ColumnDataType::Uint64 => ValueData::U64Value(n as u64),
255
256        ColumnDataType::Float32 => ValueData::F32Value(n as f32),
257        ColumnDataType::Float64 => ValueData::F64Value(n),
258
259        ColumnDataType::Boolean => ValueData::BoolValue(n != 0.0),
260        ColumnDataType::String => ValueData::StringValue(n.to_string()),
261
262        ColumnDataType::TimestampNanosecond
263        | ColumnDataType::TimestampMicrosecond
264        | ColumnDataType::TimestampMillisecond
265        | ColumnDataType::TimestampSecond => match transform.on_failure {
266            Some(OnFailure::Ignore) => return Ok(None),
267            Some(OnFailure::Default) => {
268                return CoerceUnsupportedEpochTypeSnafu { ty: "Default" }.fail();
269            }
270            None => {
271                return CoerceUnsupportedEpochTypeSnafu { ty: "Float" }.fail();
272            }
273        },
274
275        ColumnDataType::Binary => {
276            return CoerceJsonTypeToSnafu {
277                ty: transform.type_.as_str_name(),
278            }
279            .fail()
280        }
281
282        _ => return Ok(None),
283    };
284
285    Ok(Some(val))
286}
287
/// Parses `$s` as `$type` and wraps the parsed value in `ValueData::$parse`.
///
/// When parsing fails, the transform's `on_failure` policy decides the outcome:
/// - no policy: a `CoerceStringToType` error carrying the input and the target
///   column type name;
/// - `Ignore`: `Ok(None)`;
/// - `Default`: a clone of the transform's explicit default if one is set,
///   otherwise the result of `get_type_matched_default_val()`.
macro_rules! coerce_string_value {
    ($s:expr, $transform:expr, $type:ident, $parse:ident) => {{
        if let Ok(parsed) = $s.parse::<$type>() {
            Ok(Some(ValueData::$parse(parsed)))
        } else {
            match $transform.on_failure {
                None => CoerceStringToTypeSnafu {
                    s: $s,
                    ty: $transform.type_.as_str_name(),
                }
                .fail(),
                Some(OnFailure::Ignore) => Ok(None),
                Some(OnFailure::Default) => match $transform.get_default() {
                    None => $transform.get_type_matched_default_val().map(Some),
                    Some(default) => Ok(Some(default.clone())),
                },
            }
        }
    }};
}
307
308fn coerce_string_value(s: &str, transform: &Transform) -> Result<Option<ValueData>> {
309    match transform.type_ {
310        ColumnDataType::Int8 => {
311            coerce_string_value!(s, transform, i32, I8Value)
312        }
313        ColumnDataType::Int16 => {
314            coerce_string_value!(s, transform, i32, I16Value)
315        }
316        ColumnDataType::Int32 => {
317            coerce_string_value!(s, transform, i32, I32Value)
318        }
319        ColumnDataType::Int64 => {
320            coerce_string_value!(s, transform, i64, I64Value)
321        }
322
323        ColumnDataType::Uint8 => {
324            coerce_string_value!(s, transform, u32, U8Value)
325        }
326        ColumnDataType::Uint16 => {
327            coerce_string_value!(s, transform, u32, U16Value)
328        }
329        ColumnDataType::Uint32 => {
330            coerce_string_value!(s, transform, u32, U32Value)
331        }
332        ColumnDataType::Uint64 => {
333            coerce_string_value!(s, transform, u64, U64Value)
334        }
335
336        ColumnDataType::Float32 => {
337            coerce_string_value!(s, transform, f32, F32Value)
338        }
339        ColumnDataType::Float64 => {
340            coerce_string_value!(s, transform, f64, F64Value)
341        }
342
343        ColumnDataType::Boolean => {
344            coerce_string_value!(s, transform, bool, BoolValue)
345        }
346
347        ColumnDataType::String => Ok(Some(ValueData::StringValue(s.to_string()))),
348
349        ColumnDataType::TimestampNanosecond
350        | ColumnDataType::TimestampMicrosecond
351        | ColumnDataType::TimestampMillisecond
352        | ColumnDataType::TimestampSecond => match transform.on_failure {
353            Some(OnFailure::Ignore) => Ok(None),
354            Some(OnFailure::Default) => CoerceUnsupportedEpochTypeSnafu { ty: "Default" }.fail(),
355            None => CoerceUnsupportedEpochTypeSnafu { ty: "String" }.fail(),
356        },
357
358        ColumnDataType::Binary => CoerceStringToTypeSnafu {
359            s,
360            ty: transform.type_.as_str_name(),
361        }
362        .fail(),
363
364        _ => Ok(None),
365    }
366}
367
368fn coerce_json_value(v: &VrlValue, transform: &Transform) -> Result<Option<ValueData>> {
369    match &transform.type_ {
370        ColumnDataType::Binary => (),
371        t => {
372            return CoerceTypeToJsonSnafu {
373                ty: t.as_str_name(),
374            }
375            .fail();
376        }
377    }
378    let data: jsonb::Value = vrl_value_to_jsonb_value(v);
379    Ok(Some(ValueData::BinaryValue(data.to_vec())))
380}
381
#[cfg(test)]
mod tests {

    use vrl::prelude::Bytes;

    use super::*;
    use crate::etl::field::Fields;

    /// Builds a transform with no fields, default or index, for the given
    /// target type and failure policy.
    fn transform_of(type_: ColumnDataType, on_failure: Option<OnFailure>) -> Transform {
        Transform {
            fields: Fields::default(),
            type_,
            default: None,
            index: None,
            on_failure,
            tag: false,
        }
    }

    #[test]
    fn test_coerce_string_without_on_failure() {
        let transform = transform_of(ColumnDataType::Int32, None);

        // valid string: must go through the string-parsing path, not the integer one
        {
            let val = VrlValue::Bytes(Bytes::from("123"));
            let result = coerce_value(&val, &transform).unwrap();
            assert_eq!(result, Some(ValueData::I32Value(123)));
        }

        // invalid string
        {
            let val = VrlValue::Bytes(Bytes::from("hello"));
            let result = coerce_value(&val, &transform);
            assert!(result.is_err());
        }
    }

    #[test]
    fn test_coerce_string_with_on_failure_ignore() {
        let transform = transform_of(ColumnDataType::Int32, Some(OnFailure::Ignore));

        let val = VrlValue::Bytes(Bytes::from("hello"));
        let result = coerce_value(&val, &transform).unwrap();
        assert_eq!(result, None);
    }

    #[test]
    fn test_coerce_string_with_on_failure_default() {
        let mut transform = transform_of(ColumnDataType::Int32, Some(OnFailure::Default));

        // with no explicit default value
        {
            let val = VrlValue::Bytes(Bytes::from("hello"));
            let result = coerce_value(&val, &transform).unwrap();
            assert_eq!(result, Some(ValueData::I32Value(0)));
        }

        // with explicit default value
        {
            transform.default = Some(ValueData::I32Value(42));
            let val = VrlValue::Bytes(Bytes::from("hello"));
            let result = coerce_value(&val, &transform).unwrap();
            assert_eq!(result, Some(ValueData::I32Value(42)));
        }
    }

    #[test]
    fn test_coerce_integer_value() {
        let transform = transform_of(ColumnDataType::Int32, None);

        let val = VrlValue::Integer(123);
        let result = coerce_value(&val, &transform).unwrap();
        assert_eq!(result, Some(ValueData::I32Value(123)));
    }

    #[test]
    fn test_coerce_null_is_none() {
        let transform = transform_of(ColumnDataType::Int32, None);

        let result = coerce_value(&VrlValue::Null, &transform).unwrap();
        assert_eq!(result, None);
    }

    #[test]
    fn test_coerce_integer_to_timestamp() {
        let transform = transform_of(ColumnDataType::TimestampMillisecond, None);

        let val = VrlValue::Integer(1_700_000_000_000);
        let result = coerce_value(&val, &transform).unwrap();
        assert_eq!(
            result,
            Some(ValueData::TimestampMillisecondValue(1_700_000_000_000))
        );
    }

    #[test]
    fn test_coerce_bool_to_timestamp_respects_on_failure() {
        let val = VrlValue::Boolean(true);

        let strict = transform_of(ColumnDataType::TimestampSecond, None);
        assert!(coerce_value(&val, &strict).is_err());

        let lenient = transform_of(ColumnDataType::TimestampSecond, Some(OnFailure::Ignore));
        assert_eq!(coerce_value(&val, &lenient).unwrap(), None);
    }
}