1use std::collections::BTreeMap;
16
17use api::v1::value::ValueData;
18use api::v1::ColumnDataType;
19use ordered_float::NotNan;
20use snafu::{OptionExt, ResultExt};
21use vrl::prelude::Bytes;
22use vrl::value::{KeyString, Value as VrlValue};
23
24use crate::error::{
25 FloatIsNanSnafu, Result, ValueDefaultValueUnsupportedSnafu, ValueInvalidResolutionSnafu,
26 ValueParseBooleanSnafu, ValueParseFloatSnafu, ValueParseIntSnafu, ValueParseTypeSnafu,
27 ValueUnsupportedYamlTypeSnafu, ValueYamlKeyMustBeStringSnafu,
28};
29
30pub(crate) const NANOSECOND_RESOLUTION: &str = "nanosecond";
31pub(crate) const NANO_RESOLUTION: &str = "nano";
32pub(crate) const NS_RESOLUTION: &str = "ns";
33pub(crate) const MICROSECOND_RESOLUTION: &str = "microsecond";
34pub(crate) const MICRO_RESOLUTION: &str = "micro";
35pub(crate) const US_RESOLUTION: &str = "us";
36pub(crate) const MILLISECOND_RESOLUTION: &str = "millisecond";
37pub(crate) const MILLI_RESOLUTION: &str = "milli";
38pub(crate) const MS_RESOLUTION: &str = "ms";
39pub(crate) const SECOND_RESOLUTION: &str = "second";
40pub(crate) const SEC_RESOLUTION: &str = "sec";
41pub(crate) const S_RESOLUTION: &str = "s";
42
43pub(crate) const VALID_RESOLUTIONS: [&str; 12] = [
44 NANOSECOND_RESOLUTION,
45 NANO_RESOLUTION,
46 NS_RESOLUTION,
47 MICROSECOND_RESOLUTION,
48 MICRO_RESOLUTION,
49 US_RESOLUTION,
50 MILLISECOND_RESOLUTION,
51 MILLI_RESOLUTION,
52 MS_RESOLUTION,
53 SECOND_RESOLUTION,
54 SEC_RESOLUTION,
55 S_RESOLUTION,
56];
57
58pub fn parse_str_type(t: &str) -> Result<ColumnDataType> {
59 let mut parts = t.splitn(2, ',');
60 let head = parts.next().unwrap_or_default();
61 let tail = parts.next().map(|s| s.trim().to_string());
62 match head.to_lowercase().as_str() {
63 "int8" => Ok(ColumnDataType::Int8),
64 "int16" => Ok(ColumnDataType::Int16),
65 "int32" => Ok(ColumnDataType::Int32),
66 "int64" => Ok(ColumnDataType::Int64),
67
68 "uint8" => Ok(ColumnDataType::Uint8),
69 "uint16" => Ok(ColumnDataType::Uint16),
70 "uint32" => Ok(ColumnDataType::Uint32),
71 "uint64" => Ok(ColumnDataType::Uint64),
72
73 "float32" => Ok(ColumnDataType::Float32),
74 "float64" => Ok(ColumnDataType::Float64),
75
76 "boolean" => Ok(ColumnDataType::Boolean),
77 "string" => Ok(ColumnDataType::String),
78
79 "timestamp" | "epoch" | "time" => match tail {
80 Some(resolution) if !resolution.is_empty() => match resolution.as_str() {
81 NANOSECOND_RESOLUTION | NANO_RESOLUTION | NS_RESOLUTION => {
82 Ok(ColumnDataType::TimestampNanosecond)
83 }
84 MICROSECOND_RESOLUTION | MICRO_RESOLUTION | US_RESOLUTION => {
85 Ok(ColumnDataType::TimestampMicrosecond)
86 }
87 MILLISECOND_RESOLUTION | MILLI_RESOLUTION | MS_RESOLUTION => {
88 Ok(ColumnDataType::TimestampMillisecond)
89 }
90 SECOND_RESOLUTION | SEC_RESOLUTION | S_RESOLUTION => {
91 Ok(ColumnDataType::TimestampSecond)
92 }
93 _ => ValueInvalidResolutionSnafu {
94 resolution,
95 valid_resolution: VALID_RESOLUTIONS.join(","),
96 }
97 .fail(),
98 },
99 _ => Ok(ColumnDataType::TimestampNanosecond),
100 },
101
102 "json" => Ok(ColumnDataType::Binary),
105
106 _ => ValueParseTypeSnafu { t }.fail(),
107 }
108}
109
110pub fn parse_str_value(type_: &ColumnDataType, v: &str) -> Result<ValueData> {
111 match type_ {
112 ColumnDataType::Int8 => v
113 .parse::<i8>()
114 .map(|v| ValueData::I8Value(v as i32))
115 .context(ValueParseIntSnafu { ty: "int8", v }),
116 ColumnDataType::Int16 => v
117 .parse::<i16>()
118 .map(|v| ValueData::I16Value(v as i32))
119 .context(ValueParseIntSnafu { ty: "int16", v }),
120 ColumnDataType::Int32 => v
121 .parse::<i32>()
122 .map(ValueData::I32Value)
123 .context(ValueParseIntSnafu { ty: "int32", v }),
124 ColumnDataType::Int64 => v
125 .parse::<i64>()
126 .map(ValueData::I64Value)
127 .context(ValueParseIntSnafu { ty: "int64", v }),
128
129 ColumnDataType::Uint8 => v
130 .parse::<u8>()
131 .map(|v| ValueData::U8Value(v as u32))
132 .context(ValueParseIntSnafu { ty: "uint8", v }),
133 ColumnDataType::Uint16 => v
134 .parse::<u16>()
135 .map(|v| ValueData::U16Value(v as u32))
136 .context(ValueParseIntSnafu { ty: "uint16", v }),
137 ColumnDataType::Uint32 => v
138 .parse::<u32>()
139 .map(ValueData::U32Value)
140 .context(ValueParseIntSnafu { ty: "uint32", v }),
141 ColumnDataType::Uint64 => v
142 .parse::<u64>()
143 .map(ValueData::U64Value)
144 .context(ValueParseIntSnafu { ty: "uint64", v }),
145
146 ColumnDataType::Float32 => v
147 .parse::<f32>()
148 .map(ValueData::F32Value)
149 .context(ValueParseFloatSnafu { ty: "float32", v }),
150 ColumnDataType::Float64 => v
151 .parse::<f64>()
152 .map(ValueData::F64Value)
153 .context(ValueParseFloatSnafu { ty: "float64", v }),
154
155 ColumnDataType::Boolean => v
156 .parse::<bool>()
157 .map(ValueData::BoolValue)
158 .context(ValueParseBooleanSnafu { ty: "boolean", v }),
159 ColumnDataType::String => Ok(ValueData::StringValue(v.to_string())),
160
161 _ => ValueDefaultValueUnsupportedSnafu {
162 value: format!("{:?}", type_),
163 }
164 .fail(),
165 }
166}
167
168pub fn yaml_to_vrl_value(v: &yaml_rust::Yaml) -> Result<VrlValue> {
169 match v {
170 yaml_rust::Yaml::Null => Ok(VrlValue::Null),
171 yaml_rust::Yaml::Boolean(v) => Ok(VrlValue::Boolean(*v)),
172 yaml_rust::Yaml::Integer(v) => Ok(VrlValue::Integer(*v)),
173 yaml_rust::Yaml::Real(v) => {
174 let f = v
175 .parse::<f64>()
176 .context(ValueParseFloatSnafu { ty: "float64", v })?;
177 NotNan::new(f).map(VrlValue::Float).context(FloatIsNanSnafu)
178 }
179 yaml_rust::Yaml::String(v) => Ok(VrlValue::Bytes(Bytes::from(v.to_string()))),
180 yaml_rust::Yaml::Array(arr) => {
181 let mut values = vec![];
182 for v in arr {
183 values.push(yaml_to_vrl_value(v)?);
184 }
185 Ok(VrlValue::Array(values))
186 }
187 yaml_rust::Yaml::Hash(v) => {
188 let mut values = BTreeMap::new();
189 for (k, v) in v {
190 let key = k
191 .as_str()
192 .with_context(|| ValueYamlKeyMustBeStringSnafu { value: v.clone() })?;
193 values.insert(KeyString::from(key), yaml_to_vrl_value(v)?);
194 }
195 Ok(VrlValue::Object(values))
196 }
197 _ => ValueUnsupportedYamlTypeSnafu { value: v.clone() }.fail(),
198 }
199}