1use api::v1::column_data_type_extension::TypeExt;
16use api::v1::column_def::{options_from_fulltext, options_from_inverted, options_from_skipping};
17use api::v1::{ColumnDataTypeExtension, ColumnOptions, JsonTypeExtension};
18use datatypes::schema::{FulltextOptions, SkippingIndexOptions};
19use greptime_proto::v1::value::ValueData;
20use greptime_proto::v1::{ColumnDataType, ColumnSchema, SemanticType};
21use snafu::ResultExt;
22
23use crate::error::{
24 CoerceIncompatibleTypesSnafu, CoerceJsonTypeToSnafu, CoerceStringToTypeSnafu,
25 CoerceTypeToJsonSnafu, CoerceUnsupportedEpochTypeSnafu, CoerceUnsupportedNullTypeSnafu,
26 CoerceUnsupportedNullTypeToSnafu, ColumnOptionsSnafu, Error, Result,
27};
28use crate::etl::transform::index::Index;
29use crate::etl::transform::{OnFailure, Transform};
30use crate::etl::value::{Timestamp, Value};
31
32impl TryFrom<Value> for ValueData {
33 type Error = Error;
34
35 fn try_from(value: Value) -> Result<Self> {
36 match value {
37 Value::Null => CoerceUnsupportedNullTypeSnafu.fail(),
38
39 Value::Int8(v) => Ok(ValueData::I32Value(v as i32)),
40 Value::Int16(v) => Ok(ValueData::I32Value(v as i32)),
41 Value::Int32(v) => Ok(ValueData::I32Value(v)),
42 Value::Int64(v) => Ok(ValueData::I64Value(v)),
43
44 Value::Uint8(v) => Ok(ValueData::U32Value(v as u32)),
45 Value::Uint16(v) => Ok(ValueData::U32Value(v as u32)),
46 Value::Uint32(v) => Ok(ValueData::U32Value(v)),
47 Value::Uint64(v) => Ok(ValueData::U64Value(v)),
48
49 Value::Float32(v) => Ok(ValueData::F32Value(v)),
50 Value::Float64(v) => Ok(ValueData::F64Value(v)),
51
52 Value::Boolean(v) => Ok(ValueData::BoolValue(v)),
53 Value::String(v) => Ok(ValueData::StringValue(v)),
54
55 Value::Timestamp(Timestamp::Nanosecond(ns)) => {
56 Ok(ValueData::TimestampNanosecondValue(ns))
57 }
58 Value::Timestamp(Timestamp::Microsecond(us)) => {
59 Ok(ValueData::TimestampMicrosecondValue(us))
60 }
61 Value::Timestamp(Timestamp::Millisecond(ms)) => {
62 Ok(ValueData::TimestampMillisecondValue(ms))
63 }
64 Value::Timestamp(Timestamp::Second(s)) => Ok(ValueData::TimestampSecondValue(s)),
65
66 Value::Array(_) | Value::Map(_) => {
67 let data: jsonb::Value = value.into();
68 Ok(ValueData::BinaryValue(data.to_vec()))
69 }
70 }
71 }
72}
73
74pub(crate) fn coerce_columns(transform: &Transform) -> Result<Vec<ColumnSchema>> {
75 let mut columns = Vec::new();
76
77 for field in transform.fields.iter() {
78 let column_name = field.target_or_input_field().to_string();
79
80 let (datatype, datatype_extension) = coerce_type(transform)?;
81
82 let semantic_type = coerce_semantic_type(transform) as i32;
83
84 let column = ColumnSchema {
85 column_name,
86 datatype: datatype as i32,
87 semantic_type,
88 datatype_extension,
89 options: coerce_options(transform)?,
90 };
91 columns.push(column);
92 }
93
94 Ok(columns)
95}
96
97fn coerce_semantic_type(transform: &Transform) -> SemanticType {
98 if transform.tag {
99 return SemanticType::Tag;
100 }
101
102 match transform.index {
103 Some(Index::Tag) => SemanticType::Tag,
104 Some(Index::Time) => SemanticType::Timestamp,
105 Some(Index::Fulltext) | Some(Index::Skipping) | Some(Index::Inverted) | None => {
106 SemanticType::Field
107 }
108 }
109}
110
111fn coerce_options(transform: &Transform) -> Result<Option<ColumnOptions>> {
112 match transform.index {
113 Some(Index::Fulltext) => options_from_fulltext(&FulltextOptions {
114 enable: true,
115 ..Default::default()
116 })
117 .context(ColumnOptionsSnafu),
118 Some(Index::Skipping) => {
119 options_from_skipping(&SkippingIndexOptions::default()).context(ColumnOptionsSnafu)
120 }
121 Some(Index::Inverted) => Ok(Some(options_from_inverted())),
122 _ => Ok(None),
123 }
124}
125
126fn coerce_type(transform: &Transform) -> Result<(ColumnDataType, Option<ColumnDataTypeExtension>)> {
127 match transform.type_ {
128 Value::Int8(_) => Ok((ColumnDataType::Int8, None)),
129 Value::Int16(_) => Ok((ColumnDataType::Int16, None)),
130 Value::Int32(_) => Ok((ColumnDataType::Int32, None)),
131 Value::Int64(_) => Ok((ColumnDataType::Int64, None)),
132
133 Value::Uint8(_) => Ok((ColumnDataType::Uint8, None)),
134 Value::Uint16(_) => Ok((ColumnDataType::Uint16, None)),
135 Value::Uint32(_) => Ok((ColumnDataType::Uint32, None)),
136 Value::Uint64(_) => Ok((ColumnDataType::Uint64, None)),
137
138 Value::Float32(_) => Ok((ColumnDataType::Float32, None)),
139 Value::Float64(_) => Ok((ColumnDataType::Float64, None)),
140
141 Value::Boolean(_) => Ok((ColumnDataType::Boolean, None)),
142 Value::String(_) => Ok((ColumnDataType::String, None)),
143
144 Value::Timestamp(Timestamp::Nanosecond(_)) => {
145 Ok((ColumnDataType::TimestampNanosecond, None))
146 }
147 Value::Timestamp(Timestamp::Microsecond(_)) => {
148 Ok((ColumnDataType::TimestampMicrosecond, None))
149 }
150 Value::Timestamp(Timestamp::Millisecond(_)) => {
151 Ok((ColumnDataType::TimestampMillisecond, None))
152 }
153 Value::Timestamp(Timestamp::Second(_)) => Ok((ColumnDataType::TimestampSecond, None)),
154
155 Value::Array(_) | Value::Map(_) => Ok((
156 ColumnDataType::Binary,
157 Some(ColumnDataTypeExtension {
158 type_ext: Some(TypeExt::JsonType(JsonTypeExtension::JsonBinary.into())),
159 }),
160 )),
161
162 Value::Null => CoerceUnsupportedNullTypeToSnafu {
163 ty: transform.type_.to_str_type(),
164 }
165 .fail(),
166 }
167}
168
169pub(crate) fn coerce_value(val: &Value, transform: &Transform) -> Result<Option<ValueData>> {
170 match val {
171 Value::Null => Ok(None),
172
173 Value::Int8(n) => coerce_i64_value(*n as i64, transform),
174 Value::Int16(n) => coerce_i64_value(*n as i64, transform),
175 Value::Int32(n) => coerce_i64_value(*n as i64, transform),
176 Value::Int64(n) => coerce_i64_value(*n, transform),
177
178 Value::Uint8(n) => coerce_u64_value(*n as u64, transform),
179 Value::Uint16(n) => coerce_u64_value(*n as u64, transform),
180 Value::Uint32(n) => coerce_u64_value(*n as u64, transform),
181 Value::Uint64(n) => coerce_u64_value(*n, transform),
182
183 Value::Float32(n) => coerce_f64_value(*n as f64, transform),
184 Value::Float64(n) => coerce_f64_value(*n, transform),
185
186 Value::Boolean(b) => coerce_bool_value(*b, transform),
187 Value::String(s) => coerce_string_value(s, transform),
188
189 Value::Timestamp(input_timestamp) => match &transform.type_ {
190 Value::Timestamp(target_timestamp) => match target_timestamp {
191 Timestamp::Nanosecond(_) => Ok(Some(ValueData::TimestampNanosecondValue(
192 input_timestamp.timestamp_nanos(),
193 ))),
194 Timestamp::Microsecond(_) => Ok(Some(ValueData::TimestampMicrosecondValue(
195 input_timestamp.timestamp_micros(),
196 ))),
197 Timestamp::Millisecond(_) => Ok(Some(ValueData::TimestampMillisecondValue(
198 input_timestamp.timestamp_millis(),
199 ))),
200 Timestamp::Second(_) => Ok(Some(ValueData::TimestampSecondValue(
201 input_timestamp.timestamp(),
202 ))),
203 },
204 _ => CoerceIncompatibleTypesSnafu {
205 msg: "Timestamp can only be coerced to another type",
206 }
207 .fail(),
208 },
209
210 Value::Array(_) | Value::Map(_) => coerce_json_value(val, transform),
211 }
212}
213
214fn coerce_bool_value(b: bool, transform: &Transform) -> Result<Option<ValueData>> {
215 let val = match transform.type_ {
216 Value::Int8(_) => ValueData::I8Value(b as i32),
217 Value::Int16(_) => ValueData::I16Value(b as i32),
218 Value::Int32(_) => ValueData::I32Value(b as i32),
219 Value::Int64(_) => ValueData::I64Value(b as i64),
220
221 Value::Uint8(_) => ValueData::U8Value(b as u32),
222 Value::Uint16(_) => ValueData::U16Value(b as u32),
223 Value::Uint32(_) => ValueData::U32Value(b as u32),
224 Value::Uint64(_) => ValueData::U64Value(b as u64),
225
226 Value::Float32(_) => ValueData::F32Value(if b { 1.0 } else { 0.0 }),
227 Value::Float64(_) => ValueData::F64Value(if b { 1.0 } else { 0.0 }),
228
229 Value::Boolean(_) => ValueData::BoolValue(b),
230 Value::String(_) => ValueData::StringValue(b.to_string()),
231
232 Value::Timestamp(_) => match transform.on_failure {
233 Some(OnFailure::Ignore) => return Ok(None),
234 Some(OnFailure::Default) => {
235 return CoerceUnsupportedEpochTypeSnafu { ty: "Default" }.fail();
236 }
237 None => {
238 return CoerceUnsupportedEpochTypeSnafu { ty: "Boolean" }.fail();
239 }
240 },
241
242 Value::Array(_) | Value::Map(_) => {
243 return CoerceJsonTypeToSnafu {
244 ty: transform.type_.to_str_type(),
245 }
246 .fail()
247 }
248
249 Value::Null => return Ok(None),
250 };
251
252 Ok(Some(val))
253}
254
255fn coerce_i64_value(n: i64, transform: &Transform) -> Result<Option<ValueData>> {
256 let val = match &transform.type_ {
257 Value::Int8(_) => ValueData::I8Value(n as i32),
258 Value::Int16(_) => ValueData::I16Value(n as i32),
259 Value::Int32(_) => ValueData::I32Value(n as i32),
260 Value::Int64(_) => ValueData::I64Value(n),
261
262 Value::Uint8(_) => ValueData::U8Value(n as u32),
263 Value::Uint16(_) => ValueData::U16Value(n as u32),
264 Value::Uint32(_) => ValueData::U32Value(n as u32),
265 Value::Uint64(_) => ValueData::U64Value(n as u64),
266
267 Value::Float32(_) => ValueData::F32Value(n as f32),
268 Value::Float64(_) => ValueData::F64Value(n as f64),
269
270 Value::Boolean(_) => ValueData::BoolValue(n != 0),
271 Value::String(_) => ValueData::StringValue(n.to_string()),
272
273 Value::Timestamp(unit) => match unit {
274 Timestamp::Nanosecond(_) => ValueData::TimestampNanosecondValue(n),
275 Timestamp::Microsecond(_) => ValueData::TimestampMicrosecondValue(n),
276 Timestamp::Millisecond(_) => ValueData::TimestampMillisecondValue(n),
277 Timestamp::Second(_) => ValueData::TimestampSecondValue(n),
278 },
279
280 Value::Array(_) | Value::Map(_) => {
281 return CoerceJsonTypeToSnafu {
282 ty: transform.type_.to_str_type(),
283 }
284 .fail()
285 }
286
287 Value::Null => return Ok(None),
288 };
289
290 Ok(Some(val))
291}
292
293fn coerce_u64_value(n: u64, transform: &Transform) -> Result<Option<ValueData>> {
294 let val = match &transform.type_ {
295 Value::Int8(_) => ValueData::I8Value(n as i32),
296 Value::Int16(_) => ValueData::I16Value(n as i32),
297 Value::Int32(_) => ValueData::I32Value(n as i32),
298 Value::Int64(_) => ValueData::I64Value(n as i64),
299
300 Value::Uint8(_) => ValueData::U8Value(n as u32),
301 Value::Uint16(_) => ValueData::U16Value(n as u32),
302 Value::Uint32(_) => ValueData::U32Value(n as u32),
303 Value::Uint64(_) => ValueData::U64Value(n),
304
305 Value::Float32(_) => ValueData::F32Value(n as f32),
306 Value::Float64(_) => ValueData::F64Value(n as f64),
307
308 Value::Boolean(_) => ValueData::BoolValue(n != 0),
309 Value::String(_) => ValueData::StringValue(n.to_string()),
310
311 Value::Timestamp(unit) => match unit {
312 Timestamp::Nanosecond(_) => ValueData::TimestampNanosecondValue(n as i64),
313 Timestamp::Microsecond(_) => ValueData::TimestampMicrosecondValue(n as i64),
314 Timestamp::Millisecond(_) => ValueData::TimestampMillisecondValue(n as i64),
315 Timestamp::Second(_) => ValueData::TimestampSecondValue(n as i64),
316 },
317
318 Value::Array(_) | Value::Map(_) => {
319 return CoerceJsonTypeToSnafu {
320 ty: transform.type_.to_str_type(),
321 }
322 .fail()
323 }
324
325 Value::Null => return Ok(None),
326 };
327
328 Ok(Some(val))
329}
330
331fn coerce_f64_value(n: f64, transform: &Transform) -> Result<Option<ValueData>> {
332 let val = match transform.type_ {
333 Value::Int8(_) => ValueData::I8Value(n as i32),
334 Value::Int16(_) => ValueData::I16Value(n as i32),
335 Value::Int32(_) => ValueData::I32Value(n as i32),
336 Value::Int64(_) => ValueData::I64Value(n as i64),
337
338 Value::Uint8(_) => ValueData::U8Value(n as u32),
339 Value::Uint16(_) => ValueData::U16Value(n as u32),
340 Value::Uint32(_) => ValueData::U32Value(n as u32),
341 Value::Uint64(_) => ValueData::U64Value(n as u64),
342
343 Value::Float32(_) => ValueData::F32Value(n as f32),
344 Value::Float64(_) => ValueData::F64Value(n),
345
346 Value::Boolean(_) => ValueData::BoolValue(n != 0.0),
347 Value::String(_) => ValueData::StringValue(n.to_string()),
348
349 Value::Timestamp(_) => match transform.on_failure {
350 Some(OnFailure::Ignore) => return Ok(None),
351 Some(OnFailure::Default) => {
352 return CoerceUnsupportedEpochTypeSnafu { ty: "Default" }.fail();
353 }
354 None => {
355 return CoerceUnsupportedEpochTypeSnafu { ty: "Float" }.fail();
356 }
357 },
358
359 Value::Array(_) | Value::Map(_) => {
360 return CoerceJsonTypeToSnafu {
361 ty: transform.type_.to_str_type(),
362 }
363 .fail()
364 }
365
366 Value::Null => return Ok(None),
367 };
368
369 Ok(Some(val))
370}
371
/// Parses `$s` as `$type` and wraps a successful result in `ValueData::$parse`.
///
/// When parsing fails, the transform's `on_failure` policy decides the outcome:
/// `Ignore` yields `Ok(None)`; `Default` coerces the transform's configured
/// default (or, when none is set, its type-matched default value) through
/// `coerce_value`; no policy yields a `CoerceStringToType` error.
macro_rules! coerce_string_value {
    ($s:expr, $transform:expr, $type:ident, $parse:ident) => {
        match ($s.parse::<$type>(), &$transform.on_failure) {
            (Ok(parsed), _) => Ok(Some(ValueData::$parse(parsed))),
            (Err(_), Some(OnFailure::Ignore)) => Ok(None),
            (Err(_), Some(OnFailure::Default)) => match $transform.get_default() {
                Some(fallback) => coerce_value(fallback, $transform),
                None => coerce_value($transform.get_type_matched_default_val(), $transform),
            },
            (Err(_), None) => CoerceStringToTypeSnafu {
                s: $s,
                ty: $transform.type_.to_str_type(),
            }
            .fail(),
        }
    };
}
391
392fn coerce_string_value(s: &String, transform: &Transform) -> Result<Option<ValueData>> {
393 match transform.type_ {
394 Value::Int8(_) => {
395 coerce_string_value!(s, transform, i32, I8Value)
396 }
397 Value::Int16(_) => {
398 coerce_string_value!(s, transform, i32, I16Value)
399 }
400 Value::Int32(_) => {
401 coerce_string_value!(s, transform, i32, I32Value)
402 }
403 Value::Int64(_) => {
404 coerce_string_value!(s, transform, i64, I64Value)
405 }
406
407 Value::Uint8(_) => {
408 coerce_string_value!(s, transform, u32, U8Value)
409 }
410 Value::Uint16(_) => {
411 coerce_string_value!(s, transform, u32, U16Value)
412 }
413 Value::Uint32(_) => {
414 coerce_string_value!(s, transform, u32, U32Value)
415 }
416 Value::Uint64(_) => {
417 coerce_string_value!(s, transform, u64, U64Value)
418 }
419
420 Value::Float32(_) => {
421 coerce_string_value!(s, transform, f32, F32Value)
422 }
423 Value::Float64(_) => {
424 coerce_string_value!(s, transform, f64, F64Value)
425 }
426
427 Value::Boolean(_) => {
428 coerce_string_value!(s, transform, bool, BoolValue)
429 }
430
431 Value::String(_) => Ok(Some(ValueData::StringValue(s.to_string()))),
432
433 Value::Timestamp(_) => match transform.on_failure {
434 Some(OnFailure::Ignore) => Ok(None),
435 Some(OnFailure::Default) => CoerceUnsupportedEpochTypeSnafu { ty: "Default" }.fail(),
436 None => CoerceUnsupportedEpochTypeSnafu { ty: "String" }.fail(),
437 },
438
439 Value::Array(_) | Value::Map(_) => CoerceStringToTypeSnafu {
440 s,
441 ty: transform.type_.to_str_type(),
442 }
443 .fail(),
444
445 Value::Null => Ok(None),
446 }
447}
448
449fn coerce_json_value(v: &Value, transform: &Transform) -> Result<Option<ValueData>> {
450 match &transform.type_ {
451 Value::Array(_) | Value::Map(_) => (),
452 t => {
453 return CoerceTypeToJsonSnafu {
454 ty: t.to_str_type(),
455 }
456 .fail();
457 }
458 }
459 match v {
460 Value::Map(_) => {
461 let data: jsonb::Value = v.into();
462 Ok(Some(ValueData::BinaryValue(data.to_vec())))
463 }
464 Value::Array(_) => {
465 let data: jsonb::Value = v.into();
466 Ok(Some(ValueData::BinaryValue(data.to_vec())))
467 }
468 _ => CoerceTypeToJsonSnafu {
469 ty: v.to_str_type(),
470 }
471 .fail(),
472 }
473}
474
#[cfg(test)]
mod tests {

    use super::*;
    use crate::etl::field::Fields;

    /// Builds an `Int32`-typed transform with no fields, default or index,
    /// varying only the failure policy under test.
    fn int32_transform(on_failure: Option<OnFailure>) -> Transform {
        Transform {
            fields: Fields::default(),
            type_: Value::Int32(0),
            default: None,
            index: None,
            on_failure,
            tag: false,
        }
    }

    /// Shorthand for an owned string input value.
    fn text(s: &str) -> Value {
        Value::String(s.to_string())
    }

    #[test]
    fn test_coerce_string_without_on_failure() {
        let transform = int32_transform(None);

        // A parsable string is converted to the target integer type.
        let parsed = coerce_value(&text("123"), &transform).unwrap();
        assert_eq!(parsed, Some(ValueData::I32Value(123)));

        // An unparsable string is an error when no failure policy is set.
        let failed = coerce_value(&text("hello"), &transform);
        assert!(failed.is_err());
    }

    #[test]
    fn test_coerce_string_with_on_failure_ignore() {
        let transform = int32_transform(Some(OnFailure::Ignore));

        let ignored = coerce_value(&text("hello"), &transform).unwrap();
        assert_eq!(ignored, None);
    }

    #[test]
    fn test_coerce_string_with_on_failure_default() {
        let mut transform = int32_transform(Some(OnFailure::Default));

        // Without an explicit default, the type-matched default is used.
        let implicit = coerce_value(&text("hello"), &transform).unwrap();
        assert_eq!(implicit, Some(ValueData::I32Value(0)));

        // With an explicit default, that value wins.
        transform.default = Some(Value::Int32(42));
        let explicit = coerce_value(&text("hello"), &transform).unwrap();
        assert_eq!(explicit, Some(ValueData::I32Value(42)));
    }
}