servers/pipeline.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::BTreeMap;
use std::sync::Arc;

use ahash::{HashMap, HashMapExt};
use api::v1::helper::time_index_column_schema;
use api::v1::{ColumnDataType, RowInsertRequest, Rows, Value};
use common_time::timestamp::TimeUnit;
use pipeline::{
    ContextReq, DispatchedTo, GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME, Pipeline, PipelineContext,
    PipelineDefinition, PipelineExecOutput, SchemaInfo, TransformedOutput, TransformerMode,
    identity_pipeline, unwrap_or_continue_if_err,
};
use session::context::{Channel, QueryContextRef};
use snafu::ResultExt;
use vrl::value::Value as VrlValue;

use crate::error::{CatalogSnafu, PipelineSnafu, Result};
use crate::http::event::PipelineIngestRequest;
use crate::metrics::{
    METRIC_FAILURE_VALUE, METRIC_HTTP_LOGS_TRANSFORM_ELAPSED, METRIC_SUCCESS_VALUE,
};
use crate::query_handler::PipelineHandlerRef;

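/// Pushes `$value` into the `Vec` stored under `$key` in `$map`, creating the
/// vector with capacity `$capacity` on first insert.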
macro_rules! push_to_map {
    ($map:expr, $key:expr, $value:expr, $capacity:expr) => {
        $map.entry($key)
            .or_insert_with(|| Vec::with_capacity($capacity))
            .push($value);
    };
}

/// Resolves a pipeline definition to a concrete, stored [`Pipeline`].
/// Never call this on the identity pipeline: it has no stored definition to
/// look up.
pub async fn get_pipeline(
    pipeline_def: &PipelineDefinition,
    handler: &PipelineHandlerRef,
    query_ctx: &QueryContextRef,
) -> Result<Arc<Pipeline>> {
    match pipeline_def {
        PipelineDefinition::Resolved(pipeline) => Ok(pipeline.clone()),
        PipelineDefinition::ByNameAndValue((name, version)) => {
            handler
                .get_pipeline(name, *version, query_ctx.clone())
                .await
        }
        _ => {
            unreachable!("Never call get_pipeline on identity.")
        }
    }
}

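/// Entry point for pipeline execution: routes the request either to the
/// built-in identity pipeline or to a user-defined (custom) pipeline, based
/// on the definition in the context. `is_top_level` ensures the success
/// metric is recorded once per request rather than once per recursive
/// dispatch.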
pub(crate) async fn run_pipeline(
    handler: &PipelineHandlerRef,
    pipeline_ctx: &PipelineContext<'_>,
    pipeline_req: PipelineIngestRequest,
    query_ctx: &QueryContextRef,
    is_top_level: bool,
) -> Result<ContextReq> {
    if pipeline_ctx.pipeline_definition.is_identity() {
        run_identity_pipeline(handler, pipeline_ctx, pipeline_req, query_ctx).await
    } else {
        run_custom_pipeline(handler, pipeline_ctx, pipeline_req, query_ctx, is_top_level).await
    }
}

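/// Runs the built-in identity pipeline, which maps input values to table
/// columns without a user-defined transform. The target table is looked up
/// first (skipped for the Prometheus channel) so that existing column
/// definitions are respected.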
async fn run_identity_pipeline(
    handler: &PipelineHandlerRef,
    pipeline_ctx: &PipelineContext<'_>,
    pipeline_req: PipelineIngestRequest,
    query_ctx: &QueryContextRef,
) -> Result<ContextReq> {
    let PipelineIngestRequest {
        table: table_name,
        values: data_array,
    } = pipeline_req;
    let table = if pipeline_ctx.channel == Channel::Prometheus {
        None
    } else {
        handler
            .get_table(&table_name, query_ctx)
            .await
            .context(CatalogSnafu)?
    };
    identity_pipeline(data_array, table, pipeline_ctx)
        .map(|opt_map| ContextReq::from_opt_map(opt_map, table_name))
        .context(PipelineSnafu)
}

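/// Runs a user-defined pipeline: executes the transform on each input value,
/// groups transformed rows by (context options, table name), pads rows to the
/// final schema width, and recursively re-runs dispatched values through
/// their target pipelines.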
async fn run_custom_pipeline(
    handler: &PipelineHandlerRef,
    pipeline_ctx: &PipelineContext<'_>,
    pipeline_req: PipelineIngestRequest,
    query_ctx: &QueryContextRef,
    is_top_level: bool,
) -> Result<ContextReq> {
    let skip_error = pipeline_ctx.pipeline_param.skip_error();
    let db = query_ctx.get_db_string();
    let pipeline = get_pipeline(pipeline_ctx.pipeline_definition, handler, query_ctx).await?;

    let transform_timer = std::time::Instant::now();

    let PipelineIngestRequest {
        table: table_name,
        values: pipeline_maps,
    } = pipeline_req;
    let arr_len = pipeline_maps.len();
    let mut transformed_map = HashMap::new();
    let mut dispatched: BTreeMap<DispatchedTo, Vec<VrlValue>> = BTreeMap::new();

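    // A custom pipeline either declares an explicit transform with a fixed
    // output schema, or runs in auto-transform mode, where only the time
    // index column is known up front and the rest of the schema is inferred
    // while rows are processed.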
    let mut schema_info = match pipeline.transformer() {
        TransformerMode::GreptimeTransformer(greptime_transformer) => {
            SchemaInfo::from_schema_list(greptime_transformer.schemas().clone())
        }
        TransformerMode::AutoTransform(ts_name, timeunit) => {
            let timeunit = match timeunit {
                TimeUnit::Second => ColumnDataType::TimestampSecond,
                TimeUnit::Millisecond => ColumnDataType::TimestampMillisecond,
                TimeUnit::Microsecond => ColumnDataType::TimestampMicrosecond,
                TimeUnit::Nanosecond => ColumnDataType::TimestampNanosecond,
            };

            let mut schema_info = SchemaInfo::default();
            schema_info
                .schema
                .push(time_index_column_schema(ts_name, timeunit).into());

            schema_info
        }
    };

    let table = handler
        .get_table(&table_name, query_ctx)
        .await
        .context(CatalogSnafu)?;
    schema_info.set_table(table);

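    // Each input value yields one of three outcomes: transformed rows ready
    // for insertion, a dispatch to another pipeline, or a filtered (dropped)
    // value.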
    for pipeline_map in pipeline_maps {
        let result = pipeline
            .exec_mut(pipeline_map, pipeline_ctx, &mut schema_info)
            .inspect_err(|_| {
                METRIC_HTTP_LOGS_TRANSFORM_ELAPSED
                    .with_label_values(&[db.as_str(), METRIC_FAILURE_VALUE])
                    .observe(transform_timer.elapsed().as_secs_f64());
            })
            .context(PipelineSnafu);

        let r = unwrap_or_continue_if_err!(result, skip_error);
        match r {
            PipelineExecOutput::Transformed(TransformedOutput { rows_by_context }) => {
                // Process each ContextOpt group separately.
                for (opt, rows_with_suffix) in rows_by_context {
                    // Group rows by table name within each context.
                    for (row, table_suffix) in rows_with_suffix {
                        let act_table_name = table_suffix_to_table_name(&table_name, table_suffix);
                        transformed_map
                            .entry((opt.clone(), act_table_name))
                            .or_insert_with(|| Vec::with_capacity(arr_len))
                            .push(row);
                    }
                }
            }
            PipelineExecOutput::DispatchedTo(dispatched_to, val) => {
                push_to_map!(dispatched, dispatched_to, val, arr_len);
            }
            PipelineExecOutput::Filtered => {
                continue;
            }
        }
    }

    let mut results = ContextReq::default();

    // Process transformed outputs. Each entry in transformed_map contains
    // Vec<Row> grouped by (opt, table_name).
    let column_count = schema_info.schema.len();
    for ((opt, table_name), mut rows) in transformed_map {
        // Pad rows to match the final schema size (the schema may have
        // evolved during processing).
        for row in &mut rows {
            let diff = column_count.saturating_sub(row.values.len());
            for _ in 0..diff {
                row.values.push(Value { value_data: None });
            }
        }

        results.add_row(
            &opt,
            RowInsertRequest {
                rows: Some(Rows {
                    rows,
                    schema: schema_info.column_schemas()?,
                }),
                table_name: table_name.clone(),
            },
        );
    }

    // If the current pipeline contains a dispatcher with several rules, we
    // may have accumulated dispatched rows for multiple targets by now.
    for (dispatched_to, coll) in dispatched {
        // Derive the new table name from the rule's `table_part` and the
        // current table name.
        let table_name = dispatched_to.dispatched_to_table_name(&table_name);
        let next_pipeline_name = dispatched_to
            .pipeline
            .as_deref()
            .unwrap_or(GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME);

        // Run the target pipeline recursively.
        let next_pipeline_def =
            PipelineDefinition::from_name(next_pipeline_name, None, None).context(PipelineSnafu)?;
        let next_pipeline_ctx = PipelineContext::new(
            &next_pipeline_def,
            pipeline_ctx.pipeline_param,
            pipeline_ctx.channel,
        );
        let requests = Box::pin(run_pipeline(
            handler,
            &next_pipeline_ctx,
            PipelineIngestRequest {
                table: table_name,
                values: coll,
            },
            query_ctx,
            false,
        ))
        .await?;

        results.merge(requests);
    }

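    // Record the success metric only once, at the top of the recursion.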
    if is_top_level {
        METRIC_HTTP_LOGS_TRANSFORM_ELAPSED
            .with_label_values(&[db.as_str(), METRIC_SUCCESS_VALUE])
            .observe(transform_timer.elapsed().as_secs_f64());
    }

    Ok(results)
}

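/// Appends the optional suffix to the base table name, e.g. a base of
/// `"logs"` with a suffix of `Some("_errors")` yields `"logs_errors"`.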
#[inline]
fn table_suffix_to_table_name(table_name: &str, table_suffix: Option<String>) -> String {
    match table_suffix {
        Some(suffix) => format!("{}{}", table_name, suffix),
        None => table_name.to_string(),
    }
}