1use api::v1::column_data_type_extension::TypeExt;
16use api::v1::column_def::{options_from_fulltext, options_from_inverted, options_from_skipping};
17use api::v1::{ColumnDataTypeExtension, ColumnOptions, JsonTypeExtension};
18use datatypes::schema::{FulltextOptions, SkippingIndexOptions};
19use greptime_proto::v1::value::ValueData;
20use greptime_proto::v1::{ColumnDataType, ColumnSchema, SemanticType};
21use snafu::{OptionExt, ResultExt};
22use vrl::value::Value as VrlValue;
23
24use crate::error::{
25 CoerceIncompatibleTypesSnafu, CoerceJsonTypeToSnafu, CoerceStringToTypeSnafu,
26 CoerceTypeToJsonSnafu, CoerceUnsupportedEpochTypeSnafu, ColumnOptionsSnafu,
27 InvalidTimestampSnafu, Result, UnsupportedTypeInPipelineSnafu, VrlRegexValueSnafu,
28};
29use crate::etl::transform::index::Index;
30use crate::etl::transform::transformer::greptime::vrl_value_to_jsonb_value;
31use crate::etl::transform::{OnFailure, Transform};
32
33pub(crate) fn coerce_columns(transform: &Transform) -> Result<Vec<ColumnSchema>> {
34 let mut columns = Vec::new();
35
36 for field in transform.fields.iter() {
37 let column_name = field.target_or_input_field().to_string();
38
39 let ext = if matches!(transform.type_, ColumnDataType::Binary) {
40 Some(ColumnDataTypeExtension {
41 type_ext: Some(TypeExt::JsonType(JsonTypeExtension::JsonBinary.into())),
42 })
43 } else {
44 None
45 };
46
47 let semantic_type = coerce_semantic_type(transform) as i32;
48
49 let column = ColumnSchema {
50 column_name,
51 datatype: transform.type_ as i32,
52 semantic_type,
53 datatype_extension: ext,
54 options: coerce_options(transform)?,
55 };
56 columns.push(column);
57 }
58
59 Ok(columns)
60}
61
62fn coerce_semantic_type(transform: &Transform) -> SemanticType {
63 if transform.tag {
64 return SemanticType::Tag;
65 }
66
67 match transform.index {
68 Some(Index::Tag) => SemanticType::Tag,
69 Some(Index::Time) => SemanticType::Timestamp,
70 Some(Index::Fulltext) | Some(Index::Skipping) | Some(Index::Inverted) | None => {
71 SemanticType::Field
72 }
73 }
74}
75
76fn coerce_options(transform: &Transform) -> Result<Option<ColumnOptions>> {
77 match transform.index {
78 Some(Index::Fulltext) => options_from_fulltext(&FulltextOptions {
79 enable: true,
80 ..Default::default()
81 })
82 .context(ColumnOptionsSnafu),
83 Some(Index::Skipping) => {
84 options_from_skipping(&SkippingIndexOptions::default()).context(ColumnOptionsSnafu)
85 }
86 Some(Index::Inverted) => Ok(Some(options_from_inverted())),
87 _ => Ok(None),
88 }
89}
90
91pub(crate) fn coerce_value(val: &VrlValue, transform: &Transform) -> Result<Option<ValueData>> {
92 match val {
93 VrlValue::Null => Ok(None),
94 VrlValue::Integer(n) => coerce_i64_value(*n, transform),
95 VrlValue::Float(n) => coerce_f64_value(n.into_inner(), transform),
96 VrlValue::Boolean(b) => coerce_bool_value(*b, transform),
97 VrlValue::Bytes(b) => coerce_string_value(String::from_utf8_lossy(b).as_ref(), transform),
98 VrlValue::Timestamp(ts) => match transform.type_ {
99 ColumnDataType::TimestampNanosecond => Ok(Some(ValueData::TimestampNanosecondValue(
100 ts.timestamp_nanos_opt().context(InvalidTimestampSnafu {
101 input: ts.to_rfc3339(),
102 })?,
103 ))),
104 ColumnDataType::TimestampMicrosecond => Ok(Some(ValueData::TimestampMicrosecondValue(
105 ts.timestamp_micros(),
106 ))),
107 ColumnDataType::TimestampMillisecond => Ok(Some(ValueData::TimestampMillisecondValue(
108 ts.timestamp_millis(),
109 ))),
110 ColumnDataType::TimestampSecond => {
111 Ok(Some(ValueData::TimestampSecondValue(ts.timestamp())))
112 }
113 _ => CoerceIncompatibleTypesSnafu {
114 msg: "Timestamp can only be coerced to another type",
115 }
116 .fail(),
117 },
118 VrlValue::Array(_) | VrlValue::Object(_) => coerce_json_value(val, transform),
119 VrlValue::Regex(_) => VrlRegexValueSnafu.fail(),
120 }
121}
122
123fn coerce_bool_value(b: bool, transform: &Transform) -> Result<Option<ValueData>> {
124 let val = match transform.type_ {
125 ColumnDataType::Int8 => ValueData::I8Value(b as i32),
126 ColumnDataType::Int16 => ValueData::I16Value(b as i32),
127 ColumnDataType::Int32 => ValueData::I32Value(b as i32),
128 ColumnDataType::Int64 => ValueData::I64Value(b as i64),
129
130 ColumnDataType::Uint8 => ValueData::U8Value(b as u32),
131 ColumnDataType::Uint16 => ValueData::U16Value(b as u32),
132 ColumnDataType::Uint32 => ValueData::U32Value(b as u32),
133 ColumnDataType::Uint64 => ValueData::U64Value(b as u64),
134
135 ColumnDataType::Float32 => ValueData::F32Value(if b { 1.0 } else { 0.0 }),
136 ColumnDataType::Float64 => ValueData::F64Value(if b { 1.0 } else { 0.0 }),
137
138 ColumnDataType::Boolean => ValueData::BoolValue(b),
139 ColumnDataType::String => ValueData::StringValue(b.to_string()),
140
141 ColumnDataType::TimestampNanosecond
142 | ColumnDataType::TimestampMicrosecond
143 | ColumnDataType::TimestampMillisecond
144 | ColumnDataType::TimestampSecond => match transform.on_failure {
145 Some(OnFailure::Ignore) => return Ok(None),
146 Some(OnFailure::Default) => {
147 return CoerceUnsupportedEpochTypeSnafu { ty: "Default" }.fail();
148 }
149 None => {
150 return CoerceUnsupportedEpochTypeSnafu { ty: "Boolean" }.fail();
151 }
152 },
153
154 ColumnDataType::Binary => {
155 return CoerceJsonTypeToSnafu {
156 ty: transform.type_.as_str_name(),
157 }
158 .fail()
159 }
160
161 _ => {
162 return UnsupportedTypeInPipelineSnafu {
163 ty: transform.type_.as_str_name(),
164 }
165 .fail()
166 }
167 };
168
169 Ok(Some(val))
170}
171
172fn coerce_i64_value(n: i64, transform: &Transform) -> Result<Option<ValueData>> {
173 let val = match &transform.type_ {
174 ColumnDataType::Int8 => ValueData::I8Value(n as i32),
175 ColumnDataType::Int16 => ValueData::I16Value(n as i32),
176 ColumnDataType::Int32 => ValueData::I32Value(n as i32),
177 ColumnDataType::Int64 => ValueData::I64Value(n),
178
179 ColumnDataType::Uint8 => ValueData::U8Value(n as u32),
180 ColumnDataType::Uint16 => ValueData::U16Value(n as u32),
181 ColumnDataType::Uint32 => ValueData::U32Value(n as u32),
182 ColumnDataType::Uint64 => ValueData::U64Value(n as u64),
183
184 ColumnDataType::Float32 => ValueData::F32Value(n as f32),
185 ColumnDataType::Float64 => ValueData::F64Value(n as f64),
186
187 ColumnDataType::Boolean => ValueData::BoolValue(n != 0),
188 ColumnDataType::String => ValueData::StringValue(n.to_string()),
189
190 ColumnDataType::TimestampNanosecond => ValueData::TimestampNanosecondValue(n),
191 ColumnDataType::TimestampMicrosecond => ValueData::TimestampMicrosecondValue(n),
192 ColumnDataType::TimestampMillisecond => ValueData::TimestampMillisecondValue(n),
193 ColumnDataType::TimestampSecond => ValueData::TimestampSecondValue(n),
194
195 ColumnDataType::Binary => {
196 return CoerceJsonTypeToSnafu {
197 ty: transform.type_.as_str_name(),
198 }
199 .fail()
200 }
201
202 _ => return Ok(None),
203 };
204
205 Ok(Some(val))
206}
207
208fn coerce_u64_value(n: u64, transform: &Transform) -> Result<Option<ValueData>> {
209 let val = match &transform.type_ {
210 ColumnDataType::Int8 => ValueData::I8Value(n as i32),
211 ColumnDataType::Int16 => ValueData::I16Value(n as i32),
212 ColumnDataType::Int32 => ValueData::I32Value(n as i32),
213 ColumnDataType::Int64 => ValueData::I64Value(n as i64),
214
215 ColumnDataType::Uint8 => ValueData::U8Value(n as u32),
216 ColumnDataType::Uint16 => ValueData::U16Value(n as u32),
217 ColumnDataType::Uint32 => ValueData::U32Value(n as u32),
218 ColumnDataType::Uint64 => ValueData::U64Value(n),
219
220 ColumnDataType::Float32 => ValueData::F32Value(n as f32),
221 ColumnDataType::Float64 => ValueData::F64Value(n as f64),
222
223 ColumnDataType::Boolean => ValueData::BoolValue(n != 0),
224 ColumnDataType::String => ValueData::StringValue(n.to_string()),
225
226 ColumnDataType::TimestampNanosecond => ValueData::TimestampNanosecondValue(n as i64),
227 ColumnDataType::TimestampMicrosecond => ValueData::TimestampMicrosecondValue(n as i64),
228 ColumnDataType::TimestampMillisecond => ValueData::TimestampMillisecondValue(n as i64),
229 ColumnDataType::TimestampSecond => ValueData::TimestampSecondValue(n as i64),
230
231 ColumnDataType::Binary => {
232 return CoerceJsonTypeToSnafu {
233 ty: transform.type_.as_str_name(),
234 }
235 .fail()
236 }
237
238 _ => return Ok(None),
239 };
240
241 Ok(Some(val))
242}
243
244fn coerce_f64_value(n: f64, transform: &Transform) -> Result<Option<ValueData>> {
245 let val = match transform.type_ {
246 ColumnDataType::Int8 => ValueData::I8Value(n as i32),
247 ColumnDataType::Int16 => ValueData::I16Value(n as i32),
248 ColumnDataType::Int32 => ValueData::I32Value(n as i32),
249 ColumnDataType::Int64 => ValueData::I64Value(n as i64),
250
251 ColumnDataType::Uint8 => ValueData::U8Value(n as u32),
252 ColumnDataType::Uint16 => ValueData::U16Value(n as u32),
253 ColumnDataType::Uint32 => ValueData::U32Value(n as u32),
254 ColumnDataType::Uint64 => ValueData::U64Value(n as u64),
255
256 ColumnDataType::Float32 => ValueData::F32Value(n as f32),
257 ColumnDataType::Float64 => ValueData::F64Value(n),
258
259 ColumnDataType::Boolean => ValueData::BoolValue(n != 0.0),
260 ColumnDataType::String => ValueData::StringValue(n.to_string()),
261
262 ColumnDataType::TimestampNanosecond
263 | ColumnDataType::TimestampMicrosecond
264 | ColumnDataType::TimestampMillisecond
265 | ColumnDataType::TimestampSecond => match transform.on_failure {
266 Some(OnFailure::Ignore) => return Ok(None),
267 Some(OnFailure::Default) => {
268 return CoerceUnsupportedEpochTypeSnafu { ty: "Default" }.fail();
269 }
270 None => {
271 return CoerceUnsupportedEpochTypeSnafu { ty: "Float" }.fail();
272 }
273 },
274
275 ColumnDataType::Binary => {
276 return CoerceJsonTypeToSnafu {
277 ty: transform.type_.as_str_name(),
278 }
279 .fail()
280 }
281
282 _ => return Ok(None),
283 };
284
285 Ok(Some(val))
286}
287
/// Parses `$s` as `$type` and wraps the parsed value in `ValueData::$parse`.
///
/// When parsing fails, the result is driven by `$transform.on_failure`:
/// - `Ignore`: evaluates to `Ok(None)`.
/// - `Default`: evaluates to the transform's explicit default when one is set,
///   otherwise to the result of `get_type_matched_default_val()`.
/// - unset: evaluates to a `CoerceStringToType` error carrying the input
///   string and the target type name.
macro_rules! coerce_string_value {
    ($s:expr, $transform:expr, $type:ident, $parse:ident) => {
        if let Ok(parsed) = $s.parse::<$type>() {
            Ok(Some(ValueData::$parse(parsed)))
        } else {
            match $transform.on_failure {
                Some(OnFailure::Ignore) => Ok(None),
                Some(OnFailure::Default) => {
                    if let Some(fallback) = $transform.get_default() {
                        Ok(Some(fallback.clone()))
                    } else {
                        $transform.get_type_matched_default_val().map(Some)
                    }
                }
                None => CoerceStringToTypeSnafu {
                    s: $s,
                    ty: $transform.type_.as_str_name(),
                }
                .fail(),
            }
        }
    };
}
307
308fn coerce_string_value(s: &str, transform: &Transform) -> Result<Option<ValueData>> {
309 match transform.type_ {
310 ColumnDataType::Int8 => {
311 coerce_string_value!(s, transform, i32, I8Value)
312 }
313 ColumnDataType::Int16 => {
314 coerce_string_value!(s, transform, i32, I16Value)
315 }
316 ColumnDataType::Int32 => {
317 coerce_string_value!(s, transform, i32, I32Value)
318 }
319 ColumnDataType::Int64 => {
320 coerce_string_value!(s, transform, i64, I64Value)
321 }
322
323 ColumnDataType::Uint8 => {
324 coerce_string_value!(s, transform, u32, U8Value)
325 }
326 ColumnDataType::Uint16 => {
327 coerce_string_value!(s, transform, u32, U16Value)
328 }
329 ColumnDataType::Uint32 => {
330 coerce_string_value!(s, transform, u32, U32Value)
331 }
332 ColumnDataType::Uint64 => {
333 coerce_string_value!(s, transform, u64, U64Value)
334 }
335
336 ColumnDataType::Float32 => {
337 coerce_string_value!(s, transform, f32, F32Value)
338 }
339 ColumnDataType::Float64 => {
340 coerce_string_value!(s, transform, f64, F64Value)
341 }
342
343 ColumnDataType::Boolean => {
344 coerce_string_value!(s, transform, bool, BoolValue)
345 }
346
347 ColumnDataType::String => Ok(Some(ValueData::StringValue(s.to_string()))),
348
349 ColumnDataType::TimestampNanosecond
350 | ColumnDataType::TimestampMicrosecond
351 | ColumnDataType::TimestampMillisecond
352 | ColumnDataType::TimestampSecond => match transform.on_failure {
353 Some(OnFailure::Ignore) => Ok(None),
354 Some(OnFailure::Default) => CoerceUnsupportedEpochTypeSnafu { ty: "Default" }.fail(),
355 None => CoerceUnsupportedEpochTypeSnafu { ty: "String" }.fail(),
356 },
357
358 ColumnDataType::Binary => CoerceStringToTypeSnafu {
359 s,
360 ty: transform.type_.as_str_name(),
361 }
362 .fail(),
363
364 _ => Ok(None),
365 }
366}
367
368fn coerce_json_value(v: &VrlValue, transform: &Transform) -> Result<Option<ValueData>> {
369 match &transform.type_ {
370 ColumnDataType::Binary => (),
371 t => {
372 return CoerceTypeToJsonSnafu {
373 ty: t.as_str_name(),
374 }
375 .fail();
376 }
377 }
378 let data: jsonb::Value = vrl_value_to_jsonb_value(v);
379 Ok(Some(ValueData::BinaryValue(data.to_vec())))
380}
381
#[cfg(test)]
mod tests {

    use vrl::prelude::Bytes;

    use super::*;
    use crate::etl::field::Fields;

    /// Builds an `Int32` transform without fields, index, default value or tag
    /// flag, using the given failure policy.
    fn int32_transform(on_failure: Option<OnFailure>) -> Transform {
        Transform {
            fields: Fields::default(),
            type_: ColumnDataType::Int32,
            default: None,
            index: None,
            on_failure,
            tag: false,
        }
    }

    /// A byte string that cannot be parsed as an integer.
    fn unparsable_input() -> VrlValue {
        VrlValue::Bytes(Bytes::from("hello"))
    }

    #[test]
    fn test_coerce_string_without_on_failure() {
        let transform = int32_transform(None);

        // An integer input is coerced to the target type.
        let coerced = coerce_value(&VrlValue::Integer(123), &transform).unwrap();
        assert_eq!(coerced, Some(ValueData::I32Value(123)));

        // An unparsable string is an error when no failure policy is set.
        let outcome = coerce_value(&unparsable_input(), &transform);
        assert!(outcome.is_err());
    }

    #[test]
    fn test_coerce_string_with_on_failure_ignore() {
        let transform = int32_transform(Some(OnFailure::Ignore));

        let coerced = coerce_value(&unparsable_input(), &transform).unwrap();
        assert_eq!(coerced, None);
    }

    #[test]
    fn test_coerce_string_with_on_failure_default() {
        let mut transform = int32_transform(Some(OnFailure::Default));

        // Without an explicit default the type-matched default is used.
        let coerced = coerce_value(&unparsable_input(), &transform).unwrap();
        assert_eq!(coerced, Some(ValueData::I32Value(0)));

        // An explicit default takes precedence.
        transform.default = Some(ValueData::I32Value(42));
        let coerced = coerce_value(&unparsable_input(), &transform).unwrap();
        assert_eq!(coerced, Some(ValueData::I32Value(42)));
    }
}