1use api::v1::column_data_type_extension::TypeExt;
16use api::v1::column_def::{options_from_fulltext, options_from_inverted, options_from_skipping};
17use api::v1::{ColumnDataTypeExtension, ColumnOptions, JsonTypeExtension};
18use datatypes::schema::{FulltextOptions, SkippingIndexOptions};
19use greptime_proto::v1::value::ValueData;
20use greptime_proto::v1::{ColumnDataType, ColumnSchema, SemanticType};
21use snafu::{OptionExt, ResultExt, ensure};
22use vrl::value::Value as VrlValue;
23
24use crate::error::{
25 CoerceIncompatibleTypesSnafu, CoerceJsonTypeToSnafu, CoerceStringToTypeSnafu,
26 CoerceTypeToJsonSnafu, CoerceUnsupportedEpochTypeSnafu, ColumnOptionsSnafu,
27 InvalidTimestampSnafu, Result, TransformIndexStateMismatchSnafu,
28 UnsupportedTypeInPipelineSnafu, VrlRegexValueSnafu,
29};
30use crate::etl::transform::index::Index;
31use crate::etl::transform::transformer::greptime::vrl_value_to_jsonb_value;
32use crate::etl::transform::{OnFailure, Transform, TransformIndexOptions};
33
34pub(crate) fn coerce_columns(transform: &Transform) -> Result<Vec<ColumnSchema>> {
35 let mut columns = Vec::new();
36
37 for field in transform.fields.iter() {
38 let column_name = field.target_or_input_field().to_string();
39
40 let ext = if matches!(transform.type_, ColumnDataType::Binary) {
41 Some(ColumnDataTypeExtension {
42 type_ext: Some(TypeExt::JsonType(JsonTypeExtension::JsonBinary.into())),
43 })
44 } else {
45 None
46 };
47
48 let semantic_type = coerce_semantic_type(transform) as i32;
49
50 let column = ColumnSchema {
51 column_name,
52 datatype: transform.type_ as i32,
53 semantic_type,
54 datatype_extension: ext,
55 options: coerce_options(transform)?,
56 };
57 columns.push(column);
58 }
59
60 Ok(columns)
61}
62
63fn coerce_semantic_type(transform: &Transform) -> SemanticType {
64 if transform.tag {
65 return SemanticType::Tag;
66 }
67
68 match transform.index {
69 Some(Index::Tag) => SemanticType::Tag,
70 Some(Index::Time) => SemanticType::Timestamp,
71 Some(Index::Fulltext) | Some(Index::Skipping) | Some(Index::Inverted) | None => {
72 SemanticType::Field
73 }
74 }
75}
76
77fn transform_index_label(index: Option<Index>) -> String {
78 index
79 .map(|index| index.to_string())
80 .unwrap_or_else(|| "none".to_string())
81}
82
83fn validate_transform_index_state(transform: &Transform) -> Result<()> {
84 let Some(index_options) = transform.index_options.as_ref() else {
85 return Ok(());
86 };
87
88 let options_index = index_options.index();
89 let index = transform.index;
90 ensure!(
91 index == Some(options_index),
92 TransformIndexStateMismatchSnafu {
93 index: transform_index_label(index),
94 options: options_index.to_string(),
95 }
96 );
97
98 Ok(())
99}
100
101fn build_fulltext_index_options(transform: &Transform) -> Result<FulltextOptions> {
102 match transform.index_options.as_ref() {
103 None => Ok(FulltextOptions {
104 enable: true,
105 ..Default::default()
106 }),
107 Some(TransformIndexOptions::Fulltext(options)) => Ok(options.clone()),
108 Some(options) => TransformIndexStateMismatchSnafu {
109 index: Index::Fulltext.to_string(),
110 options: options.index().to_string(),
111 }
112 .fail(),
113 }
114}
115
116fn build_skipping_index_options(transform: &Transform) -> Result<SkippingIndexOptions> {
117 match transform.index_options.as_ref() {
118 None => Ok(SkippingIndexOptions::default()),
119 Some(TransformIndexOptions::Skipping(options)) => Ok(options.clone()),
120 Some(options) => TransformIndexStateMismatchSnafu {
121 index: Index::Skipping.to_string(),
122 options: options.index().to_string(),
123 }
124 .fail(),
125 }
126}
127
128fn coerce_options(transform: &Transform) -> Result<Option<ColumnOptions>> {
129 validate_transform_index_state(transform)?;
130
131 match transform.index {
132 Some(Index::Fulltext) => {
133 let options = build_fulltext_index_options(transform)?;
134 options_from_fulltext(&options).context(ColumnOptionsSnafu)
135 }
136 Some(Index::Skipping) => {
137 let options = build_skipping_index_options(transform)?;
138 options_from_skipping(&options).context(ColumnOptionsSnafu)
139 }
140 Some(Index::Inverted) => Ok(Some(options_from_inverted())),
141 _ => Ok(None),
142 }
143}
144
145pub(crate) fn coerce_value(val: &VrlValue, transform: &Transform) -> Result<Option<ValueData>> {
146 match val {
147 VrlValue::Null => Ok(None),
148 VrlValue::Integer(n) => coerce_i64_value(*n, transform),
149 VrlValue::Float(n) => coerce_f64_value(n.into_inner(), transform),
150 VrlValue::Boolean(b) => coerce_bool_value(*b, transform),
151 VrlValue::Bytes(b) => coerce_string_value(String::from_utf8_lossy(b).as_ref(), transform),
152 VrlValue::Timestamp(ts) => match transform.type_ {
153 ColumnDataType::TimestampNanosecond => Ok(Some(ValueData::TimestampNanosecondValue(
154 ts.timestamp_nanos_opt().context(InvalidTimestampSnafu {
155 input: ts.to_rfc3339(),
156 })?,
157 ))),
158 ColumnDataType::TimestampMicrosecond => Ok(Some(ValueData::TimestampMicrosecondValue(
159 ts.timestamp_micros(),
160 ))),
161 ColumnDataType::TimestampMillisecond => Ok(Some(ValueData::TimestampMillisecondValue(
162 ts.timestamp_millis(),
163 ))),
164 ColumnDataType::TimestampSecond => {
165 Ok(Some(ValueData::TimestampSecondValue(ts.timestamp())))
166 }
167 _ => CoerceIncompatibleTypesSnafu {
168 msg: "Timestamp can only be coerced to another type",
169 }
170 .fail(),
171 },
172 VrlValue::Array(_) | VrlValue::Object(_) => coerce_json_value(val, transform),
173 VrlValue::Regex(_) => VrlRegexValueSnafu.fail(),
174 }
175}
176
177fn coerce_bool_value(b: bool, transform: &Transform) -> Result<Option<ValueData>> {
178 let val = match transform.type_ {
179 ColumnDataType::Int8 => ValueData::I8Value(b as i32),
180 ColumnDataType::Int16 => ValueData::I16Value(b as i32),
181 ColumnDataType::Int32 => ValueData::I32Value(b as i32),
182 ColumnDataType::Int64 => ValueData::I64Value(b as i64),
183
184 ColumnDataType::Uint8 => ValueData::U8Value(b as u32),
185 ColumnDataType::Uint16 => ValueData::U16Value(b as u32),
186 ColumnDataType::Uint32 => ValueData::U32Value(b as u32),
187 ColumnDataType::Uint64 => ValueData::U64Value(b as u64),
188
189 ColumnDataType::Float32 => ValueData::F32Value(if b { 1.0 } else { 0.0 }),
190 ColumnDataType::Float64 => ValueData::F64Value(if b { 1.0 } else { 0.0 }),
191
192 ColumnDataType::Boolean => ValueData::BoolValue(b),
193 ColumnDataType::String => ValueData::StringValue(b.to_string()),
194
195 ColumnDataType::TimestampNanosecond
196 | ColumnDataType::TimestampMicrosecond
197 | ColumnDataType::TimestampMillisecond
198 | ColumnDataType::TimestampSecond => match transform.on_failure {
199 Some(OnFailure::Ignore) => return Ok(None),
200 Some(OnFailure::Default) => {
201 return CoerceUnsupportedEpochTypeSnafu { ty: "Default" }.fail();
202 }
203 None => {
204 return CoerceUnsupportedEpochTypeSnafu { ty: "Boolean" }.fail();
205 }
206 },
207
208 ColumnDataType::Binary => {
209 return CoerceJsonTypeToSnafu {
210 ty: transform.type_.as_str_name(),
211 }
212 .fail();
213 }
214
215 _ => {
216 return UnsupportedTypeInPipelineSnafu {
217 ty: transform.type_.as_str_name(),
218 }
219 .fail();
220 }
221 };
222
223 Ok(Some(val))
224}
225
226fn coerce_i64_value(n: i64, transform: &Transform) -> Result<Option<ValueData>> {
227 let val = match &transform.type_ {
228 ColumnDataType::Int8 => ValueData::I8Value(n as i32),
229 ColumnDataType::Int16 => ValueData::I16Value(n as i32),
230 ColumnDataType::Int32 => ValueData::I32Value(n as i32),
231 ColumnDataType::Int64 => ValueData::I64Value(n),
232
233 ColumnDataType::Uint8 => ValueData::U8Value(n as u32),
234 ColumnDataType::Uint16 => ValueData::U16Value(n as u32),
235 ColumnDataType::Uint32 => ValueData::U32Value(n as u32),
236 ColumnDataType::Uint64 => ValueData::U64Value(n as u64),
237
238 ColumnDataType::Float32 => ValueData::F32Value(n as f32),
239 ColumnDataType::Float64 => ValueData::F64Value(n as f64),
240
241 ColumnDataType::Boolean => ValueData::BoolValue(n != 0),
242 ColumnDataType::String => ValueData::StringValue(n.to_string()),
243
244 ColumnDataType::TimestampNanosecond => ValueData::TimestampNanosecondValue(n),
245 ColumnDataType::TimestampMicrosecond => ValueData::TimestampMicrosecondValue(n),
246 ColumnDataType::TimestampMillisecond => ValueData::TimestampMillisecondValue(n),
247 ColumnDataType::TimestampSecond => ValueData::TimestampSecondValue(n),
248
249 ColumnDataType::Binary => {
250 return CoerceJsonTypeToSnafu {
251 ty: transform.type_.as_str_name(),
252 }
253 .fail();
254 }
255
256 _ => return Ok(None),
257 };
258
259 Ok(Some(val))
260}
261
262fn coerce_u64_value(n: u64, transform: &Transform) -> Result<Option<ValueData>> {
263 let val = match &transform.type_ {
264 ColumnDataType::Int8 => ValueData::I8Value(n as i32),
265 ColumnDataType::Int16 => ValueData::I16Value(n as i32),
266 ColumnDataType::Int32 => ValueData::I32Value(n as i32),
267 ColumnDataType::Int64 => ValueData::I64Value(n as i64),
268
269 ColumnDataType::Uint8 => ValueData::U8Value(n as u32),
270 ColumnDataType::Uint16 => ValueData::U16Value(n as u32),
271 ColumnDataType::Uint32 => ValueData::U32Value(n as u32),
272 ColumnDataType::Uint64 => ValueData::U64Value(n),
273
274 ColumnDataType::Float32 => ValueData::F32Value(n as f32),
275 ColumnDataType::Float64 => ValueData::F64Value(n as f64),
276
277 ColumnDataType::Boolean => ValueData::BoolValue(n != 0),
278 ColumnDataType::String => ValueData::StringValue(n.to_string()),
279
280 ColumnDataType::TimestampNanosecond => ValueData::TimestampNanosecondValue(n as i64),
281 ColumnDataType::TimestampMicrosecond => ValueData::TimestampMicrosecondValue(n as i64),
282 ColumnDataType::TimestampMillisecond => ValueData::TimestampMillisecondValue(n as i64),
283 ColumnDataType::TimestampSecond => ValueData::TimestampSecondValue(n as i64),
284
285 ColumnDataType::Binary => {
286 return CoerceJsonTypeToSnafu {
287 ty: transform.type_.as_str_name(),
288 }
289 .fail();
290 }
291
292 _ => return Ok(None),
293 };
294
295 Ok(Some(val))
296}
297
298fn coerce_f64_value(n: f64, transform: &Transform) -> Result<Option<ValueData>> {
299 let val = match transform.type_ {
300 ColumnDataType::Int8 => ValueData::I8Value(n as i32),
301 ColumnDataType::Int16 => ValueData::I16Value(n as i32),
302 ColumnDataType::Int32 => ValueData::I32Value(n as i32),
303 ColumnDataType::Int64 => ValueData::I64Value(n as i64),
304
305 ColumnDataType::Uint8 => ValueData::U8Value(n as u32),
306 ColumnDataType::Uint16 => ValueData::U16Value(n as u32),
307 ColumnDataType::Uint32 => ValueData::U32Value(n as u32),
308 ColumnDataType::Uint64 => ValueData::U64Value(n as u64),
309
310 ColumnDataType::Float32 => ValueData::F32Value(n as f32),
311 ColumnDataType::Float64 => ValueData::F64Value(n),
312
313 ColumnDataType::Boolean => ValueData::BoolValue(n != 0.0),
314 ColumnDataType::String => ValueData::StringValue(n.to_string()),
315
316 ColumnDataType::TimestampNanosecond
317 | ColumnDataType::TimestampMicrosecond
318 | ColumnDataType::TimestampMillisecond
319 | ColumnDataType::TimestampSecond => match transform.on_failure {
320 Some(OnFailure::Ignore) => return Ok(None),
321 Some(OnFailure::Default) => {
322 return CoerceUnsupportedEpochTypeSnafu { ty: "Default" }.fail();
323 }
324 None => {
325 return CoerceUnsupportedEpochTypeSnafu { ty: "Float" }.fail();
326 }
327 },
328
329 ColumnDataType::Binary => {
330 return CoerceJsonTypeToSnafu {
331 ty: transform.type_.as_str_name(),
332 }
333 .fail();
334 }
335
336 _ => return Ok(None),
337 };
338
339 Ok(Some(val))
340}
341
342macro_rules! coerce_string_value {
343 ($s:expr, $transform:expr, $type:ident, $parse:ident) => {
344 match $s.parse::<$type>() {
345 Ok(v) => Ok(Some(ValueData::$parse(v))),
346 Err(_) => match $transform.on_failure {
347 Some(OnFailure::Ignore) => Ok(None),
348 Some(OnFailure::Default) => match $transform.get_default() {
349 Some(default) => Ok(Some(default.clone())),
350 None => $transform.get_type_matched_default_val().map(Some),
351 },
352 None => CoerceStringToTypeSnafu {
353 s: $s,
354 ty: $transform.type_.as_str_name(),
355 }
356 .fail(),
357 },
358 }
359 };
360}
361
362fn coerce_string_value(s: &str, transform: &Transform) -> Result<Option<ValueData>> {
363 match transform.type_ {
364 ColumnDataType::Int8 => {
365 coerce_string_value!(s, transform, i32, I8Value)
366 }
367 ColumnDataType::Int16 => {
368 coerce_string_value!(s, transform, i32, I16Value)
369 }
370 ColumnDataType::Int32 => {
371 coerce_string_value!(s, transform, i32, I32Value)
372 }
373 ColumnDataType::Int64 => {
374 coerce_string_value!(s, transform, i64, I64Value)
375 }
376
377 ColumnDataType::Uint8 => {
378 coerce_string_value!(s, transform, u32, U8Value)
379 }
380 ColumnDataType::Uint16 => {
381 coerce_string_value!(s, transform, u32, U16Value)
382 }
383 ColumnDataType::Uint32 => {
384 coerce_string_value!(s, transform, u32, U32Value)
385 }
386 ColumnDataType::Uint64 => {
387 coerce_string_value!(s, transform, u64, U64Value)
388 }
389
390 ColumnDataType::Float32 => {
391 coerce_string_value!(s, transform, f32, F32Value)
392 }
393 ColumnDataType::Float64 => {
394 coerce_string_value!(s, transform, f64, F64Value)
395 }
396
397 ColumnDataType::Boolean => {
398 coerce_string_value!(s, transform, bool, BoolValue)
399 }
400
401 ColumnDataType::String => Ok(Some(ValueData::StringValue(s.to_string()))),
402
403 ColumnDataType::TimestampNanosecond
404 | ColumnDataType::TimestampMicrosecond
405 | ColumnDataType::TimestampMillisecond
406 | ColumnDataType::TimestampSecond => match transform.on_failure {
407 Some(OnFailure::Ignore) => Ok(None),
408 Some(OnFailure::Default) => CoerceUnsupportedEpochTypeSnafu { ty: "Default" }.fail(),
409 None => CoerceUnsupportedEpochTypeSnafu { ty: "String" }.fail(),
410 },
411
412 ColumnDataType::Binary => CoerceStringToTypeSnafu {
413 s,
414 ty: transform.type_.as_str_name(),
415 }
416 .fail(),
417
418 _ => Ok(None),
419 }
420}
421
422fn coerce_json_value(v: &VrlValue, transform: &Transform) -> Result<Option<ValueData>> {
423 match &transform.type_ {
424 ColumnDataType::Binary => (),
425 t => {
426 return CoerceTypeToJsonSnafu {
427 ty: t.as_str_name(),
428 }
429 .fail();
430 }
431 }
432 let data: jsonb::Value = vrl_value_to_jsonb_value(v);
433 Ok(Some(ValueData::BinaryValue(data.to_vec())))
434}
435
#[cfg(test)]
mod tests {
    use datatypes::schema::{FulltextAnalyzer, FulltextBackend, SkippingIndexType};
    use vrl::prelude::Bytes;

    use super::*;
    use crate::etl::field::Fields;

    /// Builds a field-less, non-tag transform without a default value; the
    /// remaining settings are the ones individual tests vary.
    fn new_transform(
        type_: ColumnDataType,
        index: Option<Index>,
        index_options: Option<TransformIndexOptions>,
        on_failure: Option<OnFailure>,
    ) -> Transform {
        Transform {
            fields: Fields::default(),
            type_,
            default: None,
            index,
            index_options,
            on_failure,
            tag: false,
        }
    }

    /// Full-text options with the Chinese analyzer and Tantivy backend.
    fn custom_fulltext_options() -> TransformIndexOptions {
        TransformIndexOptions::Fulltext(FulltextOptions::new_unchecked(
            true,
            FulltextAnalyzer::Chinese,
            true,
            FulltextBackend::Tantivy,
            10240,
            0.01,
        ))
    }

    /// Bloom-filter skipping options with granularity 2048.
    fn custom_skipping_options() -> TransformIndexOptions {
        TransformIndexOptions::Skipping(SkippingIndexOptions::new_unchecked(
            2048,
            0.02,
            SkippingIndexType::BloomFilter,
        ))
    }

    #[test]
    fn test_coerce_string_without_on_failure() {
        let transform = new_transform(ColumnDataType::Int32, None, None, None);

        // An integer input passes straight through.
        let coerced = coerce_value(&VrlValue::Integer(123), &transform).unwrap();
        assert_eq!(coerced, Some(ValueData::I32Value(123)));

        // Unparseable text is an error when no on_failure policy is set.
        let rejected = coerce_value(&VrlValue::Bytes(Bytes::from("hello")), &transform);
        assert!(rejected.is_err());
    }

    #[test]
    fn test_coerce_string_with_on_failure_ignore() {
        let transform = new_transform(
            ColumnDataType::Int32,
            None,
            None,
            Some(OnFailure::Ignore),
        );

        let coerced = coerce_value(&VrlValue::Bytes(Bytes::from("hello")), &transform).unwrap();
        assert_eq!(coerced, None);
    }

    #[test]
    fn test_coerce_string_with_on_failure_default() {
        let mut transform = new_transform(
            ColumnDataType::Int32,
            None,
            None,
            Some(OnFailure::Default),
        );
        let input = VrlValue::Bytes(Bytes::from("hello"));

        // Without an explicit default, the type's default value is used.
        let coerced = coerce_value(&input, &transform).unwrap();
        assert_eq!(coerced, Some(ValueData::I32Value(0)));

        // An explicit default takes precedence.
        transform.default = Some(ValueData::I32Value(42));
        let coerced = coerce_value(&input, &transform).unwrap();
        assert_eq!(coerced, Some(ValueData::I32Value(42)));
    }

    #[test]
    fn test_coerce_fulltext_options_with_custom_values() {
        let transform = new_transform(
            ColumnDataType::String,
            Some(Index::Fulltext),
            Some(custom_fulltext_options()),
            None,
        );

        let options = coerce_options(&transform).unwrap().unwrap();
        let fulltext: FulltextOptions =
            serde_json::from_str(options.options.get("fulltext").unwrap()).unwrap();

        assert!(fulltext.enable);
        assert_eq!(fulltext.analyzer.to_string(), "Chinese");
        assert!(fulltext.case_sensitive);
        assert_eq!(fulltext.backend.to_string(), "tantivy");
    }

    #[test]
    fn test_coerce_skipping_options_with_custom_values() {
        let transform = new_transform(
            ColumnDataType::Int64,
            Some(Index::Skipping),
            Some(custom_skipping_options()),
            None,
        );

        let options = coerce_options(&transform).unwrap().unwrap();
        let skipping: SkippingIndexOptions =
            serde_json::from_str(options.options.get("skipping_index").unwrap()).unwrap();

        assert_eq!(skipping.granularity, 2048);
        assert_eq!(skipping.false_positive_rate(), 0.02);
        assert_eq!(skipping.index_type.to_string(), "BLOOM");
    }

    #[test]
    fn test_coerce_rejects_mismatched_index_options() {
        let transform = new_transform(
            ColumnDataType::String,
            Some(Index::Fulltext),
            Some(custom_skipping_options()),
            None,
        );

        assert!(coerce_options(&transform).is_err());
    }

    #[test]
    fn test_coerce_rejects_index_options_without_index() {
        let transform = new_transform(
            ColumnDataType::String,
            None,
            Some(custom_fulltext_options()),
            None,
        );

        assert!(coerce_options(&transform).is_err());
    }
}