Skip to main content

pipeline/
etl.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#![allow(dead_code)]
16pub mod ctx_req;
17pub mod field;
18pub mod processor;
19pub mod transform;
20pub mod value;
21
22use std::collections::HashMap;
23
24use api::v1::Row;
25use common_time::timestamp::TimeUnit;
26use itertools::Itertools;
27use processor::{Processor, Processors};
28use snafu::{OptionExt, ResultExt, ensure};
29use transform::Transforms;
30use vrl::core::Value as VrlValue;
31use yaml_rust::{Yaml, YamlLoader};
32
33use crate::dispatcher::{Dispatcher, Rule};
34use crate::error::{
35    ArrayElementMustBeObjectSnafu, AutoTransformOneTimestampSnafu, Error,
36    IntermediateKeyIndexSnafu, InvalidVersionNumberSnafu, Result, TransformArrayElementSnafu,
37    YamlLoadSnafu, YamlParseSnafu,
38};
39use crate::etl::processor::ProcessorKind;
40use crate::etl::transform::transformer::greptime::{RowWithTableSuffix, values_to_rows};
41use crate::tablesuffix::TableSuffixTemplate;
42use crate::{
43    ContextOpt, GreptimeTransformer, IdentityTimeIndex, PipelineContext, SchemaInfo,
44    unwrap_or_continue_if_err,
45};
46
/// YAML key holding the optional free-form pipeline description.
const DESCRIPTION: &str = "description";
/// YAML key holding the pipeline document version (see [`PipelineDocVersion`]).
const DOC_VERSION: &str = "version";
/// YAML key holding the ordered list of processors.
const PROCESSORS: &str = "processors";
/// Singular YAML key for the transform list; only consulted when `transforms` is absent.
const TRANSFORM: &str = "transform";
/// Plural YAML key for the transform list; `parse` checks it before `transform`.
const TRANSFORMS: &str = "transforms";
/// YAML key holding the optional dispatcher configuration.
const DISPATCHER: &str = "dispatcher";
/// YAML key holding the optional table-suffix template.
const TABLESUFFIX: &str = "table_suffix";
54
/// Raw pipeline definition text, tagged with its serialization format.
///
/// Only [`Content::Yaml`] is currently understood by `parse`. The enum only
/// holds borrowed string slices, so it is cheap to copy.
#[derive(Debug, Clone, Copy)]
pub enum Content<'a> {
    /// A JSON-encoded pipeline definition (not supported by `parse` yet).
    Json(&'a str),
    /// A YAML-encoded pipeline definition.
    Yaml(&'a str),
}
59
60pub fn parse(input: &Content) -> Result<Pipeline> {
61    match input {
62        Content::Yaml(str) => {
63            let docs = YamlLoader::load_from_str(str).context(YamlLoadSnafu)?;
64
65            ensure!(docs.len() == 1, YamlParseSnafu);
66
67            let doc = &docs[0];
68
69            let description = doc[DESCRIPTION].as_str().map(|s| s.to_string());
70
71            let doc_version = (&doc[DOC_VERSION]).try_into()?;
72
73            let processors = if let Some(v) = doc[PROCESSORS].as_vec() {
74                v.try_into()?
75            } else {
76                Processors::default()
77            };
78
79            let transformers = if let Some(v) = doc[TRANSFORMS].as_vec().or(doc[TRANSFORM].as_vec())
80            {
81                v.try_into()?
82            } else {
83                Transforms::default()
84            };
85
86            let transformer = if transformers.is_empty() {
87                // use auto transform
88                // check processors have at least one timestamp-related processor
89                let cnt = processors
90                    .iter()
91                    .filter_map(|p| match p {
92                        ProcessorKind::Date(d) if !d.ignore_missing() => Some(
93                            d.fields
94                                .iter()
95                                .map(|f| (f.target_or_input_field(), TimeUnit::Nanosecond))
96                                .collect_vec(),
97                        ),
98                        ProcessorKind::Epoch(e) if !e.ignore_missing() => Some(
99                            e.fields
100                                .iter()
101                                .map(|f| (f.target_or_input_field(), (&e.resolution).into()))
102                                .collect_vec(),
103                        ),
104                        _ => None,
105                    })
106                    .flatten()
107                    .collect_vec();
108                ensure!(cnt.len() == 1, AutoTransformOneTimestampSnafu);
109
110                let (ts_name, timeunit) = cnt.first().unwrap();
111                TransformerMode::AutoTransform(ts_name.to_string(), *timeunit)
112            } else {
113                TransformerMode::GreptimeTransformer(GreptimeTransformer::new(
114                    transformers,
115                    &doc_version,
116                )?)
117            };
118
119            let dispatcher = if !doc[DISPATCHER].is_badvalue() {
120                Some(Dispatcher::try_from(&doc[DISPATCHER])?)
121            } else {
122                None
123            };
124
125            let tablesuffix = if !doc[TABLESUFFIX].is_badvalue() {
126                Some(TableSuffixTemplate::try_from(&doc[TABLESUFFIX])?)
127            } else {
128                None
129            };
130
131            Ok(Pipeline {
132                doc_version,
133                description,
134                processors,
135                transformer,
136                dispatcher,
137                tablesuffix,
138            })
139        }
140        Content::Json(_) => unimplemented!(),
141    }
142}
143
/// Format version of a pipeline document, taken from its `version` key.
///
/// Documents that omit `version` (or set it to null) are treated as
/// [`PipelineDocVersion::V1`].
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub enum PipelineDocVersion {
    /// Explicit mode: every field that should survive must be listed in the
    /// transform section. If the transform section is left out entirely, the
    /// pipeline falls back to auto-transform instead.
    #[default]
    V1,

    /// Hybrid mode: the transform section runs first, and auto-transform then
    /// fills in all remaining fields.
    ///
    /// Handy when only a few columns (for example the index) need explicit
    /// configuration and the rest can be inferred.
    V2,
}
159
160impl TryFrom<&Yaml> for PipelineDocVersion {
161    type Error = Error;
162
163    fn try_from(value: &Yaml) -> Result<Self> {
164        if value.is_badvalue() || value.is_null() {
165            return Ok(PipelineDocVersion::V1);
166        }
167
168        let version = match value {
169            Yaml::String(s) => s
170                .parse::<i64>()
171                .map_err(|_| InvalidVersionNumberSnafu { version: s.clone() }.build())?,
172            Yaml::Integer(i) => *i,
173            _ => {
174                return InvalidVersionNumberSnafu {
175                    version: value.as_str().unwrap_or_default().to_string(),
176                }
177                .fail();
178            }
179        };
180
181        match version {
182            1 => Ok(PipelineDocVersion::V1),
183            2 => Ok(PipelineDocVersion::V2),
184            _ => InvalidVersionNumberSnafu {
185                version: version.to_string(),
186            }
187            .fail(),
188        }
189    }
190}
191
/// A parsed, executable pipeline, produced by `parse`.
#[derive(Debug)]
pub struct Pipeline {
    /// Document format version; decides whether transform output is used
    /// directly (V1) or combined with auto-transform (V2).
    doc_version: PipelineDocVersion,
    /// Optional free-form description from the `description` key.
    description: Option<String>,
    /// Processors applied in order by `exec_mut` before dispatch and transform.
    processors: processor::Processors,
    /// Optional dispatcher; when a rule matches, the value is routed elsewhere
    /// instead of being transformed here.
    dispatcher: Option<Dispatcher>,
    /// How processed values are converted into rows.
    transformer: TransformerMode,
    /// Optional template used to resolve a table suffix for each row.
    tablesuffix: Option<TableSuffixTemplate>,
}
201
/// Strategy used to convert processed values into rows.
#[derive(Debug, Clone)]
pub enum TransformerMode {
    /// Use the explicit transform section of the pipeline document.
    GreptimeTransformer(GreptimeTransformer),
    /// No transform section was given: rows are produced by the identity
    /// (auto) transform, using the named field with the given time unit as
    /// the epoch-based time index.
    AutoTransform(String, TimeUnit),
}
207
/// Destination selected by a matching dispatcher rule: the table suffix to
/// write to and, optionally, the pipeline named by that rule.
#[derive(Debug, Hash, PartialEq, Eq, Clone, PartialOrd, Ord)]
pub struct DispatchedTo {
    /// Suffix appended to the original table name to form the target table.
    pub table_suffix: String,
    /// Pipeline name from the matching rule, if the rule specifies one.
    pub pipeline: Option<String>,
}
214
215impl From<&Rule> for DispatchedTo {
216    fn from(value: &Rule) -> Self {
217        DispatchedTo {
218            table_suffix: value.table_suffix.clone(),
219            pipeline: value.pipeline.clone(),
220        }
221    }
222}
223
224impl DispatchedTo {
225    /// Generate destination table name from input
226    pub fn dispatched_to_table_name(&self, original: &str) -> String {
227        [original, &self.table_suffix].concat()
228    }
229}
230
/// The result of pipeline execution
#[derive(Debug)]
pub enum PipelineExecOutput {
    /// The input was converted into rows, grouped by context options.
    Transformed(TransformedOutput),
    /// A dispatcher rule matched. Carries the destination and the processed
    /// (but not transformed) value to hand over to it.
    DispatchedTo(DispatchedTo, VrlValue),
    /// A processor produced a null value, so the input was dropped.
    Filtered,
}
238
/// Output from a successful pipeline transformation.
///
/// Rows are grouped by their `ContextOpt`, with each row having its own optional
/// table suffix for routing to different tables when using one-to-many expansion.
/// This enables true per-row configuration options where different rows can have
/// different database settings (TTL, merge mode, etc.).
///
/// A single input value may contribute rows to several groups.
#[derive(Debug)]
pub struct TransformedOutput {
    /// Rows grouped by their `ContextOpt`, each paired with an optional table suffix.
    pub rows_by_context: HashMap<ContextOpt, Vec<RowWithTableSuffix>>,
}
250
251impl PipelineExecOutput {
252    // Note: This is a test only function, do not use it in production.
253    pub fn into_transformed(self) -> Option<Vec<RowWithTableSuffix>> {
254        if let Self::Transformed(TransformedOutput { rows_by_context }) = self {
255            // For backward compatibility, merge all rows with a default ContextOpt
256            Some(rows_by_context.into_values().flatten().collect())
257        } else {
258            None
259        }
260    }
261
262    // New method for accessing the HashMap structure directly
263    pub fn into_transformed_hashmap(self) -> Option<HashMap<ContextOpt, Vec<RowWithTableSuffix>>> {
264        if let Self::Transformed(TransformedOutput { rows_by_context }) = self {
265            Some(rows_by_context)
266        } else {
267            None
268        }
269    }
270
271    // Backward compatibility helper that returns first ContextOpt with all its rows
272    // or merges all rows with default ContextOpt for multi-context scenarios
273    pub fn into_legacy_format(self) -> Option<(ContextOpt, Vec<RowWithTableSuffix>)> {
274        if let Self::Transformed(TransformedOutput { rows_by_context }) = self {
275            if rows_by_context.len() == 1 {
276                let (opt, rows) = rows_by_context.into_iter().next().unwrap();
277                Some((opt, rows))
278            } else {
279                // Multiple contexts: merge all rows with default ContextOpt for test compatibility
280                let all_rows: Vec<RowWithTableSuffix> =
281                    rows_by_context.into_values().flatten().collect();
282                Some((ContextOpt::default(), all_rows))
283            }
284        } else {
285            None
286        }
287    }
288
289    // Note: This is a test only function, do not use it in production.
290    pub fn into_dispatched(self) -> Option<DispatchedTo> {
291        if let Self::DispatchedTo(d, _) = self {
292            Some(d)
293        } else {
294            None
295        }
296    }
297}
298
299impl Pipeline {
300    fn is_v1(&self) -> bool {
301        self.doc_version == PipelineDocVersion::V1
302    }
303
304    pub fn exec_mut(
305        &self,
306        mut val: VrlValue,
307        pipeline_ctx: &PipelineContext<'_>,
308        schema_info: &mut SchemaInfo,
309    ) -> Result<PipelineExecOutput> {
310        // process
311        for processor in self.processors.iter() {
312            val = processor.exec_mut(val)?;
313            if val.is_null() {
314                // line is filtered
315                return Ok(PipelineExecOutput::Filtered);
316            }
317        }
318
319        // dispatch, fast return if matched
320        if let Some(rule) = self.dispatcher.as_ref().and_then(|d| d.exec(&val)) {
321            return Ok(PipelineExecOutput::DispatchedTo(rule.into(), val));
322        }
323
324        let mut val = if val.is_array() {
325            val
326        } else {
327            VrlValue::Array(vec![val])
328        };
329
330        let rows_by_context = match self.transformer() {
331            TransformerMode::GreptimeTransformer(greptime_transformer) => {
332                transform_array_elements_by_ctx(
333                    // SAFETY: by line 326, val must be an array
334                    val.as_array_mut().unwrap(),
335                    greptime_transformer,
336                    self.is_v1(),
337                    schema_info,
338                    pipeline_ctx,
339                    self.tablesuffix.as_ref(),
340                )?
341            }
342            TransformerMode::AutoTransform(ts_name, time_unit) => {
343                let def = crate::PipelineDefinition::GreptimeIdentityPipeline(Some(
344                    IdentityTimeIndex::Epoch(ts_name.clone(), *time_unit, false),
345                ));
346                let n_ctx =
347                    PipelineContext::new(&def, pipeline_ctx.pipeline_param, pipeline_ctx.channel);
348                values_to_rows(
349                    schema_info,
350                    val,
351                    &n_ctx,
352                    None,
353                    true,
354                    self.tablesuffix.as_ref(),
355                )?
356            }
357        };
358
359        Ok(PipelineExecOutput::Transformed(TransformedOutput {
360            rows_by_context,
361        }))
362    }
363
364    pub fn processors(&self) -> &processor::Processors {
365        &self.processors
366    }
367
368    pub fn transformer(&self) -> &TransformerMode {
369        &self.transformer
370    }
371
372    // the method is for test purpose
373    pub fn schemas(&self) -> Option<&Vec<greptime_proto::v1::ColumnSchema>> {
374        match &self.transformer {
375            TransformerMode::GreptimeTransformer(t) => Some(t.schemas()),
376            TransformerMode::AutoTransform(_, _) => None,
377        }
378    }
379
380    pub fn is_variant_table_name(&self) -> bool {
381        // even if the pipeline doesn't have dispatcher or table_suffix,
382        // it can still be a variant because of VRL processor and hint
383        self.dispatcher.is_some() || self.tablesuffix.is_some()
384    }
385}
386
387/// Transforms an array of VRL values into rows grouped by their ContextOpt.
388/// Each element can have its own ContextOpt for per-row configuration.
389fn transform_array_elements_by_ctx(
390    arr: &mut [VrlValue],
391    transformer: &GreptimeTransformer,
392    is_v1: bool,
393    schema_info: &mut SchemaInfo,
394    pipeline_ctx: &PipelineContext<'_>,
395    tablesuffix_template: Option<&TableSuffixTemplate>,
396) -> Result<HashMap<ContextOpt, Vec<RowWithTableSuffix>>> {
397    let skip_error = pipeline_ctx.pipeline_param.skip_error();
398    let mut rows_by_context = HashMap::new();
399
400    for (index, element) in arr.iter_mut().enumerate() {
401        if !element.is_object() {
402            unwrap_or_continue_if_err!(
403                ArrayElementMustBeObjectSnafu {
404                    index,
405                    actual_type: element.kind_str().to_string(),
406                }
407                .fail(),
408                skip_error
409            );
410        }
411
412        let values =
413            unwrap_or_continue_if_err!(transformer.transform_mut(element, is_v1), skip_error);
414        if is_v1 {
415            // v1 mode: just use transformer output directly
416            let mut opt = unwrap_or_continue_if_err!(
417                ContextOpt::from_pipeline_map_to_opt(element),
418                skip_error
419            );
420            let table_suffix = opt.resolve_table_suffix(tablesuffix_template, element);
421            rows_by_context
422                .entry(opt)
423                .or_insert_with(Vec::new)
424                .push((Row { values }, table_suffix));
425        } else {
426            // v2 mode: combine with auto-transform for remaining fields
427            let element_rows_map = values_to_rows(
428                schema_info,
429                element.clone(),
430                pipeline_ctx,
431                Some(values),
432                false,
433                tablesuffix_template,
434            )
435            .map_err(Box::new)
436            .context(TransformArrayElementSnafu { index })?;
437            for (k, v) in element_rows_map {
438                rows_by_context.entry(k).or_default().extend(v);
439            }
440        }
441    }
442
443    Ok(rows_by_context)
444}
445
446pub(crate) fn find_key_index(intermediate_keys: &[String], key: &str, kind: &str) -> Result<usize> {
447    intermediate_keys
448        .iter()
449        .position(|k| k == key)
450        .context(IntermediateKeyIndexSnafu { kind, key })
451}
452
/// This macro is test only, do not use it in production.
///
/// Expands to `(Arc<Pipeline>, SchemaInfo, PipelineDefinition, GreptimePipelineParams)`.
/// The schema info is seeded from the pipeline's transform-section schemas,
/// the definition wraps the pipeline as `PipelineDefinition::Resolved`, and the
/// params are the defaults.
///
/// # Panics
/// Panics for auto-transform pipelines (no transform section). They have no
/// static schema, so there is nothing to seed `SchemaInfo` from.
///
/// Usage:
/// ```ignore
/// let (pipeline, schema_info, pipeline_def, pipeline_param) = setup_pipeline!(pipeline);
/// let pipeline_ctx = PipelineContext::new(&pipeline_def, &pipeline_param, Channel::Unknown);
/// ```
#[macro_export]
macro_rules! setup_pipeline {
    ($pipeline:expr) => {{
        use std::sync::Arc;

        use $crate::{GreptimePipelineParams, Pipeline, PipelineDefinition, SchemaInfo};

        let pipeline: Arc<Pipeline> = Arc::new($pipeline);
        let schema = pipeline
            .schemas()
            .expect("setup_pipeline! requires a pipeline with a transform section");
        let schema_info = SchemaInfo::from_schema_list(schema.clone());

        let pipeline_def = PipelineDefinition::Resolved(pipeline.clone());
        let pipeline_param = GreptimePipelineParams::default();

        (pipeline, schema_info, pipeline_def, pipeline_param)
    }};
}
478
479#[cfg(test)]
480mod tests {
481    use std::collections::BTreeMap;
482    use std::sync::Arc;
483
484    use api::v1::Rows;
485    use datatypes::schema::{FulltextOptions, SkippingIndexOptions};
486    use greptime_proto::v1::value::ValueData;
487    use greptime_proto::v1::{self, ColumnDataType, SemanticType};
488    use vrl::prelude::Bytes;
489    use vrl::value::KeyString;
490
491    use super::*;
492
493    #[test]
494    fn test_pipeline_prepare() {
495        let input_value_str = r#"
496                    {
497                        "my_field": "1,2",
498                        "foo": "bar",
499                        "ts": "1"
500                    }
501                "#;
502        let input_value: serde_json::Value = serde_json::from_str(input_value_str).unwrap();
503
504        let pipeline_yaml = r#"description: 'Pipeline for Apache Tomcat'
505processors:
506    - csv:
507        field: my_field
508        target_fields: field1, field2
509    - epoch:
510        field: ts
511        resolution: ns
512transform:
513    - field: field1
514      type: uint32
515    - field: field2
516      type: uint32
517    - field: ts
518      type: timestamp, ns
519      index: time
520    "#;
521
522        let pipeline: Pipeline = parse(&Content::Yaml(pipeline_yaml)).unwrap();
523        let (pipeline, mut schema_info, pipeline_def, pipeline_param) = setup_pipeline!(pipeline);
524        let pipeline_ctx = PipelineContext::new(
525            &pipeline_def,
526            &pipeline_param,
527            session::context::Channel::Unknown,
528        );
529
530        let payload = input_value.into();
531        let mut result = pipeline
532            .exec_mut(payload, &pipeline_ctx, &mut schema_info)
533            .unwrap()
534            .into_transformed()
535            .unwrap();
536
537        let (row, _table_suffix) = result.swap_remove(0);
538        assert_eq!(row.values[0].value_data, Some(ValueData::U32Value(1)));
539        assert_eq!(row.values[1].value_data, Some(ValueData::U32Value(2)));
540        match &row.values[2].value_data {
541            Some(ValueData::TimestampNanosecondValue(v)) => {
542                assert_ne!(v, &0);
543            }
544            _ => panic!("expect null value"),
545        }
546    }
547
548    #[test]
549    fn test_dissect_pipeline() {
550        let message = r#"129.37.245.88 - meln1ks [01/Aug/2024:14:22:47 +0800] "PATCH /observability/metrics/production HTTP/1.0" 501 33085"#.to_string();
551        let pipeline_str = r#"processors:
552    - dissect:
553        fields:
554          - message
555        patterns:
556          - "%{ip} %{?ignored} %{username} [%{ts}] \"%{method} %{path} %{proto}\" %{status} %{bytes}"
557    - date:
558        fields:
559          - ts
560        formats:
561          - "%d/%b/%Y:%H:%M:%S %z"
562
563transform:
564    - fields:
565        - ip
566        - username
567        - method
568        - path
569        - proto
570      type: string
571    - fields:
572        - status
573      type: uint16
574    - fields:
575        - bytes
576      type: uint32
577    - field: ts
578      type: timestamp, ns
579      index: time"#;
580        let pipeline: Pipeline = parse(&Content::Yaml(pipeline_str)).unwrap();
581        let pipeline = Arc::new(pipeline);
582        let schema = pipeline.schemas().unwrap();
583        let mut schema_info = SchemaInfo::from_schema_list(schema.clone());
584
585        let pipeline_def = crate::PipelineDefinition::Resolved(pipeline.clone());
586        let pipeline_param = crate::GreptimePipelineParams::default();
587        let pipeline_ctx = PipelineContext::new(
588            &pipeline_def,
589            &pipeline_param,
590            session::context::Channel::Unknown,
591        );
592        let payload = VrlValue::Object(BTreeMap::from([(
593            KeyString::from("message"),
594            VrlValue::Bytes(Bytes::from(message)),
595        )]));
596
597        let result = pipeline
598            .exec_mut(payload, &pipeline_ctx, &mut schema_info)
599            .unwrap()
600            .into_transformed()
601            .unwrap();
602
603        assert_eq!(schema_info.schema.len(), result[0].0.values.len());
604        let test = [
605            (
606                ColumnDataType::String as i32,
607                Some(ValueData::StringValue("129.37.245.88".into())),
608            ),
609            (
610                ColumnDataType::String as i32,
611                Some(ValueData::StringValue("meln1ks".into())),
612            ),
613            (
614                ColumnDataType::String as i32,
615                Some(ValueData::StringValue("PATCH".into())),
616            ),
617            (
618                ColumnDataType::String as i32,
619                Some(ValueData::StringValue(
620                    "/observability/metrics/production".into(),
621                )),
622            ),
623            (
624                ColumnDataType::String as i32,
625                Some(ValueData::StringValue("HTTP/1.0".into())),
626            ),
627            (
628                ColumnDataType::Uint16 as i32,
629                Some(ValueData::U16Value(501)),
630            ),
631            (
632                ColumnDataType::Uint32 as i32,
633                Some(ValueData::U32Value(33085)),
634            ),
635            (
636                ColumnDataType::TimestampNanosecond as i32,
637                Some(ValueData::TimestampNanosecondValue(1722493367000000000)),
638            ),
639        ];
640        // manually set schema
641        let schema = pipeline.schemas().unwrap();
642        for i in 0..schema.len() {
643            let schema = &schema[i];
644            let value = &result[0].0.values[i];
645            assert_eq!(schema.datatype, test[i].0);
646            assert_eq!(value.value_data, test[i].1);
647        }
648    }
649
650    #[test]
651    fn test_csv_pipeline() {
652        let input_value_str = r#"
653                    {
654                        "my_field": "1,2",
655                        "foo": "bar",
656                        "ts": "1"
657                    }
658                "#;
659        let input_value: serde_json::Value = serde_json::from_str(input_value_str).unwrap();
660
661        let pipeline_yaml = r#"
662    description: Pipeline for Apache Tomcat
663    processors:
664      - csv:
665          field: my_field
666          target_fields: field1, field2
667      - epoch:
668          field: ts
669          resolution: ns
670    transform:
671      - field: field1
672        type: uint32
673      - field: field2
674        type: uint32
675      - field: ts
676        type: timestamp, ns
677        index: time
678    "#;
679
680        let pipeline: Pipeline = parse(&Content::Yaml(pipeline_yaml)).unwrap();
681        let (pipeline, mut schema_info, pipeline_def, pipeline_param) = setup_pipeline!(pipeline);
682        let pipeline_ctx = PipelineContext::new(
683            &pipeline_def,
684            &pipeline_param,
685            session::context::Channel::Unknown,
686        );
687
688        let payload = input_value.into();
689        let result = pipeline
690            .exec_mut(payload, &pipeline_ctx, &mut schema_info)
691            .unwrap()
692            .into_transformed()
693            .unwrap();
694        assert_eq!(
695            result[0].0.values[0].value_data,
696            Some(ValueData::U32Value(1))
697        );
698        assert_eq!(
699            result[0].0.values[1].value_data,
700            Some(ValueData::U32Value(2))
701        );
702        match &result[0].0.values[2].value_data {
703            Some(ValueData::TimestampNanosecondValue(v)) => {
704                assert_ne!(v, &0);
705            }
706            _ => panic!("expect null value"),
707        }
708    }
709
710    #[test]
711    fn test_date_pipeline() {
712        let input_value_str = r#"
713                {
714                    "my_field": "1,2",
715                    "foo": "bar",
716                    "test_time": "2014-5-17T04:34:56+00:00"
717                }
718            "#;
719        let input_value: serde_json::Value = serde_json::from_str(input_value_str).unwrap();
720
721        let pipeline_yaml = r#"---
722description: Pipeline for Apache Tomcat
723
724processors:
725    - date:
726        field: test_time
727
728transform:
729    - field: test_time
730      type: timestamp, ns
731      index: time
732    "#;
733
734        let pipeline: Pipeline = parse(&Content::Yaml(pipeline_yaml)).unwrap();
735        let pipeline = Arc::new(pipeline);
736        let schema = pipeline.schemas().unwrap();
737        let mut schema_info = SchemaInfo::from_schema_list(schema.clone());
738
739        let pipeline_def = crate::PipelineDefinition::Resolved(pipeline.clone());
740        let pipeline_param = crate::GreptimePipelineParams::default();
741        let pipeline_ctx = PipelineContext::new(
742            &pipeline_def,
743            &pipeline_param,
744            session::context::Channel::Unknown,
745        );
746        let schema = pipeline.schemas().unwrap().clone();
747        let result = input_value.into();
748
749        let rows_with_suffix = pipeline
750            .exec_mut(result, &pipeline_ctx, &mut schema_info)
751            .unwrap()
752            .into_transformed()
753            .unwrap();
754        let output = Rows {
755            schema,
756            rows: rows_with_suffix.into_iter().map(|(r, _)| r).collect(),
757        };
758        let schemas = output.schema;
759
760        assert_eq!(schemas.len(), 1);
761        let schema = schemas[0].clone();
762        assert_eq!("test_time", schema.column_name);
763        assert_eq!(ColumnDataType::TimestampNanosecond as i32, schema.datatype);
764        assert_eq!(SemanticType::Timestamp as i32, schema.semantic_type);
765
766        let row = output.rows[0].clone();
767        assert_eq!(1, row.values.len());
768        let value_data = row.values[0].clone().value_data;
769        assert_eq!(
770            Some(v1::value::ValueData::TimestampNanosecondValue(
771                1400301296000000000
772            )),
773            value_data
774        );
775    }
776
777    #[test]
778    fn test_dispatcher() {
779        let pipeline_yaml = r#"
780---
781description: Pipeline for Apache Tomcat
782
783processors:
784  - epoch:
785      field: ts
786      resolution: ns
787
788dispatcher:
789  field: typename
790  rules:
791    - value: http
792      table_suffix: http_events
793    - value: database
794      table_suffix: db_events
795      pipeline: database_pipeline
796
797transform:
798  - field: typename
799    type: string
800  - field: ts
801    type: timestamp, ns
802    index: time
803"#;
804        let pipeline: Pipeline = parse(&Content::Yaml(pipeline_yaml)).unwrap();
805        let dispatcher = pipeline.dispatcher.expect("expect dispatcher");
806        assert_eq!(dispatcher.field, "typename");
807
808        assert_eq!(dispatcher.rules.len(), 2);
809
810        assert_eq!(
811            dispatcher.rules[0],
812            crate::dispatcher::Rule {
813                value: VrlValue::Bytes(Bytes::from("http")),
814                table_suffix: "http_events".to_string(),
815                pipeline: None
816            }
817        );
818
819        assert_eq!(
820            dispatcher.rules[1],
821            crate::dispatcher::Rule {
822                value: VrlValue::Bytes(Bytes::from("database")),
823                table_suffix: "db_events".to_string(),
824                pipeline: Some("database_pipeline".to_string()),
825            }
826        );
827
828        let bad_yaml1 = r#"
829---
830description: Pipeline for Apache Tomcat
831
832processors:
833  - epoch:
834      field: ts
835      resolution: ns
836
837dispatcher:
838  _field: typename
839  rules:
840    - value: http
841      table_suffix: http_events
842    - value: database
843      table_suffix: db_events
844      pipeline: database_pipeline
845
846transform:
847  - field: typename
848    type: string
849  - field: ts
850    type: timestamp, ns
851    index: time
852"#;
853        let bad_yaml2 = r#"
854---
855description: Pipeline for Apache Tomcat
856
857processors:
858  - epoch:
859      field: ts
860      resolution: ns
861dispatcher:
862  field: typename
863  rules:
864    - value: http
865      _table_suffix: http_events
866    - value: database
867      _table_suffix: db_events
868      pipeline: database_pipeline
869
870transform:
871  - field: typename
872    type: string
873  - field: ts
874    type: timestamp, ns
875    index: time
876"#;
877        let bad_yaml3 = r#"
878---
879description: Pipeline for Apache Tomcat
880
881processors:
882  - epoch:
883      field: ts
884      resolution: ns
885dispatcher:
886  field: typename
887  rules:
888    - _value: http
889      table_suffix: http_events
890    - _value: database
891      table_suffix: db_events
892      pipeline: database_pipeline
893
894transform:
895  - field: typename
896    type: string
897  - field: ts
898    type: timestamp, ns
899    index: time
900"#;
901
902        let r: Result<Pipeline> = parse(&Content::Yaml(bad_yaml1));
903        assert!(r.is_err());
904        let r: Result<Pipeline> = parse(&Content::Yaml(bad_yaml2));
905        assert!(r.is_err());
906        let r: Result<Pipeline> = parse(&Content::Yaml(bad_yaml3));
907        assert!(r.is_err());
908    }
909
910    /// Test one-to-many VRL pipeline expansion.
911    /// A VRL processor can return an array, which results in multiple output rows.
912    #[test]
913    fn test_one_to_many_vrl_expansion() {
914        let pipeline_yaml = r#"
915processors:
916  - epoch:
917      field: timestamp
918      resolution: ms
919  - vrl:
920      source: |
921        events = del(.events)
922        base_host = del(.host)
923        base_ts = del(.timestamp)
924        map_values(array!(events)) -> |event| {
925            {
926                "host": base_host,
927                "event_type": event.type,
928                "event_value": event.value,
929                "timestamp": base_ts
930            }
931        }
932
933transform:
934  - field: host
935    type: string
936  - field: event_type
937    type: string
938  - field: event_value
939    type: int32
940  - field: timestamp
941    type: timestamp, ms
942    index: time
943"#;
944
945        let pipeline: Pipeline = parse(&Content::Yaml(pipeline_yaml)).unwrap();
946        let (pipeline, mut schema_info, pipeline_def, pipeline_param) = setup_pipeline!(pipeline);
947        let pipeline_ctx = PipelineContext::new(
948            &pipeline_def,
949            &pipeline_param,
950            session::context::Channel::Unknown,
951        );
952
953        // Input with 3 events
954        let input_value: serde_json::Value = serde_json::from_str(
955            r#"{
956                "host": "server1",
957                "timestamp": 1716668197217,
958                "events": [
959                    {"type": "cpu", "value": 80},
960                    {"type": "memory", "value": 60},
961                    {"type": "disk", "value": 45}
962                ]
963            }"#,
964        )
965        .unwrap();
966
967        let payload = input_value.into();
968        let result = pipeline
969            .exec_mut(payload, &pipeline_ctx, &mut schema_info)
970            .unwrap()
971            .into_transformed()
972            .unwrap();
973
974        // Should produce 3 rows from 1 input
975        assert_eq!(result.len(), 3);
976
977        // Verify each row has correct structure
978        for (row, _table_suffix) in &result {
979            assert_eq!(row.values.len(), 4); // host, event_type, event_value, timestamp
980            // First value should be "server1"
981            assert_eq!(
982                row.values[0].value_data,
983                Some(ValueData::StringValue("server1".to_string()))
984            );
985            // Last value should be the timestamp
986            assert_eq!(
987                row.values[3].value_data,
988                Some(ValueData::TimestampMillisecondValue(1716668197217))
989            );
990        }
991
992        // Verify event types
993        let event_types: Vec<_> = result
994            .iter()
995            .map(|(r, _)| match &r.values[1].value_data {
996                Some(ValueData::StringValue(s)) => s.clone(),
997                _ => panic!("expected string"),
998            })
999            .collect();
1000        assert!(event_types.contains(&"cpu".to_string()));
1001        assert!(event_types.contains(&"memory".to_string()));
1002        assert!(event_types.contains(&"disk".to_string()));
1003    }
1004
1005    /// Test that single object output still works (backward compatibility)
1006    #[test]
1007    fn test_single_object_output_unchanged() {
1008        let pipeline_yaml = r#"
1009processors:
1010  - epoch:
1011      field: ts
1012      resolution: ms
1013  - vrl:
1014      source: |
1015        .processed = true
1016        .
1017
1018transform:
1019  - field: name
1020    type: string
1021  - field: processed
1022    type: boolean
1023  - field: ts
1024    type: timestamp, ms
1025    index: time
1026"#;
1027
1028        let pipeline: Pipeline = parse(&Content::Yaml(pipeline_yaml)).unwrap();
1029        let (pipeline, mut schema_info, pipeline_def, pipeline_param) = setup_pipeline!(pipeline);
1030        let pipeline_ctx = PipelineContext::new(
1031            &pipeline_def,
1032            &pipeline_param,
1033            session::context::Channel::Unknown,
1034        );
1035
1036        let input_value: serde_json::Value = serde_json::from_str(
1037            r#"{
1038                "name": "test",
1039                "ts": 1716668197217
1040            }"#,
1041        )
1042        .unwrap();
1043
1044        let payload = input_value.into();
1045        let result = pipeline
1046            .exec_mut(payload, &pipeline_ctx, &mut schema_info)
1047            .unwrap()
1048            .into_transformed()
1049            .unwrap();
1050
1051        // Should produce exactly 1 row
1052        assert_eq!(result.len(), 1);
1053        assert_eq!(
1054            result[0].0.values[0].value_data,
1055            Some(ValueData::StringValue("test".to_string()))
1056        );
1057        assert_eq!(
1058            result[0].0.values[1].value_data,
1059            Some(ValueData::BoolValue(true))
1060        );
1061    }
1062
1063    /// Test that empty array produces zero rows
1064    #[test]
1065    fn test_empty_array_produces_zero_rows() {
1066        let pipeline_yaml = r#"
1067processors:
1068  - vrl:
1069      source: |
1070        .events
1071
1072transform:
1073  - field: value
1074    type: int32
1075  - field: greptime_timestamp
1076    type: timestamp, ns
1077    index: time
1078"#;
1079
1080        let pipeline: Pipeline = parse(&Content::Yaml(pipeline_yaml)).unwrap();
1081        let (pipeline, mut schema_info, pipeline_def, pipeline_param) = setup_pipeline!(pipeline);
1082        let pipeline_ctx = PipelineContext::new(
1083            &pipeline_def,
1084            &pipeline_param,
1085            session::context::Channel::Unknown,
1086        );
1087
1088        let input_value: serde_json::Value = serde_json::from_str(r#"{"events": []}"#).unwrap();
1089
1090        let payload = input_value.into();
1091        let result = pipeline
1092            .exec_mut(payload, &pipeline_ctx, &mut schema_info)
1093            .unwrap()
1094            .into_transformed()
1095            .unwrap();
1096
1097        // Empty array should produce zero rows
1098        assert_eq!(result.len(), 0);
1099    }
1100
1101    /// Test that array elements must be objects
1102    #[test]
1103    fn test_array_element_must_be_object() {
1104        let pipeline_yaml = r#"
1105processors:
1106  - vrl:
1107      source: |
1108        .items
1109
1110transform:
1111  - field: value
1112    type: int32
1113  - field: greptime_timestamp
1114    type: timestamp, ns
1115    index: time
1116"#;
1117
1118        let pipeline: Pipeline = parse(&Content::Yaml(pipeline_yaml)).unwrap();
1119        let (pipeline, mut schema_info, pipeline_def, pipeline_param) = setup_pipeline!(pipeline);
1120        let pipeline_ctx = PipelineContext::new(
1121            &pipeline_def,
1122            &pipeline_param,
1123            session::context::Channel::Unknown,
1124        );
1125
1126        // Array with non-object elements should fail
1127        let input_value: serde_json::Value =
1128            serde_json::from_str(r#"{"items": [1, 2, 3]}"#).unwrap();
1129
1130        let payload = input_value.into();
1131        let result = pipeline.exec_mut(payload, &pipeline_ctx, &mut schema_info);
1132
1133        assert!(result.is_err());
1134        let err_msg = result.unwrap_err().to_string();
1135        assert!(
1136            err_msg.contains("must be an object"),
1137            "Expected error about non-object element, got: {}",
1138            err_msg
1139        );
1140    }
1141
1142    /// Test one-to-many with table suffix from VRL hint
1143    #[test]
1144    fn test_one_to_many_with_table_suffix_hint() {
1145        let pipeline_yaml = r#"
1146processors:
1147  - epoch:
1148      field: ts
1149      resolution: ms
1150  - vrl:
1151      source: |
1152        .greptime_table_suffix = "_" + string!(.category)
1153        .
1154
1155transform:
1156  - field: name
1157    type: string
1158  - field: category
1159    type: string
1160  - field: ts
1161    type: timestamp, ms
1162    index: time
1163"#;
1164
1165        let pipeline: Pipeline = parse(&Content::Yaml(pipeline_yaml)).unwrap();
1166        let (pipeline, mut schema_info, pipeline_def, pipeline_param) = setup_pipeline!(pipeline);
1167        let pipeline_ctx = PipelineContext::new(
1168            &pipeline_def,
1169            &pipeline_param,
1170            session::context::Channel::Unknown,
1171        );
1172
1173        let input_value: serde_json::Value = serde_json::from_str(
1174            r#"{
1175                "name": "test",
1176                "category": "metrics",
1177                "ts": 1716668197217
1178            }"#,
1179        )
1180        .unwrap();
1181
1182        let payload = input_value.into();
1183        let result = pipeline
1184            .exec_mut(payload, &pipeline_ctx, &mut schema_info)
1185            .unwrap()
1186            .into_transformed()
1187            .unwrap();
1188
1189        // Should have table suffix extracted per row
1190        assert_eq!(result.len(), 1);
1191        assert_eq!(result[0].1, Some("_metrics".to_string()));
1192    }
1193
1194    /// Test one-to-many with per-row table suffix
1195    #[test]
1196    fn test_one_to_many_per_row_table_suffix() {
1197        let pipeline_yaml = r#"
1198processors:
1199  - epoch:
1200      field: timestamp
1201      resolution: ms
1202  - vrl:
1203      source: |
1204        events = del(.events)
1205        base_ts = del(.timestamp)
1206
1207        map_values(array!(events)) -> |event| {
1208            suffix = "_" + string!(event.category)
1209            {
1210                "name": event.name,
1211                "value": event.value,
1212                "timestamp": base_ts,
1213                "greptime_table_suffix": suffix
1214            }
1215        }
1216
1217transform:
1218  - field: name
1219    type: string
1220  - field: value
1221    type: int32
1222  - field: timestamp
1223    type: timestamp, ms
1224    index: time
1225"#;
1226
1227        let pipeline: Pipeline = parse(&Content::Yaml(pipeline_yaml)).unwrap();
1228        let (pipeline, mut schema_info, pipeline_def, pipeline_param) = setup_pipeline!(pipeline);
1229        let pipeline_ctx = PipelineContext::new(
1230            &pipeline_def,
1231            &pipeline_param,
1232            session::context::Channel::Unknown,
1233        );
1234
1235        // Input with events that should go to different tables
1236        let input_value: serde_json::Value = serde_json::from_str(
1237            r#"{
1238                "timestamp": 1716668197217,
1239                "events": [
1240                    {"name": "cpu_usage", "value": 80, "category": "cpu"},
1241                    {"name": "mem_usage", "value": 60, "category": "memory"},
1242                    {"name": "cpu_temp", "value": 45, "category": "cpu"}
1243                ]
1244            }"#,
1245        )
1246        .unwrap();
1247
1248        let payload = input_value.into();
1249        let result = pipeline
1250            .exec_mut(payload, &pipeline_ctx, &mut schema_info)
1251            .unwrap()
1252            .into_transformed()
1253            .unwrap();
1254
1255        // Should produce 3 rows
1256        assert_eq!(result.len(), 3);
1257
1258        // Collect table suffixes
1259        let table_suffixes: Vec<_> = result.iter().map(|(_, suffix)| suffix.clone()).collect();
1260
1261        // Should have different table suffixes per row
1262        assert!(table_suffixes.contains(&Some("_cpu".to_string())));
1263        assert!(table_suffixes.contains(&Some("_memory".to_string())));
1264
1265        // Count rows per table suffix
1266        let cpu_count = table_suffixes
1267            .iter()
1268            .filter(|s| *s == &Some("_cpu".to_string()))
1269            .count();
1270        let memory_count = table_suffixes
1271            .iter()
1272            .filter(|s| *s == &Some("_memory".to_string()))
1273            .count();
1274        assert_eq!(cpu_count, 2);
1275        assert_eq!(memory_count, 1);
1276    }
1277
1278    /// Test that one-to-many mapping preserves per-row ContextOpt in HashMap
1279    #[test]
1280    fn test_one_to_many_hashmap_contextopt_preservation() {
1281        let pipeline_yaml = r#"
1282processors:
1283  - epoch:
1284      field: timestamp
1285      resolution: ms
1286  - vrl:
1287      source: |
1288        events = del(.events)
1289        base_ts = del(.timestamp)
1290
1291        map_values(array!(events)) -> |event| {
1292            # Set different TTL values per event type
1293            ttl = if event.type == "critical" {
1294                "1h"
1295            } else if event.type == "warning" {
1296                "24h"
1297            } else {
1298                "7d"
1299            }
1300
1301            {
1302                "host": del(.host),
1303                "event_type": event.type,
1304                "event_value": event.value,
1305                "timestamp": base_ts,
1306                "greptime_ttl": ttl
1307            }
1308        }
1309
1310transform:
1311  - field: host
1312    type: string
1313  - field: event_type
1314    type: string
1315  - field: event_value
1316    type: int32
1317  - field: timestamp
1318    type: timestamp, ms
1319    index: time
1320"#;
1321
1322        let pipeline: Pipeline = parse(&Content::Yaml(pipeline_yaml)).unwrap();
1323        let (pipeline, mut schema_info, pipeline_def, pipeline_param) = setup_pipeline!(pipeline);
1324        let pipeline_ctx = PipelineContext::new(
1325            &pipeline_def,
1326            &pipeline_param,
1327            session::context::Channel::Unknown,
1328        );
1329
1330        // Input with events that should have different ContextOpt values
1331        let input_value: serde_json::Value = serde_json::from_str(
1332            r#"{
1333                "host": "server1",
1334                "timestamp": 1716668197217,
1335                "events": [
1336                    {"type": "critical", "value": 100},
1337                    {"type": "warning", "value": 50},
1338                    {"type": "info", "value": 25}
1339                ]
1340            }"#,
1341        )
1342        .unwrap();
1343
1344        let payload = input_value.into();
1345        let result = pipeline
1346            .exec_mut(payload, &pipeline_ctx, &mut schema_info)
1347            .unwrap();
1348
1349        // Extract the HashMap structure
1350        let rows_by_context = result.into_transformed_hashmap().unwrap();
1351
1352        // Should have 3 different ContextOpt groups due to different TTL values
1353        assert_eq!(rows_by_context.len(), 3);
1354
1355        // Verify each ContextOpt group has exactly 1 row and different configurations
1356        let mut context_opts = Vec::new();
1357        for (opt, rows) in &rows_by_context {
1358            assert_eq!(rows.len(), 1); // Each group should have exactly 1 row
1359            context_opts.push(opt.clone());
1360        }
1361
1362        // ContextOpts should be different due to different TTL values
1363        assert_ne!(context_opts[0], context_opts[1]);
1364        assert_ne!(context_opts[1], context_opts[2]);
1365        assert_ne!(context_opts[0], context_opts[2]);
1366
1367        // Verify the rows are correctly structured
1368        for rows in rows_by_context.values() {
1369            for (row, _table_suffix) in rows {
1370                assert_eq!(row.values.len(), 4); // host, event_type, event_value, timestamp
1371            }
1372        }
1373    }
1374
1375    /// Test that single object input still works with HashMap structure
1376    #[test]
1377    fn test_single_object_hashmap_compatibility() {
1378        let pipeline_yaml = r#"
1379processors:
1380  - epoch:
1381      field: ts
1382      resolution: ms
1383  - vrl:
1384      source: |
1385        .processed = true
1386        .
1387
1388transform:
1389  - field: name
1390    type: string
1391  - field: processed
1392    type: boolean
1393  - field: ts
1394    type: timestamp, ms
1395    index: time
1396"#;
1397
1398        let pipeline: Pipeline = parse(&Content::Yaml(pipeline_yaml)).unwrap();
1399        let (pipeline, mut schema_info, pipeline_def, pipeline_param) = setup_pipeline!(pipeline);
1400        let pipeline_ctx = PipelineContext::new(
1401            &pipeline_def,
1402            &pipeline_param,
1403            session::context::Channel::Unknown,
1404        );
1405
1406        let input_value: serde_json::Value = serde_json::from_str(
1407            r#"{
1408                "name": "test",
1409                "ts": 1716668197217
1410            }"#,
1411        )
1412        .unwrap();
1413
1414        let payload = input_value.into();
1415        let result = pipeline
1416            .exec_mut(payload, &pipeline_ctx, &mut schema_info)
1417            .unwrap();
1418
1419        // Extract the HashMap structure
1420        let rows_by_context = result.into_transformed_hashmap().unwrap();
1421
1422        // Single object should produce exactly 1 ContextOpt group
1423        assert_eq!(rows_by_context.len(), 1);
1424
1425        let (_opt, rows) = rows_by_context.into_iter().next().unwrap();
1426        assert_eq!(rows.len(), 1);
1427
1428        // Verify the row structure
1429        let (row, _table_suffix) = &rows[0];
1430        assert_eq!(row.values.len(), 3); // name, processed, timestamp
1431    }
1432
1433    /// Test that empty arrays work correctly with HashMap structure
1434    #[test]
1435    fn test_empty_array_hashmap() {
1436        let pipeline_yaml = r#"
1437processors:
1438  - vrl:
1439      source: |
1440        .events
1441
1442transform:
1443  - field: value
1444    type: int32
1445  - field: greptime_timestamp
1446    type: timestamp, ns
1447    index: time
1448"#;
1449
1450        let pipeline: Pipeline = parse(&Content::Yaml(pipeline_yaml)).unwrap();
1451        let (pipeline, mut schema_info, pipeline_def, pipeline_param) = setup_pipeline!(pipeline);
1452        let pipeline_ctx = PipelineContext::new(
1453            &pipeline_def,
1454            &pipeline_param,
1455            session::context::Channel::Unknown,
1456        );
1457
1458        let input_value: serde_json::Value = serde_json::from_str(r#"{"events": []}"#).unwrap();
1459
1460        let payload = input_value.into();
1461        let result = pipeline
1462            .exec_mut(payload, &pipeline_ctx, &mut schema_info)
1463            .unwrap();
1464
1465        // Extract the HashMap structure
1466        let rows_by_context = result.into_transformed_hashmap().unwrap();
1467
1468        // Empty array should produce empty HashMap
1469        assert_eq!(rows_by_context.len(), 0);
1470    }
1471
1472    #[test]
1473    fn test_pipeline_detailed_index_options_roundtrip() {
1474        let pipeline_yaml = r#"
1475transform:
1476  - field: message
1477    type: string
1478    index:
1479      type: fulltext
1480      options:
1481        analyzer: Chinese
1482        case_sensitive: true
1483        backend: tantivy
1484  - field: trace_id
1485    type: int64
1486    index:
1487      type: skipping
1488      options:
1489        granularity: 2048
1490        false_positive_rate: 0.02
1491        type: BLOOM
1492  - field: ts
1493    type: timestamp, ns
1494    index: time
1495"#;
1496
1497        let pipeline: Pipeline = parse(&Content::Yaml(pipeline_yaml)).unwrap();
1498        let schema = pipeline.schemas().unwrap().clone();
1499
1500        let message = schema
1501            .iter()
1502            .find(|column| column.column_name == "message")
1503            .unwrap();
1504        let trace_id = schema
1505            .iter()
1506            .find(|column| column.column_name == "trace_id")
1507            .unwrap();
1508        let message_options = message.options.clone();
1509        let trace_id_options = trace_id.options.clone();
1510
1511        let fulltext: FulltextOptions = serde_json::from_str(
1512            message
1513                .options
1514                .as_ref()
1515                .unwrap()
1516                .options
1517                .get("fulltext")
1518                .unwrap(),
1519        )
1520        .unwrap();
1521        assert!(fulltext.enable);
1522        assert_eq!(fulltext.analyzer.to_string(), "Chinese");
1523        assert!(fulltext.case_sensitive);
1524        assert_eq!(fulltext.backend.to_string(), "tantivy");
1525
1526        let skipping: SkippingIndexOptions = serde_json::from_str(
1527            trace_id
1528                .options
1529                .as_ref()
1530                .unwrap()
1531                .options
1532                .get("skipping_index")
1533                .unwrap(),
1534        )
1535        .unwrap();
1536        assert_eq!(skipping.granularity, 2048);
1537        assert_eq!(skipping.false_positive_rate(), 0.02);
1538        assert_eq!(skipping.index_type.to_string(), "BLOOM");
1539
1540        let roundtrip_schema = SchemaInfo::from_schema_list(schema)
1541            .column_schemas()
1542            .unwrap();
1543        let roundtrip_message = roundtrip_schema
1544            .iter()
1545            .find(|column| column.column_name == "message")
1546            .unwrap();
1547        let roundtrip_trace_id = roundtrip_schema
1548            .iter()
1549            .find(|column| column.column_name == "trace_id")
1550            .unwrap();
1551
1552        assert_eq!(message_options, roundtrip_message.options);
1553        assert_eq!(trace_id_options, roundtrip_trace_id.options);
1554    }
1555}