pipeline/etl.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#![allow(dead_code)]
16pub mod ctx_req;
17pub mod field;
18pub mod processor;
19pub mod transform;
20pub mod value;
21
22use std::collections::HashMap;
23
24use api::v1::Row;
25use common_time::timestamp::TimeUnit;
26use itertools::Itertools;
27use processor::{Processor, Processors};
28use snafu::{OptionExt, ResultExt, ensure};
29use transform::Transforms;
30use vrl::core::Value as VrlValue;
31use yaml_rust::{Yaml, YamlLoader};
32
33use crate::dispatcher::{Dispatcher, Rule};
34use crate::error::{
35    ArrayElementMustBeObjectSnafu, AutoTransformOneTimestampSnafu, Error,
36    IntermediateKeyIndexSnafu, InvalidVersionNumberSnafu, Result, TransformArrayElementSnafu,
37    YamlLoadSnafu, YamlParseSnafu,
38};
39use crate::etl::processor::ProcessorKind;
40use crate::etl::transform::transformer::greptime::{RowWithTableSuffix, values_to_rows};
41use crate::tablesuffix::TableSuffixTemplate;
42use crate::{
43    ContextOpt, GreptimeTransformer, IdentityTimeIndex, PipelineContext, SchemaInfo,
44    unwrap_or_continue_if_err,
45};
46
47const DESCRIPTION: &str = "description";
48const DOC_VERSION: &str = "version";
49const PROCESSORS: &str = "processors";
50const TRANSFORM: &str = "transform";
51const TRANSFORMS: &str = "transforms";
52const DISPATCHER: &str = "dispatcher";
53const TABLESUFFIX: &str = "table_suffix";
54
55pub enum Content<'a> {
56    Json(&'a str),
57    Yaml(&'a str),
58}
59
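/// Parse a pipeline definition from its textual content.
///
/// A minimal usage sketch, assuming a YAML document shaped like the ones in
/// the tests below (the processor and field names are illustrative):
///
/// ```ignore
/// let yaml = r#"
/// processors:
///   - epoch:
///       field: ts
///       resolution: ms
/// transform:
///   - field: ts
///     type: timestamp, ms
///     index: time
/// "#;
/// let pipeline = parse(&Content::Yaml(yaml))?;
/// ```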
60pub fn parse(input: &Content) -> Result<Pipeline> {
61    match input {
62        Content::Yaml(str) => {
63            let docs = YamlLoader::load_from_str(str).context(YamlLoadSnafu)?;
64
65            ensure!(docs.len() == 1, YamlParseSnafu);
66
67            let doc = &docs[0];
68
69            let description = doc[DESCRIPTION].as_str().map(|s| s.to_string());
70
71            let doc_version = (&doc[DOC_VERSION]).try_into()?;
72
73            let processors = if let Some(v) = doc[PROCESSORS].as_vec() {
74                v.try_into()?
75            } else {
76                Processors::default()
77            };
78
79            let transformers = if let Some(v) = doc[TRANSFORMS].as_vec().or(doc[TRANSFORM].as_vec())
80            {
81                v.try_into()?
82            } else {
83                Transforms::default()
84            };
85
86            let transformer = if transformers.is_empty() {
87                // use auto-transform:
88                // require exactly one timestamp field produced by the date/epoch processors
89                let cnt = processors
90                    .iter()
91                    .filter_map(|p| match p {
92                        ProcessorKind::Date(d) if !d.ignore_missing() => Some(
93                            d.fields
94                                .iter()
95                                .map(|f| (f.target_or_input_field(), TimeUnit::Nanosecond))
96                                .collect_vec(),
97                        ),
98                        ProcessorKind::Epoch(e) if !e.ignore_missing() => Some(
99                            e.fields
100                                .iter()
101                                .map(|f| (f.target_or_input_field(), (&e.resolution).into()))
102                                .collect_vec(),
103                        ),
104                        _ => None,
105                    })
106                    .flatten()
107                    .collect_vec();
108                ensure!(cnt.len() == 1, AutoTransformOneTimestampSnafu);
109
110                let (ts_name, timeunit) = cnt.first().unwrap();
111                TransformerMode::AutoTransform(ts_name.to_string(), *timeunit)
112            } else {
113                TransformerMode::GreptimeTransformer(GreptimeTransformer::new(
114                    transformers,
115                    &doc_version,
116                )?)
117            };
118
119            let dispatcher = if !doc[DISPATCHER].is_badvalue() {
120                Some(Dispatcher::try_from(&doc[DISPATCHER])?)
121            } else {
122                None
123            };
124
125            let tablesuffix = if !doc[TABLESUFFIX].is_badvalue() {
126                Some(TableSuffixTemplate::try_from(&doc[TABLESUFFIX])?)
127            } else {
128                None
129            };
130
131            Ok(Pipeline {
132                doc_version,
133                description,
134                processors,
135                transformer,
136                dispatcher,
137                tablesuffix,
138            })
139        }
140        Content::Json(_) => unimplemented!(),
141    }
142}
143
144#[derive(Debug, Default, Copy, Clone, PartialEq, Eq)]
145pub enum PipelineDocVersion {
146    /// 1. All fields meant to be preserved have to be explicitly set in the transform section.
147    /// 2. Otherwise, if no transform is set, auto-transform is used.
148    #[default]
149    V1,
150
151    /// A combination of transform and auto-transform.
152    /// The input first goes through the transform section,
153    /// then auto-transform fills in the remaining fields.
154    ///
155    /// This is useful when you only want to set the index fields
156    /// and let the remaining fields be inferred automatically.
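    ///
    /// A hedged sketch of a v2 pipeline document (the `version` key may be an
    /// integer or a string; the other fields are illustrative):
    ///
    /// ```ignore
    /// version: 2
    /// processors:
    ///   - epoch:
    ///       field: ts
    ///       resolution: ms
    /// transform:
    ///   - field: ts
    ///     type: timestamp, ms
    ///     index: time
    /// ```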
157    V2,
158}
159
160impl TryFrom<&Yaml> for PipelineDocVersion {
161    type Error = Error;
162
163    fn try_from(value: &Yaml) -> Result<Self> {
164        if value.is_badvalue() || value.is_null() {
165            return Ok(PipelineDocVersion::V1);
166        }
167
168        let version = match value {
169            Yaml::String(s) => s
170                .parse::<i64>()
171                .map_err(|_| InvalidVersionNumberSnafu { version: s.clone() }.build())?,
172            Yaml::Integer(i) => *i,
173            _ => {
174                return InvalidVersionNumberSnafu {
175                    version: value.as_str().unwrap_or_default().to_string(),
176                }
177                .fail();
178            }
179        };
180
181        match version {
182            1 => Ok(PipelineDocVersion::V1),
183            2 => Ok(PipelineDocVersion::V2),
184            _ => InvalidVersionNumberSnafu {
185                version: version.to_string(),
186            }
187            .fail(),
188        }
189    }
190}
191
192#[derive(Debug)]
193pub struct Pipeline {
194    doc_version: PipelineDocVersion,
195    description: Option<String>,
196    processors: processor::Processors,
197    dispatcher: Option<Dispatcher>,
198    transformer: TransformerMode,
199    tablesuffix: Option<TableSuffixTemplate>,
200}
201
202#[derive(Debug, Clone)]
203pub enum TransformerMode {
204    GreptimeTransformer(GreptimeTransformer),
205    AutoTransform(String, TimeUnit),
206}
207
208/// The destination a pipeline execution is dispatched to, with context information.
209#[derive(Debug, Hash, PartialEq, Eq, Clone, PartialOrd, Ord)]
210pub struct DispatchedTo {
211    pub table_suffix: String,
212    pub pipeline: Option<String>,
213}
214
215impl From<&Rule> for DispatchedTo {
216    fn from(value: &Rule) -> Self {
217        DispatchedTo {
218            table_suffix: value.table_suffix.clone(),
219            pipeline: value.pipeline.clone(),
220        }
221    }
222}
223
224impl DispatchedTo {
225    /// Generate the destination table name from the original table name.
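    ///
    /// A small sketch (the suffix value is illustrative):
    ///
    /// ```ignore
    /// let to = DispatchedTo { table_suffix: "_http_events".to_string(), pipeline: None };
    /// assert_eq!(to.dispatched_to_table_name("logs"), "logs_http_events");
    /// ```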
226    pub fn dispatched_to_table_name(&self, original: &str) -> String {
227        [original, &self.table_suffix].concat()
228    }
229}
230
231/// The result of pipeline execution.
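///
/// A hedged sketch of how a caller might branch on the result:
///
/// ```ignore
/// match output {
///     PipelineExecOutput::Transformed(t) => { /* ingest t.rows_by_context */ }
///     PipelineExecOutput::DispatchedTo(to, val) => {
///         // route `val` to the pipeline named by `to.pipeline`, if any
///     }
///     PipelineExecOutput::Filtered => { /* the row was dropped by a processor */ }
/// }
/// ```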
232#[derive(Debug)]
233pub enum PipelineExecOutput {
234    Transformed(TransformedOutput),
235    DispatchedTo(DispatchedTo, VrlValue),
236    Filtered,
237}
238
239/// Output from a successful pipeline transformation.
240///
241/// Rows are grouped by their ContextOpt, with each row having its own optional
242/// table_suffix for routing to different tables when using one-to-many expansion.
243/// This enables per-row configuration, where different rows can carry
244/// different database settings (TTL, merge mode, etc.).
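///
/// A consumption sketch, assuming a hypothetical `ingest` sink:
///
/// ```ignore
/// for (opt, rows) in output.rows_by_context {
///     for (row, table_suffix) in rows {
///         ingest(&opt, table_suffix.as_deref(), row); // `ingest` is hypothetical
///     }
/// }
/// ```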
245#[derive(Debug)]
246pub struct TransformedOutput {
247    /// Rows grouped by their ContextOpt, each with optional table suffix
248    pub rows_by_context: HashMap<ContextOpt, Vec<RowWithTableSuffix>>,
249}
250
251impl PipelineExecOutput {
252    // Note: this is a test-only function; do not use it in production.
253    pub fn into_transformed(self) -> Option<Vec<RowWithTableSuffix>> {
254        if let Self::Transformed(TransformedOutput { rows_by_context }) = self {
255            // for backward compatibility, flatten all rows and drop the ContextOpt grouping
256            Some(rows_by_context.into_values().flatten().collect())
257        } else {
258            None
259        }
260    }
261
262    // Access the rows grouped by ContextOpt directly.
263    pub fn into_transformed_hashmap(self) -> Option<HashMap<ContextOpt, Vec<RowWithTableSuffix>>> {
264        if let Self::Transformed(TransformedOutput { rows_by_context }) = self {
265            Some(rows_by_context)
266        } else {
267            None
268        }
269    }
270
271    // Backward-compatibility helper: returns the single ContextOpt with all its rows,
272    // or merges all rows under the default ContextOpt when multiple contexts exist.
273    pub fn into_legacy_format(self) -> Option<(ContextOpt, Vec<RowWithTableSuffix>)> {
274        if let Self::Transformed(TransformedOutput { rows_by_context }) = self {
275            if rows_by_context.len() == 1 {
276                let (opt, rows) = rows_by_context.into_iter().next().unwrap();
277                Some((opt, rows))
278            } else {
279                // Multiple contexts: merge all rows with default ContextOpt for test compatibility
280                let all_rows: Vec<RowWithTableSuffix> =
281                    rows_by_context.into_values().flatten().collect();
282                Some((ContextOpt::default(), all_rows))
283            }
284        } else {
285            None
286        }
287    }
288
289    // Note: this is a test-only function; do not use it in production.
290    pub fn into_dispatched(self) -> Option<DispatchedTo> {
291        if let Self::DispatchedTo(d, _) = self {
292            Some(d)
293        } else {
294            None
295        }
296    }
297}
298
299impl Pipeline {
300    fn is_v1(&self) -> bool {
301        self.doc_version == PipelineDocVersion::V1
302    }
303
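    /// Execute the pipeline against a single input value.
    ///
    /// A hedged usage sketch; in tests the surrounding state comes from the
    /// test-only `setup_pipeline!` macro defined below:
    ///
    /// ```ignore
    /// let output = pipeline.exec_mut(payload, &pipeline_ctx, &mut schema_info)?;
    /// if let PipelineExecOutput::Transformed(t) = output {
    ///     for (opt, rows) in t.rows_by_context { /* build inserts per ContextOpt */ }
    /// }
    /// ```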
304    pub fn exec_mut(
305        &self,
306        mut val: VrlValue,
307        pipeline_ctx: &PipelineContext<'_>,
308        schema_info: &mut SchemaInfo,
309    ) -> Result<PipelineExecOutput> {
310        // process
311        for processor in self.processors.iter() {
312            val = processor.exec_mut(val)?;
313            if val.is_null() {
314                // the row was filtered out
315                return Ok(PipelineExecOutput::Filtered);
316            }
317        }
318
319        // dispatch: return early if a rule matches
320        if let Some(rule) = self.dispatcher.as_ref().and_then(|d| d.exec(&val)) {
321            return Ok(PipelineExecOutput::DispatchedTo(rule.into(), val));
322        }
323
324        let mut val = if val.is_array() {
325            val
326        } else {
327            VrlValue::Array(vec![val])
328        };
329
330        let rows_by_context = match self.transformer() {
331            TransformerMode::GreptimeTransformer(greptime_transformer) => {
332                transform_array_elements_by_ctx(
333                    // SAFETY: val is wrapped into an array just above if it was not one already
334                    val.as_array_mut().unwrap(),
335                    greptime_transformer,
336                    self.is_v1(),
337                    schema_info,
338                    pipeline_ctx,
339                    self.tablesuffix.as_ref(),
340                )?
341            }
342            TransformerMode::AutoTransform(ts_name, time_unit) => {
343                let def = crate::PipelineDefinition::GreptimeIdentityPipeline(Some(
344                    IdentityTimeIndex::Epoch(ts_name.clone(), *time_unit, false),
345                ));
346                let n_ctx =
347                    PipelineContext::new(&def, pipeline_ctx.pipeline_param, pipeline_ctx.channel);
348                values_to_rows(
349                    schema_info,
350                    val,
351                    &n_ctx,
352                    None,
353                    true,
354                    self.tablesuffix.as_ref(),
355                )?
356            }
357        };
358
359        Ok(PipelineExecOutput::Transformed(TransformedOutput {
360            rows_by_context,
361        }))
362    }
363
364    pub fn processors(&self) -> &processor::Processors {
365        &self.processors
366    }
367
368    pub fn transformer(&self) -> &TransformerMode {
369        &self.transformer
370    }
371
372    // this method is for test purposes only
373    pub fn schemas(&self) -> Option<&Vec<greptime_proto::v1::ColumnSchema>> {
374        match &self.transformer {
375            TransformerMode::GreptimeTransformer(t) => Some(t.schemas()),
376            TransformerMode::AutoTransform(_, _) => None,
377        }
378    }
379
380    pub fn is_variant_table_name(&self) -> bool {
381        // even if the pipeline has neither a dispatcher nor a table_suffix,
382        // the table name can still vary through the VRL processor and hints
383        self.dispatcher.is_some() || self.tablesuffix.is_some()
384    }
385}
386
387/// Transforms an array of VRL values into rows grouped by their ContextOpt.
388/// Each element can have its own ContextOpt for per-row configuration.
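///
/// In effect (a hedged sketch that elides the v1/v2 branching below):
///
/// ```ignore
/// for element in arr {
///     let (opt, row) = /* transform `element`, extract its ContextOpt */;
///     rows_by_context.entry(opt).or_default().push(row);
/// }
/// ```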
389fn transform_array_elements_by_ctx(
390    arr: &mut [VrlValue],
391    transformer: &GreptimeTransformer,
392    is_v1: bool,
393    schema_info: &mut SchemaInfo,
394    pipeline_ctx: &PipelineContext<'_>,
395    tablesuffix_template: Option<&TableSuffixTemplate>,
396) -> Result<HashMap<ContextOpt, Vec<RowWithTableSuffix>>> {
397    let skip_error = pipeline_ctx.pipeline_param.skip_error();
398    let mut rows_by_context = HashMap::new();
399
400    for (index, element) in arr.iter_mut().enumerate() {
401        if !element.is_object() {
402            unwrap_or_continue_if_err!(
403                ArrayElementMustBeObjectSnafu {
404                    index,
405                    actual_type: element.kind_str().to_string(),
406                }
407                .fail(),
408                skip_error
409            );
410        }
411
412        let values =
413            unwrap_or_continue_if_err!(transformer.transform_mut(element, is_v1), skip_error);
414        if is_v1 {
415            // v1 mode: just use transformer output directly
416            let mut opt = unwrap_or_continue_if_err!(
417                ContextOpt::from_pipeline_map_to_opt(element),
418                skip_error
419            );
420            let table_suffix = opt.resolve_table_suffix(tablesuffix_template, element);
421            rows_by_context
422                .entry(opt)
423                .or_insert_with(Vec::new)
424                .push((Row { values }, table_suffix));
425        } else {
426            // v2 mode: combine with auto-transform for remaining fields
427            let element_rows_map = values_to_rows(
428                schema_info,
429                element.clone(),
430                pipeline_ctx,
431                Some(values),
432                false,
433                tablesuffix_template,
434            )
435            .map_err(Box::new)
436            .context(TransformArrayElementSnafu { index })?;
437            for (k, v) in element_rows_map {
438                rows_by_context.entry(k).or_default().extend(v);
439            }
440        }
441    }
442
443    Ok(rows_by_context)
444}
445
446pub(crate) fn find_key_index(intermediate_keys: &[String], key: &str, kind: &str) -> Result<usize> {
447    intermediate_keys
448        .iter()
449        .position(|k| k == key)
450        .context(IntermediateKeyIndexSnafu { kind, key })
451}
452
453/// This macro is test-only; do not use it in production.
454/// The returned schema_info cannot be used in auto-transform (ts-infer) mode, since it lacks the timestamp schema.
455///
456/// Usage:
457/// ```ignore
458/// let (pipeline, schema_info, pipeline_def, pipeline_param) = setup_pipeline!(pipeline);
459/// let pipeline_ctx = PipelineContext::new(&pipeline_def, &pipeline_param, Channel::Unknown);
460/// ```
461#[macro_export]
462macro_rules! setup_pipeline {
463    ($pipeline:expr) => {{
464        use std::sync::Arc;
465
466        use $crate::{GreptimePipelineParams, Pipeline, PipelineDefinition, SchemaInfo};
467
468        let pipeline: Arc<Pipeline> = Arc::new($pipeline);
469        let schema = pipeline.schemas().unwrap();
470        let schema_info = SchemaInfo::from_schema_list(schema.clone());
471
472        let pipeline_def = PipelineDefinition::Resolved(pipeline.clone());
473        let pipeline_param = GreptimePipelineParams::default();
474
475        (pipeline, schema_info, pipeline_def, pipeline_param)
476    }};
477}
478
479#[cfg(test)]
480mod tests {
481    use std::collections::BTreeMap;
482    use std::sync::Arc;
483
484    use api::v1::Rows;
485    use greptime_proto::v1::value::ValueData;
486    use greptime_proto::v1::{self, ColumnDataType, SemanticType};
487    use vrl::prelude::Bytes;
488    use vrl::value::KeyString;
489
490    use super::*;
491
492    #[test]
493    fn test_pipeline_prepare() {
494        let input_value_str = r#"
495                    {
496                        "my_field": "1,2",
497                        "foo": "bar",
498                        "ts": "1"
499                    }
500                "#;
501        let input_value: serde_json::Value = serde_json::from_str(input_value_str).unwrap();
502
503        let pipeline_yaml = r#"description: 'Pipeline for Apache Tomcat'
504processors:
505    - csv:
506        field: my_field
507        target_fields: field1, field2
508    - epoch:
509        field: ts
510        resolution: ns
511transform:
512    - field: field1
513      type: uint32
514    - field: field2
515      type: uint32
516    - field: ts
517      type: timestamp, ns
518      index: time
519    "#;
520
521        let pipeline: Pipeline = parse(&Content::Yaml(pipeline_yaml)).unwrap();
522        let (pipeline, mut schema_info, pipeline_def, pipeline_param) = setup_pipeline!(pipeline);
523        let pipeline_ctx = PipelineContext::new(
524            &pipeline_def,
525            &pipeline_param,
526            session::context::Channel::Unknown,
527        );
528
529        let payload = input_value.into();
530        let mut result = pipeline
531            .exec_mut(payload, &pipeline_ctx, &mut schema_info)
532            .unwrap()
533            .into_transformed()
534            .unwrap();
535
536        let (row, _table_suffix) = result.swap_remove(0);
537        assert_eq!(row.values[0].value_data, Some(ValueData::U32Value(1)));
538        assert_eq!(row.values[1].value_data, Some(ValueData::U32Value(2)));
539        match &row.values[2].value_data {
540            Some(ValueData::TimestampNanosecondValue(v)) => {
541                assert_ne!(v, &0);
542            }
543            _ => panic!("expected timestamp value"),
544        }
545    }
546
547    #[test]
548    fn test_dissect_pipeline() {
549        let message = r#"129.37.245.88 - meln1ks [01/Aug/2024:14:22:47 +0800] "PATCH /observability/metrics/production HTTP/1.0" 501 33085"#.to_string();
550        let pipeline_str = r#"processors:
551    - dissect:
552        fields:
553          - message
554        patterns:
555          - "%{ip} %{?ignored} %{username} [%{ts}] \"%{method} %{path} %{proto}\" %{status} %{bytes}"
556    - date:
557        fields:
558          - ts
559        formats:
560          - "%d/%b/%Y:%H:%M:%S %z"
561
562transform:
563    - fields:
564        - ip
565        - username
566        - method
567        - path
568        - proto
569      type: string
570    - fields:
571        - status
572      type: uint16
573    - fields:
574        - bytes
575      type: uint32
576    - field: ts
577      type: timestamp, ns
578      index: time"#;
579        let pipeline: Pipeline = parse(&Content::Yaml(pipeline_str)).unwrap();
580        let pipeline = Arc::new(pipeline);
581        let schema = pipeline.schemas().unwrap();
582        let mut schema_info = SchemaInfo::from_schema_list(schema.clone());
583
584        let pipeline_def = crate::PipelineDefinition::Resolved(pipeline.clone());
585        let pipeline_param = crate::GreptimePipelineParams::default();
586        let pipeline_ctx = PipelineContext::new(
587            &pipeline_def,
588            &pipeline_param,
589            session::context::Channel::Unknown,
590        );
591        let payload = VrlValue::Object(BTreeMap::from([(
592            KeyString::from("message"),
593            VrlValue::Bytes(Bytes::from(message)),
594        )]));
595
596        let result = pipeline
597            .exec_mut(payload, &pipeline_ctx, &mut schema_info)
598            .unwrap()
599            .into_transformed()
600            .unwrap();
601
602        assert_eq!(schema_info.schema.len(), result[0].0.values.len());
603        let test = [
604            (
605                ColumnDataType::String as i32,
606                Some(ValueData::StringValue("129.37.245.88".into())),
607            ),
608            (
609                ColumnDataType::String as i32,
610                Some(ValueData::StringValue("meln1ks".into())),
611            ),
612            (
613                ColumnDataType::String as i32,
614                Some(ValueData::StringValue("PATCH".into())),
615            ),
616            (
617                ColumnDataType::String as i32,
618                Some(ValueData::StringValue(
619                    "/observability/metrics/production".into(),
620                )),
621            ),
622            (
623                ColumnDataType::String as i32,
624                Some(ValueData::StringValue("HTTP/1.0".into())),
625            ),
626            (
627                ColumnDataType::Uint16 as i32,
628                Some(ValueData::U16Value(501)),
629            ),
630            (
631                ColumnDataType::Uint32 as i32,
632                Some(ValueData::U32Value(33085)),
633            ),
634            (
635                ColumnDataType::TimestampNanosecond as i32,
636                Some(ValueData::TimestampNanosecondValue(1722493367000000000)),
637            ),
638        ];
639        // fetch the schema and verify each column against the expected values
640        let schema = pipeline.schemas().unwrap();
641        for i in 0..schema.len() {
642            let schema = &schema[i];
643            let value = &result[0].0.values[i];
644            assert_eq!(schema.datatype, test[i].0);
645            assert_eq!(value.value_data, test[i].1);
646        }
647    }
648
649    #[test]
650    fn test_csv_pipeline() {
651        let input_value_str = r#"
652                    {
653                        "my_field": "1,2",
654                        "foo": "bar",
655                        "ts": "1"
656                    }
657                "#;
658        let input_value: serde_json::Value = serde_json::from_str(input_value_str).unwrap();
659
660        let pipeline_yaml = r#"
661    description: Pipeline for Apache Tomcat
662    processors:
663      - csv:
664          field: my_field
665          target_fields: field1, field2
666      - epoch:
667          field: ts
668          resolution: ns
669    transform:
670      - field: field1
671        type: uint32
672      - field: field2
673        type: uint32
674      - field: ts
675        type: timestamp, ns
676        index: time
677    "#;
678
679        let pipeline: Pipeline = parse(&Content::Yaml(pipeline_yaml)).unwrap();
680        let (pipeline, mut schema_info, pipeline_def, pipeline_param) = setup_pipeline!(pipeline);
681        let pipeline_ctx = PipelineContext::new(
682            &pipeline_def,
683            &pipeline_param,
684            session::context::Channel::Unknown,
685        );
686
687        let payload = input_value.into();
688        let result = pipeline
689            .exec_mut(payload, &pipeline_ctx, &mut schema_info)
690            .unwrap()
691            .into_transformed()
692            .unwrap();
693        assert_eq!(
694            result[0].0.values[0].value_data,
695            Some(ValueData::U32Value(1))
696        );
697        assert_eq!(
698            result[0].0.values[1].value_data,
699            Some(ValueData::U32Value(2))
700        );
701        match &result[0].0.values[2].value_data {
702            Some(ValueData::TimestampNanosecondValue(v)) => {
703                assert_ne!(v, &0);
704            }
705            _ => panic!("expected timestamp value"),
706        }
707    }
708
709    #[test]
710    fn test_date_pipeline() {
711        let input_value_str = r#"
712                {
713                    "my_field": "1,2",
714                    "foo": "bar",
715                    "test_time": "2014-5-17T04:34:56+00:00"
716                }
717            "#;
718        let input_value: serde_json::Value = serde_json::from_str(input_value_str).unwrap();
719
720        let pipeline_yaml = r#"---
721description: Pipeline for Apache Tomcat
722
723processors:
724    - date:
725        field: test_time
726
727transform:
728    - field: test_time
729      type: timestamp, ns
730      index: time
731    "#;
732
733        let pipeline: Pipeline = parse(&Content::Yaml(pipeline_yaml)).unwrap();
734        let pipeline = Arc::new(pipeline);
735        let schema = pipeline.schemas().unwrap();
736        let mut schema_info = SchemaInfo::from_schema_list(schema.clone());
737
738        let pipeline_def = crate::PipelineDefinition::Resolved(pipeline.clone());
739        let pipeline_param = crate::GreptimePipelineParams::default();
740        let pipeline_ctx = PipelineContext::new(
741            &pipeline_def,
742            &pipeline_param,
743            session::context::Channel::Unknown,
744        );
745        let schema = pipeline.schemas().unwrap().clone();
746        let result = input_value.into();
747
748        let rows_with_suffix = pipeline
749            .exec_mut(result, &pipeline_ctx, &mut schema_info)
750            .unwrap()
751            .into_transformed()
752            .unwrap();
753        let output = Rows {
754            schema,
755            rows: rows_with_suffix.into_iter().map(|(r, _)| r).collect(),
756        };
757        let schemas = output.schema;
758
759        assert_eq!(schemas.len(), 1);
760        let schema = schemas[0].clone();
761        assert_eq!("test_time", schema.column_name);
762        assert_eq!(ColumnDataType::TimestampNanosecond as i32, schema.datatype);
763        assert_eq!(SemanticType::Timestamp as i32, schema.semantic_type);
764
765        let row = output.rows[0].clone();
766        assert_eq!(1, row.values.len());
767        let value_data = row.values[0].clone().value_data;
768        assert_eq!(
769            Some(v1::value::ValueData::TimestampNanosecondValue(
770                1400301296000000000
771            )),
772            value_data
773        );
774    }
775
776    #[test]
777    fn test_dispatcher() {
778        let pipeline_yaml = r#"
779---
780description: Pipeline for Apache Tomcat
781
782processors:
783  - epoch:
784      field: ts
785      resolution: ns
786
787dispatcher:
788  field: typename
789  rules:
790    - value: http
791      table_suffix: http_events
792    - value: database
793      table_suffix: db_events
794      pipeline: database_pipeline
795
796transform:
797  - field: typename
798    type: string
799  - field: ts
800    type: timestamp, ns
801    index: time
802"#;
803        let pipeline: Pipeline = parse(&Content::Yaml(pipeline_yaml)).unwrap();
804        let dispatcher = pipeline.dispatcher.expect("expect dispatcher");
805        assert_eq!(dispatcher.field, "typename");
806
807        assert_eq!(dispatcher.rules.len(), 2);
808
809        assert_eq!(
810            dispatcher.rules[0],
811            crate::dispatcher::Rule {
812                value: VrlValue::Bytes(Bytes::from("http")),
813                table_suffix: "http_events".to_string(),
814                pipeline: None
815            }
816        );
817
818        assert_eq!(
819            dispatcher.rules[1],
820            crate::dispatcher::Rule {
821                value: VrlValue::Bytes(Bytes::from("database")),
822                table_suffix: "db_events".to_string(),
823                pipeline: Some("database_pipeline".to_string()),
824            }
825        );
826
827        let bad_yaml1 = r#"
828---
829description: Pipeline for Apache Tomcat
830
831processors:
832  - epoch:
833      field: ts
834      resolution: ns
835
836dispatcher:
837  _field: typename
838  rules:
839    - value: http
840      table_suffix: http_events
841    - value: database
842      table_suffix: db_events
843      pipeline: database_pipeline
844
845transform:
846  - field: typename
847    type: string
848  - field: ts
849    type: timestamp, ns
850    index: time
851"#;
852        let bad_yaml2 = r#"
853---
854description: Pipeline for Apache Tomcat
855
856processors:
857  - epoch:
858      field: ts
859      resolution: ns
860dispatcher:
861  field: typename
862  rules:
863    - value: http
864      _table_suffix: http_events
865    - value: database
866      _table_suffix: db_events
867      pipeline: database_pipeline
868
869transform:
870  - field: typename
871    type: string
872  - field: ts
873    type: timestamp, ns
874    index: time
875"#;
876        let bad_yaml3 = r#"
877---
878description: Pipeline for Apache Tomcat
879
880processors:
881  - epoch:
882      field: ts
883      resolution: ns
884dispatcher:
885  field: typename
886  rules:
887    - _value: http
888      table_suffix: http_events
889    - _value: database
890      table_suffix: db_events
891      pipeline: database_pipeline
892
893transform:
894  - field: typename
895    type: string
896  - field: ts
897    type: timestamp, ns
898    index: time
899"#;
900
901        let r: Result<Pipeline> = parse(&Content::Yaml(bad_yaml1));
902        assert!(r.is_err());
903        let r: Result<Pipeline> = parse(&Content::Yaml(bad_yaml2));
904        assert!(r.is_err());
905        let r: Result<Pipeline> = parse(&Content::Yaml(bad_yaml3));
906        assert!(r.is_err());
907    }
908
909    /// Test one-to-many VRL pipeline expansion.
910    /// A VRL processor can return an array, which results in multiple output rows.
911    #[test]
912    fn test_one_to_many_vrl_expansion() {
913        let pipeline_yaml = r#"
914processors:
915  - epoch:
916      field: timestamp
917      resolution: ms
918  - vrl:
919      source: |
920        events = del(.events)
921        base_host = del(.host)
922        base_ts = del(.timestamp)
923        map_values(array!(events)) -> |event| {
924            {
925                "host": base_host,
926                "event_type": event.type,
927                "event_value": event.value,
928                "timestamp": base_ts
929            }
930        }
931
932transform:
933  - field: host
934    type: string
935  - field: event_type
936    type: string
937  - field: event_value
938    type: int32
939  - field: timestamp
940    type: timestamp, ms
941    index: time
942"#;
943
944        let pipeline: Pipeline = parse(&Content::Yaml(pipeline_yaml)).unwrap();
945        let (pipeline, mut schema_info, pipeline_def, pipeline_param) = setup_pipeline!(pipeline);
946        let pipeline_ctx = PipelineContext::new(
947            &pipeline_def,
948            &pipeline_param,
949            session::context::Channel::Unknown,
950        );
951
952        // Input with 3 events
953        let input_value: serde_json::Value = serde_json::from_str(
954            r#"{
955                "host": "server1",
956                "timestamp": 1716668197217,
957                "events": [
958                    {"type": "cpu", "value": 80},
959                    {"type": "memory", "value": 60},
960                    {"type": "disk", "value": 45}
961                ]
962            }"#,
963        )
964        .unwrap();
965
966        let payload = input_value.into();
967        let result = pipeline
968            .exec_mut(payload, &pipeline_ctx, &mut schema_info)
969            .unwrap()
970            .into_transformed()
971            .unwrap();
972
973        // Should produce 3 rows from 1 input
974        assert_eq!(result.len(), 3);
975
976        // Verify each row has correct structure
977        for (row, _table_suffix) in &result {
978            assert_eq!(row.values.len(), 4); // host, event_type, event_value, timestamp
979            // First value should be "server1"
980            assert_eq!(
981                row.values[0].value_data,
982                Some(ValueData::StringValue("server1".to_string()))
983            );
984            // Last value should be the timestamp
985            assert_eq!(
986                row.values[3].value_data,
987                Some(ValueData::TimestampMillisecondValue(1716668197217))
988            );
989        }
990
991        // Verify event types
992        let event_types: Vec<_> = result
993            .iter()
994            .map(|(r, _)| match &r.values[1].value_data {
995                Some(ValueData::StringValue(s)) => s.clone(),
996                _ => panic!("expected string"),
997            })
998            .collect();
999        assert!(event_types.contains(&"cpu".to_string()));
1000        assert!(event_types.contains(&"memory".to_string()));
1001        assert!(event_types.contains(&"disk".to_string()));
1002    }
1003
1004    /// Test that single object output still works (backward compatibility)
1005    #[test]
1006    fn test_single_object_output_unchanged() {
1007        let pipeline_yaml = r#"
1008processors:
1009  - epoch:
1010      field: ts
1011      resolution: ms
1012  - vrl:
1013      source: |
1014        .processed = true
1015        .
1016
1017transform:
1018  - field: name
1019    type: string
1020  - field: processed
1021    type: boolean
1022  - field: ts
1023    type: timestamp, ms
1024    index: time
1025"#;
1026
1027        let pipeline: Pipeline = parse(&Content::Yaml(pipeline_yaml)).unwrap();
1028        let (pipeline, mut schema_info, pipeline_def, pipeline_param) = setup_pipeline!(pipeline);
1029        let pipeline_ctx = PipelineContext::new(
1030            &pipeline_def,
1031            &pipeline_param,
1032            session::context::Channel::Unknown,
1033        );
1034
1035        let input_value: serde_json::Value = serde_json::from_str(
1036            r#"{
1037                "name": "test",
1038                "ts": 1716668197217
1039            }"#,
1040        )
1041        .unwrap();
1042
1043        let payload = input_value.into();
1044        let result = pipeline
1045            .exec_mut(payload, &pipeline_ctx, &mut schema_info)
1046            .unwrap()
1047            .into_transformed()
1048            .unwrap();
1049
1050        // Should produce exactly 1 row
1051        assert_eq!(result.len(), 1);
1052        assert_eq!(
1053            result[0].0.values[0].value_data,
1054            Some(ValueData::StringValue("test".to_string()))
1055        );
1056        assert_eq!(
1057            result[0].0.values[1].value_data,
1058            Some(ValueData::BoolValue(true))
1059        );
1060    }
1061
1062    /// Test that empty array produces zero rows
1063    #[test]
1064    fn test_empty_array_produces_zero_rows() {
1065        let pipeline_yaml = r#"
1066processors:
1067  - vrl:
1068      source: |
1069        .events
1070
1071transform:
1072  - field: value
1073    type: int32
1074  - field: greptime_timestamp
1075    type: timestamp, ns
1076    index: time
1077"#;
1078
1079        let pipeline: Pipeline = parse(&Content::Yaml(pipeline_yaml)).unwrap();
1080        let (pipeline, mut schema_info, pipeline_def, pipeline_param) = setup_pipeline!(pipeline);
1081        let pipeline_ctx = PipelineContext::new(
1082            &pipeline_def,
1083            &pipeline_param,
1084            session::context::Channel::Unknown,
1085        );
1086
1087        let input_value: serde_json::Value = serde_json::from_str(r#"{"events": []}"#).unwrap();
1088
1089        let payload = input_value.into();
1090        let result = pipeline
1091            .exec_mut(payload, &pipeline_ctx, &mut schema_info)
1092            .unwrap()
1093            .into_transformed()
1094            .unwrap();
1095
1096        // Empty array should produce zero rows
1097        assert_eq!(result.len(), 0);
1098    }
1099
1100    /// Test that array elements must be objects
1101    #[test]
1102    fn test_array_element_must_be_object() {
1103        let pipeline_yaml = r#"
1104processors:
1105  - vrl:
1106      source: |
1107        .items
1108
1109transform:
1110  - field: value
1111    type: int32
1112  - field: greptime_timestamp
1113    type: timestamp, ns
1114    index: time
1115"#;
1116
1117        let pipeline: Pipeline = parse(&Content::Yaml(pipeline_yaml)).unwrap();
1118        let (pipeline, mut schema_info, pipeline_def, pipeline_param) = setup_pipeline!(pipeline);
1119        let pipeline_ctx = PipelineContext::new(
1120            &pipeline_def,
1121            &pipeline_param,
1122            session::context::Channel::Unknown,
1123        );
1124
1125        // Array with non-object elements should fail
1126        let input_value: serde_json::Value =
1127            serde_json::from_str(r#"{"items": [1, 2, 3]}"#).unwrap();
1128
1129        let payload = input_value.into();
1130        let result = pipeline.exec_mut(payload, &pipeline_ctx, &mut schema_info);
1131
1132        assert!(result.is_err());
1133        let err_msg = result.unwrap_err().to_string();
1134        assert!(
1135            err_msg.contains("must be an object"),
1136            "Expected error about non-object element, got: {}",
1137            err_msg
1138        );
1139    }
1140
1141    /// Test one-to-many with table suffix from VRL hint
1142    #[test]
1143    fn test_one_to_many_with_table_suffix_hint() {
1144        let pipeline_yaml = r#"
1145processors:
1146  - epoch:
1147      field: ts
1148      resolution: ms
1149  - vrl:
1150      source: |
1151        .greptime_table_suffix = "_" + string!(.category)
1152        .
1153
1154transform:
1155  - field: name
1156    type: string
1157  - field: category
1158    type: string
1159  - field: ts
1160    type: timestamp, ms
1161    index: time
1162"#;
1163
1164        let pipeline: Pipeline = parse(&Content::Yaml(pipeline_yaml)).unwrap();
1165        let (pipeline, mut schema_info, pipeline_def, pipeline_param) = setup_pipeline!(pipeline);
1166        let pipeline_ctx = PipelineContext::new(
1167            &pipeline_def,
1168            &pipeline_param,
1169            session::context::Channel::Unknown,
1170        );
1171
1172        let input_value: serde_json::Value = serde_json::from_str(
1173            r#"{
1174                "name": "test",
1175                "category": "metrics",
1176                "ts": 1716668197217
1177            }"#,
1178        )
1179        .unwrap();
1180
1181        let payload = input_value.into();
1182        let result = pipeline
1183            .exec_mut(payload, &pipeline_ctx, &mut schema_info)
1184            .unwrap()
1185            .into_transformed()
1186            .unwrap();
1187
1188        // Should have table suffix extracted per row
1189        assert_eq!(result.len(), 1);
1190        assert_eq!(result[0].1, Some("_metrics".to_string()));
1191    }
1192
1193    /// Test one-to-many with per-row table suffix
1194    #[test]
1195    fn test_one_to_many_per_row_table_suffix() {
1196        let pipeline_yaml = r#"
1197processors:
1198  - epoch:
1199      field: timestamp
1200      resolution: ms
1201  - vrl:
1202      source: |
1203        events = del(.events)
1204        base_ts = del(.timestamp)
1205
1206        map_values(array!(events)) -> |event| {
1207            suffix = "_" + string!(event.category)
1208            {
1209                "name": event.name,
1210                "value": event.value,
1211                "timestamp": base_ts,
1212                "greptime_table_suffix": suffix
1213            }
1214        }
1215
1216transform:
1217  - field: name
1218    type: string
1219  - field: value
1220    type: int32
1221  - field: timestamp
1222    type: timestamp, ms
1223    index: time
1224"#;
1225
1226        let pipeline: Pipeline = parse(&Content::Yaml(pipeline_yaml)).unwrap();
1227        let (pipeline, mut schema_info, pipeline_def, pipeline_param) = setup_pipeline!(pipeline);
1228        let pipeline_ctx = PipelineContext::new(
1229            &pipeline_def,
1230            &pipeline_param,
1231            session::context::Channel::Unknown,
1232        );
1233
1234        // Input with events that should go to different tables
1235        let input_value: serde_json::Value = serde_json::from_str(
1236            r#"{
1237                "timestamp": 1716668197217,
1238                "events": [
1239                    {"name": "cpu_usage", "value": 80, "category": "cpu"},
1240                    {"name": "mem_usage", "value": 60, "category": "memory"},
1241                    {"name": "cpu_temp", "value": 45, "category": "cpu"}
1242                ]
1243            }"#,
1244        )
1245        .unwrap();
1246
1247        let payload = input_value.into();
1248        let result = pipeline
1249            .exec_mut(payload, &pipeline_ctx, &mut schema_info)
1250            .unwrap()
1251            .into_transformed()
1252            .unwrap();
1253
1254        // Should produce 3 rows
1255        assert_eq!(result.len(), 3);
1256
1257        // Collect table suffixes
1258        let table_suffixes: Vec<_> = result.iter().map(|(_, suffix)| suffix.clone()).collect();
1259
1260        // Should have different table suffixes per row
1261        assert!(table_suffixes.contains(&Some("_cpu".to_string())));
1262        assert!(table_suffixes.contains(&Some("_memory".to_string())));
1263
1264        // Count rows per table suffix
1265        let cpu_count = table_suffixes
1266            .iter()
1267            .filter(|s| *s == &Some("_cpu".to_string()))
1268            .count();
1269        let memory_count = table_suffixes
1270            .iter()
1271            .filter(|s| *s == &Some("_memory".to_string()))
1272            .count();
1273        assert_eq!(cpu_count, 2);
1274        assert_eq!(memory_count, 1);
1275    }
1276
1277    /// Test that one-to-many mapping preserves per-row ContextOpt in HashMap
1278    #[test]
1279    fn test_one_to_many_hashmap_contextopt_preservation() {
1280        let pipeline_yaml = r#"
1281processors:
1282  - epoch:
1283      field: timestamp
1284      resolution: ms
1285  - vrl:
1286      source: |
1287        events = del(.events)
1288        base_ts = del(.timestamp)
1289
1290        map_values(array!(events)) -> |event| {
1291            # Set different TTL values per event type
1292            ttl = if event.type == "critical" {
1293                "1h"
1294            } else if event.type == "warning" {
1295                "24h"
1296            } else {
1297                "7d"
1298            }
1299
1300            {
1301                "host": del(.host),
1302                "event_type": event.type,
1303                "event_value": event.value,
1304                "timestamp": base_ts,
1305                "greptime_ttl": ttl
1306            }
1307        }
1308
1309transform:
1310  - field: host
1311    type: string
1312  - field: event_type
1313    type: string
1314  - field: event_value
1315    type: int32
1316  - field: timestamp
1317    type: timestamp, ms
1318    index: time
1319"#;
1320
1321        let pipeline: Pipeline = parse(&Content::Yaml(pipeline_yaml)).unwrap();
1322        let (pipeline, mut schema_info, pipeline_def, pipeline_param) = setup_pipeline!(pipeline);
1323        let pipeline_ctx = PipelineContext::new(
1324            &pipeline_def,
1325            &pipeline_param,
1326            session::context::Channel::Unknown,
1327        );
1328
1329        // Input with events that should have different ContextOpt values
1330        let input_value: serde_json::Value = serde_json::from_str(
1331            r#"{
1332                "host": "server1",
1333                "timestamp": 1716668197217,
1334                "events": [
1335                    {"type": "critical", "value": 100},
1336                    {"type": "warning", "value": 50},
1337                    {"type": "info", "value": 25}
1338                ]
1339            }"#,
1340        )
1341        .unwrap();
1342
1343        let payload = input_value.into();
1344        let result = pipeline
1345            .exec_mut(payload, &pipeline_ctx, &mut schema_info)
1346            .unwrap();
1347
1348        // Extract the HashMap structure
1349        let rows_by_context = result.into_transformed_hashmap().unwrap();
1350
1351        // Should have 3 different ContextOpt groups due to different TTL values
1352        assert_eq!(rows_by_context.len(), 3);
1353
1354        // Verify each ContextOpt group has exactly 1 row and different configurations
1355        let mut context_opts = Vec::new();
1356        for (opt, rows) in &rows_by_context {
1357            assert_eq!(rows.len(), 1); // Each group should have exactly 1 row
1358            context_opts.push(opt.clone());
1359        }
1360
1361        // ContextOpts should be different due to different TTL values
1362        assert_ne!(context_opts[0], context_opts[1]);
1363        assert_ne!(context_opts[1], context_opts[2]);
1364        assert_ne!(context_opts[0], context_opts[2]);
1365
1366        // Verify the rows are correctly structured
1367        for rows in rows_by_context.values() {
1368            for (row, _table_suffix) in rows {
1369                assert_eq!(row.values.len(), 4); // host, event_type, event_value, timestamp
1370            }
1371        }
1372    }
1373
1374    /// Test that single object input still works with HashMap structure
1375    #[test]
1376    fn test_single_object_hashmap_compatibility() {
1377        let pipeline_yaml = r#"
1378processors:
1379  - epoch:
1380      field: ts
1381      resolution: ms
1382  - vrl:
1383      source: |
1384        .processed = true
1385        .
1386
1387transform:
1388  - field: name
1389    type: string
1390  - field: processed
1391    type: boolean
1392  - field: ts
1393    type: timestamp, ms
1394    index: time
1395"#;
1396
1397        let pipeline: Pipeline = parse(&Content::Yaml(pipeline_yaml)).unwrap();
1398        let (pipeline, mut schema_info, pipeline_def, pipeline_param) = setup_pipeline!(pipeline);
1399        let pipeline_ctx = PipelineContext::new(
1400            &pipeline_def,
1401            &pipeline_param,
1402            session::context::Channel::Unknown,
1403        );
1404
1405        let input_value: serde_json::Value = serde_json::from_str(
1406            r#"{
1407                "name": "test",
1408                "ts": 1716668197217
1409            }"#,
1410        )
1411        .unwrap();
1412
1413        let payload = input_value.into();
1414        let result = pipeline
1415            .exec_mut(payload, &pipeline_ctx, &mut schema_info)
1416            .unwrap();
1417
1418        // Extract the HashMap structure
1419        let rows_by_context = result.into_transformed_hashmap().unwrap();
1420
1421        // Single object should produce exactly 1 ContextOpt group
1422        assert_eq!(rows_by_context.len(), 1);
1423
1424        let (_opt, rows) = rows_by_context.into_iter().next().unwrap();
1425        assert_eq!(rows.len(), 1);
1426
1427        // Verify the row structure
1428        let (row, _table_suffix) = &rows[0];
1429        assert_eq!(row.values.len(), 3); // name, processed, ts
1430    }
1431
1432    /// Test that empty arrays work correctly with HashMap structure
1433    #[test]
1434    fn test_empty_array_hashmap() {
1435        let pipeline_yaml = r#"
1436processors:
1437  - vrl:
1438      source: |
1439        .events
1440
1441transform:
1442  - field: value
1443    type: int32
1444  - field: greptime_timestamp
1445    type: timestamp, ns
1446    index: time
1447"#;
1448
1449        let pipeline: Pipeline = parse(&Content::Yaml(pipeline_yaml)).unwrap();
1450        let (pipeline, mut schema_info, pipeline_def, pipeline_param) = setup_pipeline!(pipeline);
1451        let pipeline_ctx = PipelineContext::new(
1452            &pipeline_def,
1453            &pipeline_param,
1454            session::context::Channel::Unknown,
1455        );
1456
1457        let input_value: serde_json::Value = serde_json::from_str(r#"{"events": []}"#).unwrap();
1458
1459        let payload = input_value.into();
1460        let result = pipeline
1461            .exec_mut(payload, &pipeline_ctx, &mut schema_info)
1462            .unwrap();
1463
1464        // Extract the HashMap structure
1465        let rows_by_context = result.into_transformed_hashmap().unwrap();
1466
1467        // Empty array should produce empty HashMap
1468        assert_eq!(rows_by_context.len(), 0);
1469    }
1470}