1pub mod index;
16pub mod transformer;
17
18use std::collections::HashMap;
19
20use api::v1::ColumnDataType;
21use api::v1::value::ValueData;
22use chrono::Utc;
23use datatypes::schema::{FulltextOptions, SkippingIndexOptions};
24use snafu::{OptionExt, ResultExt, ensure};
25use sql::parsers::utils::{
26 validate_column_fulltext_create_option, validate_column_skipping_index_create_option,
27};
28
29use crate::error::{
30 Error, FieldMustBeTypeSnafu, KeyMustBeStringSnafu, Result, TransformElementMustBeMapSnafu,
31 TransformFieldMustBeSetSnafu, TransformIndexOptionMustBeScalarSnafu, TransformIndexOptionSnafu,
32 TransformIndexOptionUnsupportedSnafu, TransformIndexOptionsUnsupportedSnafu,
33 TransformIndexTypeMismatchSnafu, TransformIndexTypeMustBeSetSnafu,
34 TransformIndexUnsupportedFieldSnafu, TransformOnFailureInvalidValueSnafu,
35 TransformTypeMustBeSetSnafu, UnsupportedTypeInPipelineSnafu,
36};
37use crate::etl::field::Fields;
38use crate::etl::processor::{yaml_bool, yaml_new_field, yaml_new_fields, yaml_string};
39use crate::etl::transform::index::Index;
40use crate::etl::value::{parse_str_type, parse_str_value};
41
// Keys recognized inside a single transform entry of the YAML definition.

/// Key holding a single field specification.
const TRANSFORM_FIELD: &str = "field";
/// Key holding a list of field specifications.
const TRANSFORM_FIELDS: &str = "fields";
/// Key holding the column type; the same key names the index kind inside an `index` map.
const TRANSFORM_TYPE: &str = "type";
/// Key holding the index definition (either a string or a map).
const TRANSFORM_INDEX: &str = "index";
/// Field label passed to `yaml_string` when reading the `type` key nested under `index`.
const TRANSFORM_INDEX_TYPE_FIELD: &str = "index.type";
/// Key of the options map nested under `index`.
const TRANSFORM_INDEX_OPTIONS: &str = "options";
/// Field label used in errors about the options map nested under `index`.
const TRANSFORM_INDEX_OPTIONS_FIELD: &str = "index.options";
/// Key holding the boolean tag flag.
const TRANSFORM_TAG: &str = "tag";
/// Key holding the scalar default value.
const TRANSFORM_DEFAULT: &str = "default";
/// Key holding the failure policy (see `OnFailure`).
const TRANSFORM_ON_FAILURE: &str = "on_failure";
52
53pub use transformer::greptime::GreptimeTransformer;
54
/// Policy selected through a transform's `on_failure` key.
///
/// The variants are parsed from the strings `"ignore"` and `"default"`.
/// `Ignore` is the value produced by [`Default::default`].
///
/// The type is a plain field-less enum, so it also derives `PartialEq`/`Eq`
/// to let callers and tests compare policies directly.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum OnFailure {
    /// Selected by the string `"ignore"`; also the default policy.
    #[default]
    Ignore,
    /// Selected by the string `"default"`, and forced whenever a transform
    /// declares an explicit `default` value.
    Default,
}
65
66impl std::str::FromStr for OnFailure {
67 type Err = Error;
68
69 fn from_str(s: &str) -> Result<Self> {
70 match s {
71 "ignore" => Ok(OnFailure::Ignore),
72 "default" => Ok(OnFailure::Default),
73 _ => TransformOnFailureInvalidValueSnafu { value: s }.fail(),
74 }
75 }
76}
77
78#[derive(Debug, Default, Clone)]
79pub struct Transforms {
80 pub(crate) transforms: Vec<Transform>,
81}
82
83impl Transforms {
84 pub fn transforms(&self) -> &Vec<Transform> {
85 &self.transforms
86 }
87}
88
89impl std::ops::Deref for Transforms {
90 type Target = Vec<Transform>;
91
92 fn deref(&self) -> &Self::Target {
93 &self.transforms
94 }
95}
96
97impl std::ops::DerefMut for Transforms {
98 fn deref_mut(&mut self) -> &mut Self::Target {
99 &mut self.transforms
100 }
101}
102
103impl TryFrom<&Vec<yaml_rust::Yaml>> for Transforms {
104 type Error = Error;
105
106 fn try_from(docs: &Vec<yaml_rust::Yaml>) -> Result<Self> {
107 let mut transforms = Vec::with_capacity(32);
108 let mut all_output_keys: Vec<String> = Vec::with_capacity(32);
109 let mut all_required_keys = Vec::with_capacity(32);
110
111 for doc in docs {
112 let transform_builder: Transform = doc
113 .as_hash()
114 .context(TransformElementMustBeMapSnafu)?
115 .try_into()?;
116 let mut transform_output_keys = transform_builder
117 .fields
118 .iter()
119 .map(|f| f.target_or_input_field().to_string())
120 .collect();
121 all_output_keys.append(&mut transform_output_keys);
122
123 let mut transform_required_keys = transform_builder
124 .fields
125 .iter()
126 .map(|f| f.input_field().to_string())
127 .collect();
128 all_required_keys.append(&mut transform_required_keys);
129
130 transforms.push(transform_builder);
131 }
132
133 all_required_keys.sort();
134
135 Ok(Transforms { transforms })
136 }
137}
138
139#[derive(Debug, Clone)]
141pub struct Transform {
142 pub fields: Fields,
143 pub type_: ColumnDataType,
144 pub default: Option<ValueData>,
145 pub index: Option<Index>,
146 pub index_options: Option<TransformIndexOptions>,
147 pub tag: bool,
148 pub on_failure: Option<OnFailure>,
149}
150
151#[derive(Debug, Clone, PartialEq, Eq)]
152pub enum TransformIndexOptions {
153 Fulltext(FulltextOptions),
154 Skipping(SkippingIndexOptions),
155}
156
157impl TransformIndexOptions {
158 pub(crate) fn index(&self) -> Index {
159 match self {
160 TransformIndexOptions::Fulltext(_) => Index::Fulltext,
161 TransformIndexOptions::Skipping(_) => Index::Skipping,
162 }
163 }
164
165 #[cfg(test)]
166 pub(crate) fn as_fulltext(&self) -> Option<&FulltextOptions> {
167 match self {
168 TransformIndexOptions::Fulltext(options) => Some(options),
169 TransformIndexOptions::Skipping(_) => None,
170 }
171 }
172
173 #[cfg(test)]
174 pub(crate) fn as_skipping(&self) -> Option<&SkippingIndexOptions> {
175 match self {
176 TransformIndexOptions::Skipping(options) => Some(options),
177 TransformIndexOptions::Fulltext(_) => None,
178 }
179 }
180}
181
182impl Transform {
202 pub(crate) fn get_default(&self) -> Option<&ValueData> {
203 self.default.as_ref()
204 }
205
206 pub(crate) fn get_type_matched_default_val(&self) -> Result<ValueData> {
207 get_default_for_type(&self.type_)
208 }
209
210 pub(crate) fn get_default_value_when_data_is_none(&self) -> Option<ValueData> {
211 if is_timestamp_type(&self.type_) && self.index.is_some_and(|i| i == Index::Time) {
212 let now = Utc::now();
213 match self.type_ {
214 ColumnDataType::TimestampSecond => {
215 return Some(ValueData::TimestampSecondValue(now.timestamp()));
216 }
217 ColumnDataType::TimestampMillisecond => {
218 return Some(ValueData::TimestampMillisecondValue(now.timestamp_millis()));
219 }
220 ColumnDataType::TimestampMicrosecond => {
221 return Some(ValueData::TimestampMicrosecondValue(now.timestamp_micros()));
222 }
223 ColumnDataType::TimestampNanosecond => {
224 return Some(ValueData::TimestampNanosecondValue(
225 now.timestamp_nanos_opt()?,
226 ));
227 }
228 _ => {}
229 }
230 }
231 None
232 }
233
234 pub(crate) fn is_timeindex(&self) -> bool {
235 self.index.is_some_and(|i| i == Index::Time)
236 }
237}
238
239fn is_timestamp_type(ty: &ColumnDataType) -> bool {
240 matches!(
241 ty,
242 ColumnDataType::TimestampSecond
243 | ColumnDataType::TimestampMillisecond
244 | ColumnDataType::TimestampMicrosecond
245 | ColumnDataType::TimestampNanosecond
246 )
247}
248
249fn get_default_for_type(ty: &ColumnDataType) -> Result<ValueData> {
250 let v = match ty {
251 ColumnDataType::Boolean => ValueData::BoolValue(false),
252 ColumnDataType::Int8 => ValueData::I8Value(0),
253 ColumnDataType::Int16 => ValueData::I16Value(0),
254 ColumnDataType::Int32 => ValueData::I32Value(0),
255 ColumnDataType::Int64 => ValueData::I64Value(0),
256 ColumnDataType::Uint8 => ValueData::U8Value(0),
257 ColumnDataType::Uint16 => ValueData::U16Value(0),
258 ColumnDataType::Uint32 => ValueData::U32Value(0),
259 ColumnDataType::Uint64 => ValueData::U64Value(0),
260 ColumnDataType::Float32 => ValueData::F32Value(0.0),
261 ColumnDataType::Float64 => ValueData::F64Value(0.0),
262 ColumnDataType::Binary => ValueData::BinaryValue(jsonb::Value::Null.to_vec()),
263 ColumnDataType::String => ValueData::StringValue(String::new()),
264
265 ColumnDataType::TimestampSecond => ValueData::TimestampSecondValue(0),
266 ColumnDataType::TimestampMillisecond => ValueData::TimestampMillisecondValue(0),
267 ColumnDataType::TimestampMicrosecond => ValueData::TimestampMicrosecondValue(0),
268 ColumnDataType::TimestampNanosecond => ValueData::TimestampNanosecondValue(0),
269
270 _ => UnsupportedTypeInPipelineSnafu {
271 ty: ty.as_str_name(),
272 }
273 .fail()?,
274 };
275 Ok(v)
276}
277
278fn parse_transform_index(
279 value: &yaml_rust::Yaml,
280) -> Result<(Index, Option<HashMap<String, String>>)> {
281 match value {
282 yaml_rust::Yaml::String(_) => {
283 let index_str = yaml_string(value, TRANSFORM_INDEX)?;
284 Ok((index_str.try_into()?, None))
285 }
286 yaml_rust::Yaml::Hash(hash) => {
287 let mut index = None;
288 let mut index_options = None;
289
290 for (k, v) in hash {
291 let key = k
292 .as_str()
293 .with_context(|| KeyMustBeStringSnafu { k: k.clone() })?;
294 match key {
295 TRANSFORM_TYPE => {
296 let index_str = yaml_string(v, TRANSFORM_INDEX_TYPE_FIELD)?;
297 index = Some(index_str.try_into()?);
298 }
299 TRANSFORM_INDEX_OPTIONS => {
300 index_options = Some(parse_transform_index_options(v)?);
301 }
302 _ => {
303 return TransformIndexUnsupportedFieldSnafu {
304 field: key.to_string(),
305 }
306 .fail();
307 }
308 }
309 }
310
311 Ok((
312 index.context(TransformIndexTypeMustBeSetSnafu)?,
313 index_options,
314 ))
315 }
316 _ => FieldMustBeTypeSnafu {
317 field: TRANSFORM_INDEX,
318 ty: "string or map",
319 }
320 .fail(),
321 }
322}
323
324fn parse_transform_index_options(value: &yaml_rust::Yaml) -> Result<HashMap<String, String>> {
325 let hash = value.as_hash().context(FieldMustBeTypeSnafu {
326 field: TRANSFORM_INDEX_OPTIONS_FIELD,
327 ty: "map",
328 })?;
329 let mut options = HashMap::with_capacity(hash.len());
330
331 for (k, v) in hash {
332 let key = k
333 .as_str()
334 .with_context(|| KeyMustBeStringSnafu { k: k.clone() })?;
335
336 let field = format!("{TRANSFORM_INDEX_OPTIONS_FIELD}.{key}");
337 let value = match v {
338 yaml_rust::Yaml::String(v) => v.clone(),
339 yaml_rust::Yaml::Boolean(v) => v.to_string(),
340 yaml_rust::Yaml::Integer(v) => v.to_string(),
341 yaml_rust::Yaml::Real(v) => v.clone(),
342 _ => {
343 return TransformIndexOptionMustBeScalarSnafu { field }.fail();
344 }
345 };
346 options.insert(key.to_string(), value);
347 }
348
349 Ok(options)
350}
351
352fn lower_typed_transform_index_options<T>(
353 index: Index,
354 index_options: Option<HashMap<String, String>>,
355 validate: fn(&str) -> bool,
356 wrap: fn(T) -> TransformIndexOptions,
357) -> Result<Option<TransformIndexOptions>>
358where
359 T: TryFrom<HashMap<String, String>, Error = datatypes::error::Error>,
360{
361 index_options
362 .map(|opts| {
363 for key in opts.keys() {
364 ensure!(
365 validate(key),
366 TransformIndexOptionUnsupportedSnafu {
367 index: index.to_string(),
368 key: key.clone(),
369 }
370 );
371 }
372
373 let options = opts.try_into().context(TransformIndexOptionSnafu {
374 index: index.to_string(),
375 })?;
376
377 Ok(wrap(options))
378 })
379 .transpose()
380}
381
382fn lower_transform_index_options(
383 index: Index,
384 column_type: &ColumnDataType,
385 index_options: Option<HashMap<String, String>>,
386) -> Result<Option<TransformIndexOptions>> {
387 match index {
388 Index::Fulltext => {
389 ensure!(
390 *column_type == ColumnDataType::String,
391 TransformIndexTypeMismatchSnafu {
392 index: index.to_string(),
393 expected: ColumnDataType::String.as_str_name().to_string(),
394 actual: column_type.as_str_name().to_string(),
395 }
396 );
397
398 lower_typed_transform_index_options(
399 index,
400 index_options,
401 validate_column_fulltext_create_option,
402 TransformIndexOptions::Fulltext,
403 )
404 }
405 Index::Skipping => lower_typed_transform_index_options(
406 index,
407 index_options,
408 validate_column_skipping_index_create_option,
409 TransformIndexOptions::Skipping,
410 ),
411 Index::Inverted | Index::Time | Index::Tag => {
412 ensure!(
413 index_options.is_none(),
414 TransformIndexOptionsUnsupportedSnafu {
415 index: index.to_string(),
416 }
417 );
418 Ok(None)
419 }
420 }
421}
422impl TryFrom<&yaml_rust::yaml::Hash> for Transform {
423 type Error = Error;
424
425 fn try_from(hash: &yaml_rust::yaml::Hash) -> Result<Self> {
426 let mut fields = Fields::default();
427 let mut default = None;
428 let mut index_value = None;
429 let mut tag = false;
430 let mut on_failure = None;
431
432 let mut type_ = None;
433
434 for (k, v) in hash {
435 let key = k
436 .as_str()
437 .with_context(|| KeyMustBeStringSnafu { k: k.clone() })?;
438 match key {
439 TRANSFORM_FIELD => {
440 fields = Fields::one(yaml_new_field(v, TRANSFORM_FIELD)?);
441 }
442
443 TRANSFORM_FIELDS => {
444 fields = yaml_new_fields(v, TRANSFORM_FIELDS)?;
445 }
446
447 TRANSFORM_TYPE => {
448 let t = yaml_string(v, TRANSFORM_TYPE)?;
449 type_ = Some(parse_str_type(&t)?);
450 }
451
452 TRANSFORM_INDEX => {
453 index_value = Some(v);
454 }
455
456 TRANSFORM_TAG => {
457 tag = yaml_bool(v, TRANSFORM_TAG)?;
458 }
459
460 TRANSFORM_DEFAULT => {
461 default = match v {
462 yaml_rust::Yaml::Real(r) => Some(r.clone()),
463 yaml_rust::Yaml::Integer(i) => Some(i.to_string()),
464 yaml_rust::Yaml::String(s) => Some(s.clone()),
465 yaml_rust::Yaml::Boolean(b) => Some(b.to_string()),
466 yaml_rust::Yaml::Array(_)
467 | yaml_rust::Yaml::Hash(_)
468 | yaml_rust::Yaml::Alias(_)
469 | yaml_rust::Yaml::Null
470 | yaml_rust::Yaml::BadValue => None,
471 };
472 }
473
474 TRANSFORM_ON_FAILURE => {
475 let on_failure_str = yaml_string(v, TRANSFORM_ON_FAILURE)?;
476 on_failure = Some(on_failure_str.parse()?);
477 }
478
479 _ => {}
480 }
481 }
482
483 ensure!(!fields.is_empty(), TransformFieldMustBeSetSnafu);
485 let type_ = type_.context(TransformTypeMustBeSetSnafu {
486 fields: format!("{:?}", fields),
487 })?;
488
489 let (index, index_options) = match index_value {
490 Some(value) => {
491 let (index, raw_index_options) = parse_transform_index(value)?;
492 let index_options =
493 lower_transform_index_options(index, &type_, raw_index_options)?;
494 (Some(index), index_options)
495 }
496 None => (None, None),
497 };
498
499 let final_default = if let Some(default_value) = default {
500 let target = parse_str_value(&type_, &default_value)?;
501 on_failure = Some(OnFailure::Default);
502 Some(target)
503 } else {
504 None
505 };
506
507 let builder = Transform {
508 fields,
509 type_,
510 default: final_default,
511 index,
512 index_options,
513 on_failure,
514 tag,
515 };
516
517 Ok(builder)
518 }
519}
520
#[cfg(test)]
mod tests {
    use yaml_rust::YamlLoader;

    use super::*;

    // Parses the first YAML document in `yaml` as a single `Transform`.
    // Panics if the text is not valid YAML or the document is not a map.
    fn parse_transform(yaml: &str) -> Result<Transform> {
        let docs = YamlLoader::load_from_str(yaml).unwrap();
        docs[0].as_hash().unwrap().try_into()
    }

    // The plain-string form of `index` sets the index kind and leaves the
    // options unset.
    #[test]
    fn test_transform_parses_legacy_string_index() {
        let transform = parse_transform(
            r#"
field: message
type: string
index: fulltext
"#,
        )
        .unwrap();

        assert_eq!(transform.index, Some(Index::Fulltext));
        assert!(transform.index_options.is_none());
    }

    // The map form of `index` works with only a `type` key.
    #[test]
    fn test_transform_parses_index_object_without_options() {
        let transform = parse_transform(
            r#"
field: message
type: string
index:
  type: inverted
"#,
        )
        .unwrap();

        assert_eq!(transform.index, Some(Index::Inverted));
        assert!(transform.index_options.is_none());
    }

    // String, boolean, integer and real option values are all accepted and
    // end up in the typed fulltext options.
    #[test]
    fn test_transform_parses_index_object_with_scalar_options() {
        let transform = parse_transform(
            r#"
field: message
type: string
index:
  type: fulltext
  options:
    analyzer: English
    case_sensitive: false
    granularity: 2048
    false_positive_rate: 0.02
"#,
        )
        .unwrap();

        assert_eq!(transform.index, Some(Index::Fulltext));
        let options = transform.index_options.as_ref().unwrap();
        let fulltext = options.as_fulltext().unwrap();
        assert!(fulltext.enable);
        assert_eq!(fulltext.analyzer.to_string(), "English");
        assert!(!fulltext.case_sensitive);
        assert_eq!(fulltext.granularity, 2048);
        assert_eq!(fulltext.false_positive_rate(), 0.02);
    }

    // `options` must be a map; a scalar is rejected.
    #[test]
    fn test_transform_rejects_invalid_index_options_type() {
        let result = parse_transform(
            r#"
field: message
type: string
index:
  type: fulltext
  options: invalid
"#,
        );

        assert!(result.is_err());
    }

    // Option values must be scalars; a nested map is rejected.
    #[test]
    fn test_transform_rejects_non_scalar_index_option_value() {
        let result = parse_transform(
            r#"
field: message
type: string
index:
  type: fulltext
  options:
    analyzer:
      kind: English
"#,
        );

        assert!(result.is_err());
    }

    // Keys other than `type` and `options` are not allowed inside `index`.
    #[test]
    fn test_transform_rejects_unknown_index_field() {
        let result = parse_transform(
            r#"
field: message
type: string
index:
  type: fulltext
  config: {}
"#,
        );

        assert!(result.is_err());
    }

    // An option key the fulltext validator does not accept is rejected.
    #[test]
    fn test_transform_rejects_unsupported_fulltext_option_key() {
        let result = parse_transform(
            r#"
field: message
type: string
index:
  type: fulltext
  options:
    tokenizer: english
"#,
        );

        assert!(result.is_err());
    }

    // Inverted indexes take no options.
    #[test]
    fn test_transform_rejects_options_for_inverted_index() {
        let result = parse_transform(
            r#"
field: message
type: string
index:
  type: inverted
  options:
    backend: bloom
"#,
        );

        assert!(result.is_err());
    }

    // Even an empty options map is rejected for index kinds without options.
    #[test]
    fn test_transform_rejects_empty_options_for_unsupported_indexes() {
        for index in ["inverted", "time", "tag"] {
            let yaml = format!(
                r#"
field: message
type: string
index:
  type: {index}
  options: {{}}
"#
            );

            let result = parse_transform(&yaml);
            assert!(
                result.is_err(),
                "expected `{index}` to reject empty options"
            );
        }
    }

    // Fulltext indexes are only valid on string columns.
    #[test]
    fn test_transform_rejects_fulltext_index_on_non_string_column() {
        let result = parse_transform(
            r#"
field: count
type: int64
index: fulltext
"#,
        );

        assert!(result.is_err());
    }

    // Skipping indexes have no column-type restriction and carry typed options.
    #[test]
    fn test_transform_allows_skipping_index_on_numeric_column() {
        let transform = parse_transform(
            r#"
field: count
type: int64
index:
  type: skipping
  options:
    granularity: 2048
    false_positive_rate: 0.02
    type: BLOOM
"#,
        )
        .unwrap();

        assert_eq!(transform.index, Some(Index::Skipping));
        let skipping = transform
            .index_options
            .as_ref()
            .unwrap()
            .as_skipping()
            .unwrap();
        assert_eq!(skipping.granularity, 2048);
        assert_eq!(skipping.false_positive_rate(), 0.02);
        assert_eq!(skipping.index_type.to_string(), "BLOOM");
    }
}
730}