pipeline/etl/
value.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::BTreeMap;
16
17use api::v1::value::ValueData;
18use api::v1::ColumnDataType;
19use ordered_float::NotNan;
20use snafu::{OptionExt, ResultExt};
21use vrl::prelude::Bytes;
22use vrl::value::{KeyString, Value as VrlValue};
23
24use crate::error::{
25    FloatIsNanSnafu, Result, ValueDefaultValueUnsupportedSnafu, ValueInvalidResolutionSnafu,
26    ValueParseBooleanSnafu, ValueParseFloatSnafu, ValueParseIntSnafu, ValueParseTypeSnafu,
27    ValueUnsupportedYamlTypeSnafu, ValueYamlKeyMustBeStringSnafu,
28};
29
// Accepted spellings of a timestamp resolution. `parse_str_type` maps each group of three
// aliases (long form, short form, abbreviation) onto one timestamp column type.

/// Nanosecond resolution, long form.
pub(crate) const NANOSECOND_RESOLUTION: &str = "nanosecond";
/// Nanosecond resolution, short form.
pub(crate) const NANO_RESOLUTION: &str = "nano";
/// Nanosecond resolution, abbreviation.
pub(crate) const NS_RESOLUTION: &str = "ns";
/// Microsecond resolution, long form.
pub(crate) const MICROSECOND_RESOLUTION: &str = "microsecond";
/// Microsecond resolution, short form.
pub(crate) const MICRO_RESOLUTION: &str = "micro";
/// Microsecond resolution, abbreviation.
pub(crate) const US_RESOLUTION: &str = "us";
/// Millisecond resolution, long form.
pub(crate) const MILLISECOND_RESOLUTION: &str = "millisecond";
/// Millisecond resolution, short form.
pub(crate) const MILLI_RESOLUTION: &str = "milli";
/// Millisecond resolution, abbreviation.
pub(crate) const MS_RESOLUTION: &str = "ms";
/// Second resolution, long form.
pub(crate) const SECOND_RESOLUTION: &str = "second";
/// Second resolution, short form.
pub(crate) const SEC_RESOLUTION: &str = "sec";
/// Second resolution, abbreviation.
pub(crate) const S_RESOLUTION: &str = "s";

/// Every accepted resolution spelling, ordered from finest to coarsest.
///
/// `parse_str_type` joins this list with `,` to build the "valid resolutions" part of its
/// invalid-resolution error, so the order is visible to users.
pub(crate) const VALID_RESOLUTIONS: [&str; 12] = [
    NANOSECOND_RESOLUTION,
    NANO_RESOLUTION,
    NS_RESOLUTION,
    MICROSECOND_RESOLUTION,
    MICRO_RESOLUTION,
    US_RESOLUTION,
    MILLISECOND_RESOLUTION,
    MILLI_RESOLUTION,
    MS_RESOLUTION,
    SECOND_RESOLUTION,
    SEC_RESOLUTION,
    S_RESOLUTION,
];
57
58pub fn parse_str_type(t: &str) -> Result<ColumnDataType> {
59    let mut parts = t.splitn(2, ',');
60    let head = parts.next().unwrap_or_default();
61    let tail = parts.next().map(|s| s.trim().to_string());
62    match head.to_lowercase().as_str() {
63        "int8" => Ok(ColumnDataType::Int8),
64        "int16" => Ok(ColumnDataType::Int16),
65        "int32" => Ok(ColumnDataType::Int32),
66        "int64" => Ok(ColumnDataType::Int64),
67
68        "uint8" => Ok(ColumnDataType::Uint8),
69        "uint16" => Ok(ColumnDataType::Uint16),
70        "uint32" => Ok(ColumnDataType::Uint32),
71        "uint64" => Ok(ColumnDataType::Uint64),
72
73        "float32" => Ok(ColumnDataType::Float32),
74        "float64" => Ok(ColumnDataType::Float64),
75
76        "boolean" => Ok(ColumnDataType::Boolean),
77        "string" => Ok(ColumnDataType::String),
78
79        "timestamp" | "epoch" | "time" => match tail {
80            Some(resolution) if !resolution.is_empty() => match resolution.as_str() {
81                NANOSECOND_RESOLUTION | NANO_RESOLUTION | NS_RESOLUTION => {
82                    Ok(ColumnDataType::TimestampNanosecond)
83                }
84                MICROSECOND_RESOLUTION | MICRO_RESOLUTION | US_RESOLUTION => {
85                    Ok(ColumnDataType::TimestampMicrosecond)
86                }
87                MILLISECOND_RESOLUTION | MILLI_RESOLUTION | MS_RESOLUTION => {
88                    Ok(ColumnDataType::TimestampMillisecond)
89                }
90                SECOND_RESOLUTION | SEC_RESOLUTION | S_RESOLUTION => {
91                    Ok(ColumnDataType::TimestampSecond)
92                }
93                _ => ValueInvalidResolutionSnafu {
94                    resolution,
95                    valid_resolution: VALID_RESOLUTIONS.join(","),
96                }
97                .fail(),
98            },
99            _ => Ok(ColumnDataType::TimestampNanosecond),
100        },
101
102        // We only consider object and array to be json types. and use Map to represent json
103        // TODO(qtang): Needs to be defined with better semantics
104        "json" => Ok(ColumnDataType::Binary),
105
106        _ => ValueParseTypeSnafu { t }.fail(),
107    }
108}
109
110pub fn parse_str_value(type_: &ColumnDataType, v: &str) -> Result<ValueData> {
111    match type_ {
112        ColumnDataType::Int8 => v
113            .parse::<i8>()
114            .map(|v| ValueData::I8Value(v as i32))
115            .context(ValueParseIntSnafu { ty: "int8", v }),
116        ColumnDataType::Int16 => v
117            .parse::<i16>()
118            .map(|v| ValueData::I16Value(v as i32))
119            .context(ValueParseIntSnafu { ty: "int16", v }),
120        ColumnDataType::Int32 => v
121            .parse::<i32>()
122            .map(ValueData::I32Value)
123            .context(ValueParseIntSnafu { ty: "int32", v }),
124        ColumnDataType::Int64 => v
125            .parse::<i64>()
126            .map(ValueData::I64Value)
127            .context(ValueParseIntSnafu { ty: "int64", v }),
128
129        ColumnDataType::Uint8 => v
130            .parse::<u8>()
131            .map(|v| ValueData::U8Value(v as u32))
132            .context(ValueParseIntSnafu { ty: "uint8", v }),
133        ColumnDataType::Uint16 => v
134            .parse::<u16>()
135            .map(|v| ValueData::U16Value(v as u32))
136            .context(ValueParseIntSnafu { ty: "uint16", v }),
137        ColumnDataType::Uint32 => v
138            .parse::<u32>()
139            .map(ValueData::U32Value)
140            .context(ValueParseIntSnafu { ty: "uint32", v }),
141        ColumnDataType::Uint64 => v
142            .parse::<u64>()
143            .map(ValueData::U64Value)
144            .context(ValueParseIntSnafu { ty: "uint64", v }),
145
146        ColumnDataType::Float32 => v
147            .parse::<f32>()
148            .map(ValueData::F32Value)
149            .context(ValueParseFloatSnafu { ty: "float32", v }),
150        ColumnDataType::Float64 => v
151            .parse::<f64>()
152            .map(ValueData::F64Value)
153            .context(ValueParseFloatSnafu { ty: "float64", v }),
154
155        ColumnDataType::Boolean => v
156            .parse::<bool>()
157            .map(ValueData::BoolValue)
158            .context(ValueParseBooleanSnafu { ty: "boolean", v }),
159        ColumnDataType::String => Ok(ValueData::StringValue(v.to_string())),
160
161        _ => ValueDefaultValueUnsupportedSnafu {
162            value: format!("{:?}", type_),
163        }
164        .fail(),
165    }
166}
167
168pub fn yaml_to_vrl_value(v: &yaml_rust::Yaml) -> Result<VrlValue> {
169    match v {
170        yaml_rust::Yaml::Null => Ok(VrlValue::Null),
171        yaml_rust::Yaml::Boolean(v) => Ok(VrlValue::Boolean(*v)),
172        yaml_rust::Yaml::Integer(v) => Ok(VrlValue::Integer(*v)),
173        yaml_rust::Yaml::Real(v) => {
174            let f = v
175                .parse::<f64>()
176                .context(ValueParseFloatSnafu { ty: "float64", v })?;
177            NotNan::new(f).map(VrlValue::Float).context(FloatIsNanSnafu)
178        }
179        yaml_rust::Yaml::String(v) => Ok(VrlValue::Bytes(Bytes::from(v.to_string()))),
180        yaml_rust::Yaml::Array(arr) => {
181            let mut values = vec![];
182            for v in arr {
183                values.push(yaml_to_vrl_value(v)?);
184            }
185            Ok(VrlValue::Array(values))
186        }
187        yaml_rust::Yaml::Hash(v) => {
188            let mut values = BTreeMap::new();
189            for (k, v) in v {
190                let key = k
191                    .as_str()
192                    .with_context(|| ValueYamlKeyMustBeStringSnafu { value: v.clone() })?;
193                values.insert(KeyString::from(key), yaml_to_vrl_value(v)?);
194            }
195            Ok(VrlValue::Object(values))
196        }
197        _ => ValueUnsupportedYamlTypeSnafu { value: v.clone() }.fail(),
198    }
199}