// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::BTreeMap;
use std::sync::Arc;

use ahash::{HashMap, HashMapExt};
use api::v1::{RowInsertRequest, Rows};
use itertools::Itertools;
use pipeline::error::AutoTransformOneTimestampSnafu;
use pipeline::{
    AutoTransformOutput, ContextReq, DispatchedTo, IdentityTimeIndex, Pipeline, PipelineContext,
    PipelineDefinition, PipelineExecOutput, PipelineMap, TransformedOutput,
    GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME,
};
use session::context::{Channel, QueryContextRef};
use snafu::{OptionExt, ResultExt};

use crate::error::{CatalogSnafu, PipelineSnafu, Result};
use crate::http::event::PipelineIngestRequest;
use crate::metrics::{
    METRIC_FAILURE_VALUE, METRIC_HTTP_LOGS_TRANSFORM_ELAPSED, METRIC_SUCCESS_VALUE,
};
use crate::query_handler::PipelineHandlerRef;
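
// Appends `$value` to the `Vec` stored under `$key` in `$map`, creating the
// `Vec` with capacity `$capacity` on first insertion.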
macro_rules! push_to_map {
    ($map:expr, $key:expr, $value:expr, $capacity:expr) => {
        $map.entry($key)
            .or_insert_with(|| Vec::with_capacity($capacity))
            .push($value);
    };
}

/// Resolves a `PipelineDefinition` to a concrete pipeline.
///
/// Never call this on `GreptimeIdentityPipeline`: the identity pipeline is
/// built in rather than a real stored pipeline, so there is nothing to fetch.
pub async fn get_pipeline(
    pipeline_def: &PipelineDefinition,
    handler: &PipelineHandlerRef,
    query_ctx: &QueryContextRef,
) -> Result<Arc<Pipeline>> {
    match pipeline_def {
        PipelineDefinition::Resolved(pipeline) => Ok(pipeline.clone()),
        PipelineDefinition::ByNameAndValue((name, version)) => {
            handler
                .get_pipeline(name, *version, query_ctx.clone())
                .await
        }
        _ => {
            unreachable!("Never call get_pipeline on identity.")
        }
    }
}
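
/// Runs `pipeline_req` through the pipeline described by `pipeline_ctx`,
/// dispatching to the identity path or the custom path.
///
/// `is_top_level` is `false` for recursive invocations from dispatch rules, so
/// the success metric is observed only once per request.
///
/// A minimal call-site sketch (hypothetical variables, for illustration only):
/// ```ignore
/// let req = PipelineIngestRequest {
///     table: "logs".to_string(),
///     values: pipeline_maps,
/// };
/// let ctx_req = run_pipeline(&handler, &pipeline_ctx, req, &query_ctx, true).await?;
/// ```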
pub(crate) async fn run_pipeline(
    handler: &PipelineHandlerRef,
    pipeline_ctx: &PipelineContext<'_>,
    pipeline_req: PipelineIngestRequest,
    query_ctx: &QueryContextRef,
    is_top_level: bool,
) -> Result<ContextReq> {
    if pipeline_ctx.pipeline_definition.is_identity() {
        run_identity_pipeline(handler, pipeline_ctx, pipeline_req, query_ctx).await
    } else {
        run_custom_pipeline(handler, pipeline_ctx, pipeline_req, query_ctx, is_top_level).await
    }
}
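
/// Runs the built-in identity pipeline, which ingests values as-is with a
/// schema derived from the data itself.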
async fn run_identity_pipeline(
    handler: &PipelineHandlerRef,
    pipeline_ctx: &PipelineContext<'_>,
    pipeline_req: PipelineIngestRequest,
    query_ctx: &QueryContextRef,
) -> Result<ContextReq> {
    let PipelineIngestRequest {
        table: table_name,
        values: data_array,
    } = pipeline_req;
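    // The Prometheus channel skips the table lookup; other channels fetch the
    // existing table (if any) and hand it to `identity_pipeline`.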
    let table = if pipeline_ctx.channel == Channel::Prometheus {
        None
    } else {
        handler
            .get_table(&table_name, query_ctx)
            .await
            .context(CatalogSnafu)?
    };
    pipeline::identity_pipeline(data_array, table, pipeline_ctx)
        .map(|opt_map| ContextReq::from_opt_map(opt_map, table_name))
        .context(PipelineSnafu)
}
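
/// Runs a stored (custom) pipeline. Each input value yields one of three
/// outputs: transformed rows, auto-transform rows (when the pipeline declares
/// no output schema), or a dispatch to another pipeline. The three cases are
/// accumulated separately and assembled into a single `ContextReq`.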
async fn run_custom_pipeline(
    handler: &PipelineHandlerRef,
    pipeline_ctx: &PipelineContext<'_>,
    pipeline_req: PipelineIngestRequest,
    query_ctx: &QueryContextRef,
    is_top_level: bool,
) -> Result<ContextReq> {
    let db = query_ctx.get_db_string();
    let pipeline = get_pipeline(pipeline_ctx.pipeline_definition, handler, query_ctx).await?;

    let transform_timer = std::time::Instant::now();

    let PipelineIngestRequest {
        table: table_name,
        values: pipeline_maps,
    } = pipeline_req;
    let arr_len = pipeline_maps.len();
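    // Accumulators for the three possible outputs of `exec_mut`:
    // - `transformed_map`: rows ready for insertion, keyed by (opt, table name)
    // - `auto_map` / `auto_map_ts_keys`: auto-transform values plus their
    //   candidate timestamp keys, keyed by table name
    // - `dispatched`: values routed to other pipelines by dispatch rules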
    let mut transformed_map = HashMap::new();
    let mut dispatched: BTreeMap<DispatchedTo, Vec<PipelineMap>> = BTreeMap::new();
    let mut auto_map = HashMap::new();
    let mut auto_map_ts_keys = HashMap::new();

    for pipeline_map in pipeline_maps {
        let r = pipeline
            .exec_mut(pipeline_map)
            .inspect_err(|_| {
                METRIC_HTTP_LOGS_TRANSFORM_ELAPSED
                    .with_label_values(&[db.as_str(), METRIC_FAILURE_VALUE])
                    .observe(transform_timer.elapsed().as_secs_f64());
            })
            .context(PipelineSnafu)?;

        match r {
            PipelineExecOutput::Transformed(TransformedOutput {
                opt,
                row,
                table_suffix,
                pipeline_map: _val,
            }) => {
                let act_table_name = table_suffix_to_table_name(&table_name, table_suffix);
                push_to_map!(transformed_map, (opt, act_table_name), row, arr_len);
            }
            PipelineExecOutput::AutoTransform(AutoTransformOutput {
                table_suffix,
                ts_unit_map,
                pipeline_map,
            }) => {
                let act_table_name = table_suffix_to_table_name(&table_name, table_suffix);
                push_to_map!(auto_map, act_table_name.clone(), pipeline_map, arr_len);
                auto_map_ts_keys
                    .entry(act_table_name)
                    .or_insert_with(HashMap::new)
                    .extend(ts_unit_map);
            }
            PipelineExecOutput::DispatchedTo(dispatched_to, val) => {
                push_to_map!(dispatched, dispatched_to, val, arr_len);
            }
        }
    }

    let mut results = ContextReq::default();

    if let Some(s) = pipeline.schemas() {
        // Transformed results: build the accumulated rows into
        // `RowInsertRequest`s and append them to `results`. If the pipeline
        // has no dispatcher, this is its only output.
        for ((opt, table_name), rows) in transformed_map {
            results.add_rows(
                opt,
                RowInsertRequest {
                    rows: Some(Rows {
                        rows,
                        schema: s.clone(),
                    }),
                    table_name,
                },
            );
        }
    } else {
        // Auto-transform results: group values by table, require exactly one
        // timestamp key per table, and re-run them through the identity
        // pipeline with that key as the time index.
        for (table_name, pipeline_maps) in auto_map {
            if pipeline_maps.is_empty() {
                continue;
            }

            let ts_unit_map = auto_map_ts_keys
                .remove(&table_name)
                .context(AutoTransformOneTimestampSnafu)
                .context(PipelineSnafu)?;
            // Exactly one timestamp key is allowed; it becomes the time index.
            let (ts_key, unit) = ts_unit_map
                .into_iter()
                .exactly_one()
                .map_err(|_| AutoTransformOneTimestampSnafu.build())
                .context(PipelineSnafu)?;

            let ident_ts_index = IdentityTimeIndex::Epoch(ts_key.to_string(), unit, false);
            let new_def = PipelineDefinition::GreptimeIdentityPipeline(Some(ident_ts_index));
            let next_pipeline_ctx =
                PipelineContext::new(&new_def, pipeline_ctx.pipeline_param, pipeline_ctx.channel);

            let reqs = run_identity_pipeline(
                handler,
                &next_pipeline_ctx,
                PipelineIngestRequest {
                    table: table_name,
                    values: pipeline_maps,
                },
                query_ctx,
            )
            .await?;

            results.merge(reqs);
        }
    }

    // If the current pipeline has a dispatcher with several rules, we may
    // have accumulated rows for several dispatch targets by now.
    for (dispatched_to, coll) in dispatched {
        // Generate the new table name from `table_part` and the current
        // table name.
        let table_name = dispatched_to.dispatched_to_table_name(&table_name);
        let next_pipeline_name = dispatched_to
            .pipeline
            .as_deref()
            .unwrap_or(GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME);

        // Run the next pipeline recursively (boxed, since async recursion
        // needs an indirection).
        let next_pipeline_def =
            PipelineDefinition::from_name(next_pipeline_name, None, None).context(PipelineSnafu)?;
        let next_pipeline_ctx = PipelineContext::new(
            &next_pipeline_def,
            pipeline_ctx.pipeline_param,
            pipeline_ctx.channel,
        );
        let requests = Box::pin(run_pipeline(
            handler,
            &next_pipeline_ctx,
            PipelineIngestRequest {
                table: table_name,
                values: coll,
            },
            query_ctx,
            false,
        ))
        .await?;

        results.merge(requests);
    }
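
    // Observe the success latency once, from the top-level call only;
    // recursive dispatch invocations pass `is_top_level = false`.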
    if is_top_level {
        METRIC_HTTP_LOGS_TRANSFORM_ELAPSED
            .with_label_values(&[db.as_str(), METRIC_SUCCESS_VALUE])
            .observe(transform_timer.elapsed().as_secs_f64());
    }

    Ok(results)
}
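
/// Joins the base table name with the optional suffix emitted by the pipeline,
/// e.g. `("logs", Some("_errors"))` becomes `"logs_errors"`.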
#[inline]
fn table_suffix_to_table_name(table_name: &str, table_suffix: Option<String>) -> String {
    match table_suffix {
        Some(suffix) => format!("{}{}", table_name, suffix),
        None => table_name.to_string(),
    }
}