servers/
pipeline.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

15use std::collections::BTreeMap;
16use std::sync::Arc;
17
18use ahash::{HashMap, HashMapExt};
19use api::greptime_proto;
20use api::v1::helper::time_index_column_schema;
21use api::v1::{ColumnDataType, RowInsertRequest, Rows};
22use common_time::timestamp::TimeUnit;
23use pipeline::{
24    identity_pipeline, unwrap_or_continue_if_err, ContextReq, DispatchedTo, Pipeline,
25    PipelineContext, PipelineDefinition, PipelineExecOutput, SchemaInfo, TransformedOutput,
26    TransformerMode, GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME,
27};
28use session::context::{Channel, QueryContextRef};
29use snafu::ResultExt;
30use vrl::value::Value as VrlValue;
31
32use crate::error::{CatalogSnafu, PipelineSnafu, Result};
33use crate::http::event::PipelineIngestRequest;
34use crate::metrics::{
35    METRIC_FAILURE_VALUE, METRIC_HTTP_LOGS_TRANSFORM_ELAPSED, METRIC_SUCCESS_VALUE,
36};
37use crate::query_handler::PipelineHandlerRef;
38
/// Push `$value` onto the `Vec` stored under `$key` in `$map`, creating the
/// `Vec` with `$capacity` reserved slots on first insertion for that key.
/// Works with any map exposing the `entry` API (`HashMap`, `BTreeMap`).
macro_rules! push_to_map {
    ($map:expr, $key:expr, $value:expr, $capacity:expr) => {
        $map.entry($key)
            .or_insert_with(|| Vec::with_capacity($capacity))
            .push($value);
    };
}
46
47/// Never call this on `GreptimeIdentityPipeline` because it's a real pipeline
48pub async fn get_pipeline(
49    pipeline_def: &PipelineDefinition,
50    handler: &PipelineHandlerRef,
51    query_ctx: &QueryContextRef,
52) -> Result<Arc<Pipeline>> {
53    match pipeline_def {
54        PipelineDefinition::Resolved(pipeline) => Ok(pipeline.clone()),
55        PipelineDefinition::ByNameAndValue((name, version)) => {
56            handler
57                .get_pipeline(name, *version, query_ctx.clone())
58                .await
59        }
60        _ => {
61            unreachable!("Never call get_pipeline on identity.")
62        }
63    }
64}
65
66pub(crate) async fn run_pipeline(
67    handler: &PipelineHandlerRef,
68    pipeline_ctx: &PipelineContext<'_>,
69    pipeline_req: PipelineIngestRequest,
70    query_ctx: &QueryContextRef,
71    is_top_level: bool,
72) -> Result<ContextReq> {
73    if pipeline_ctx.pipeline_definition.is_identity() {
74        run_identity_pipeline(handler, pipeline_ctx, pipeline_req, query_ctx).await
75    } else {
76        run_custom_pipeline(handler, pipeline_ctx, pipeline_req, query_ctx, is_top_level).await
77    }
78}
79
80async fn run_identity_pipeline(
81    handler: &PipelineHandlerRef,
82    pipeline_ctx: &PipelineContext<'_>,
83    pipeline_req: PipelineIngestRequest,
84    query_ctx: &QueryContextRef,
85) -> Result<ContextReq> {
86    let PipelineIngestRequest {
87        table: table_name,
88        values: data_array,
89    } = pipeline_req;
90    let table = if pipeline_ctx.channel == Channel::Prometheus {
91        None
92    } else {
93        handler
94            .get_table(&table_name, query_ctx)
95            .await
96            .context(CatalogSnafu)?
97    };
98    identity_pipeline(data_array, table, pipeline_ctx)
99        .map(|opt_map| ContextReq::from_opt_map(opt_map, table_name))
100        .context(PipelineSnafu)
101}
102
/// Run a user-defined pipeline over every value in `pipeline_req`.
///
/// Each input value either:
/// - transforms into a row destined for a (possibly suffixed) table,
/// - gets dispatched to another pipeline (run recursively below), or
/// - is filtered out entirely.
///
/// Returns the accumulated [`ContextReq`] of insert requests.
/// Errors from individual rows are skipped when the pipeline params request
/// `skip_error`; otherwise the first error aborts the whole batch.
async fn run_custom_pipeline(
    handler: &PipelineHandlerRef,
    pipeline_ctx: &PipelineContext<'_>,
    pipeline_req: PipelineIngestRequest,
    query_ctx: &QueryContextRef,
    is_top_level: bool,
) -> Result<ContextReq> {
    let skip_error = pipeline_ctx.pipeline_param.skip_error();
    let db = query_ctx.get_db_string();
    let pipeline = get_pipeline(pipeline_ctx.pipeline_definition, handler, query_ctx).await?;

    // Timer feeds the transform-elapsed metric (failure label on per-row
    // error, success label once at top level).
    let transform_timer = std::time::Instant::now();

    let PipelineIngestRequest {
        table: table_name,
        values: pipeline_maps,
    } = pipeline_req;
    let arr_len = pipeline_maps.len();
    // (opt, table name) -> transformed rows.
    let mut transformed_map = HashMap::new();
    // dispatch target -> raw values to re-run through the next pipeline.
    let mut dispatched: BTreeMap<DispatchedTo, Vec<VrlValue>> = BTreeMap::new();

    // Seed the schema: either the transformer's declared schema, or — for
    // auto-transform — just the time-index column; remaining columns are
    // discovered while executing rows (schema_info is mutated by exec_mut).
    let mut schema_info = match pipeline.transformer() {
        TransformerMode::GreptimeTransformer(greptime_transformer) => {
            SchemaInfo::from_schema_list(greptime_transformer.schemas().clone())
        }
        TransformerMode::AutoTransform(ts_name, timeunit) => {
            let timeunit = match timeunit {
                TimeUnit::Second => ColumnDataType::TimestampSecond,
                TimeUnit::Millisecond => ColumnDataType::TimestampMillisecond,
                TimeUnit::Microsecond => ColumnDataType::TimestampMicrosecond,
                TimeUnit::Nanosecond => ColumnDataType::TimestampNanosecond,
            };

            let mut schema_info = SchemaInfo::default();
            schema_info
                .schema
                .push(time_index_column_schema(ts_name, timeunit));

            schema_info
        }
    };

    for pipeline_map in pipeline_maps {
        let result = pipeline
            .exec_mut(pipeline_map, pipeline_ctx, &mut schema_info)
            .inspect_err(|_| {
                METRIC_HTTP_LOGS_TRANSFORM_ELAPSED
                    .with_label_values(&[db.as_str(), METRIC_FAILURE_VALUE])
                    .observe(transform_timer.elapsed().as_secs_f64());
            })
            .context(PipelineSnafu);

        // Honors skip_error: either continues past this row or returns the error.
        let r = unwrap_or_continue_if_err!(result, skip_error);
        match r {
            PipelineExecOutput::Transformed(TransformedOutput {
                opt,
                row,
                table_suffix,
            }) => {
                let act_table_name = table_suffix_to_table_name(&table_name, table_suffix);
                push_to_map!(transformed_map, (opt, act_table_name), row, arr_len);
            }
            PipelineExecOutput::DispatchedTo(dispatched_to, val) => {
                push_to_map!(dispatched, dispatched_to, val, arr_len);
            }
            PipelineExecOutput::Filtered => {
                continue;
            }
        }
    }

    let mut results = ContextReq::default();

    let s_len = schema_info.schema.len();

    // if transformed
    for ((opt, table_name), mut rows) in transformed_map {
        // The schema may have grown while later rows were executed, so pad
        // earlier (shorter) rows with default values up to the final width.
        for row in rows.iter_mut() {
            row.values
                .resize(s_len, greptime_proto::v1::Value::default());
        }
        results.add_row(
            opt,
            RowInsertRequest {
                rows: Some(Rows {
                    rows,
                    schema: schema_info.schema.clone(),
                }),
                table_name,
            },
        );
    }

    // if current pipeline contains dispatcher and has several rules, we may
    // already accumulated several dispatched rules and rows.
    for (dispatched_to, coll) in dispatched {
        // we generate the new table name according to `table_part` and
        // current custom table name.
        let table_name = dispatched_to.dispatched_to_table_name(&table_name);
        // Fall back to the identity pipeline when the rule names no pipeline.
        let next_pipeline_name = dispatched_to
            .pipeline
            .as_deref()
            .unwrap_or(GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME);

        // run pipeline recursively.
        let next_pipeline_def =
            PipelineDefinition::from_name(next_pipeline_name, None, None).context(PipelineSnafu)?;
        let next_pipeline_ctx = PipelineContext::new(
            &next_pipeline_def,
            pipeline_ctx.pipeline_param,
            pipeline_ctx.channel,
        );
        // Box::pin: async recursion requires a boxed future.
        // `is_top_level = false` so nested runs don't re-observe the metric.
        let requests = Box::pin(run_pipeline(
            handler,
            &next_pipeline_ctx,
            PipelineIngestRequest {
                table: table_name,
                values: coll,
            },
            query_ctx,
            false,
        ))
        .await?;

        results.merge(requests);
    }

    // Success metric only once per outermost invocation.
    if is_top_level {
        METRIC_HTTP_LOGS_TRANSFORM_ELAPSED
            .with_label_values(&[db.as_str(), METRIC_SUCCESS_VALUE])
            .observe(transform_timer.elapsed().as_secs_f64());
    }

    Ok(results)
}
238
/// Build the actual target table name by appending the optional suffix
/// produced by the pipeline's table-suffix rule.
///
/// Takes `&str` rather than `&String` (clippy `ptr_arg`); the existing
/// call site passing `&String` still works via deref coercion.
#[inline]
fn table_suffix_to_table_name(table_name: &str, table_suffix: Option<String>) -> String {
    match table_suffix {
        Some(suffix) => format!("{table_name}{suffix}"),
        None => table_name.to_string(),
    }
}