1use std::collections::BTreeMap;
16use std::sync::Arc;
17
18use ahash::{HashMap, HashMapExt};
19use api::v1::{RowInsertRequest, Rows};
20use itertools::Itertools;
21use pipeline::error::AutoTransformOneTimestampSnafu;
22use pipeline::{
23 AutoTransformOutput, ContextReq, DispatchedTo, IdentityTimeIndex, Pipeline, PipelineContext,
24 PipelineDefinition, PipelineExecOutput, PipelineMap, TransformedOutput,
25 GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME,
26};
27use session::context::{Channel, QueryContextRef};
28use snafu::{OptionExt, ResultExt};
29
30use crate::error::{CatalogSnafu, PipelineSnafu, Result};
31use crate::http::event::PipelineIngestRequest;
32use crate::metrics::{
33 METRIC_FAILURE_VALUE, METRIC_HTTP_LOGS_TRANSFORM_ELAPSED, METRIC_SUCCESS_VALUE,
34};
35use crate::query_handler::PipelineHandlerRef;
36
/// Appends `$item` to the `Vec` stored under `$bucket_key` in `$target`.
///
/// `$target` must expose an `entry` API (as `HashMap` and `BTreeMap` do). When the key has
/// no bucket yet, a new `Vec` is created with `Vec::with_capacity($reserve)`; `$reserve`
/// is therefore only evaluated for keys that are seen for the first time.
macro_rules! push_to_map {
    ($target:expr, $bucket_key:expr, $item:expr, $reserve:expr) => {{
        // Look up (or lazily create) the bucket first, then append to it.
        let bucket = $target
            .entry($bucket_key)
            .or_insert_with(|| Vec::with_capacity($reserve));
        bucket.push($item);
    }};
}
44
45pub async fn get_pipeline(
47 pipeline_def: &PipelineDefinition,
48 handler: &PipelineHandlerRef,
49 query_ctx: &QueryContextRef,
50) -> Result<Arc<Pipeline>> {
51 match pipeline_def {
52 PipelineDefinition::Resolved(pipeline) => Ok(pipeline.clone()),
53 PipelineDefinition::ByNameAndValue((name, version)) => {
54 handler
55 .get_pipeline(name, *version, query_ctx.clone())
56 .await
57 }
58 _ => {
59 unreachable!("Never call get_pipeline on identity.")
60 }
61 }
62}
63
64pub(crate) async fn run_pipeline(
65 handler: &PipelineHandlerRef,
66 pipeline_ctx: &PipelineContext<'_>,
67 pipeline_req: PipelineIngestRequest,
68 query_ctx: &QueryContextRef,
69 is_top_level: bool,
70) -> Result<ContextReq> {
71 if pipeline_ctx.pipeline_definition.is_identity() {
72 run_identity_pipeline(handler, pipeline_ctx, pipeline_req, query_ctx).await
73 } else {
74 run_custom_pipeline(handler, pipeline_ctx, pipeline_req, query_ctx, is_top_level).await
75 }
76}
77
78async fn run_identity_pipeline(
79 handler: &PipelineHandlerRef,
80 pipeline_ctx: &PipelineContext<'_>,
81 pipeline_req: PipelineIngestRequest,
82 query_ctx: &QueryContextRef,
83) -> Result<ContextReq> {
84 let PipelineIngestRequest {
85 table: table_name,
86 values: data_array,
87 } = pipeline_req;
88 let table = if pipeline_ctx.channel == Channel::Prometheus {
89 None
90 } else {
91 handler
92 .get_table(&table_name, query_ctx)
93 .await
94 .context(CatalogSnafu)?
95 };
96 pipeline::identity_pipeline(data_array, table, pipeline_ctx)
97 .map(|opt_map| ContextReq::from_opt_map(opt_map, table_name))
98 .context(PipelineSnafu)
99}
100
101async fn run_custom_pipeline(
102 handler: &PipelineHandlerRef,
103 pipeline_ctx: &PipelineContext<'_>,
104 pipeline_req: PipelineIngestRequest,
105 query_ctx: &QueryContextRef,
106 is_top_level: bool,
107) -> Result<ContextReq> {
108 let db = query_ctx.get_db_string();
109 let pipeline = get_pipeline(pipeline_ctx.pipeline_definition, handler, query_ctx).await?;
110
111 let transform_timer = std::time::Instant::now();
112
113 let PipelineIngestRequest {
114 table: table_name,
115 values: pipeline_maps,
116 } = pipeline_req;
117 let arr_len = pipeline_maps.len();
118 let mut transformed_map = HashMap::new();
119 let mut dispatched: BTreeMap<DispatchedTo, Vec<PipelineMap>> = BTreeMap::new();
120 let mut auto_map = HashMap::new();
121 let mut auto_map_ts_keys = HashMap::new();
122
123 for pipeline_map in pipeline_maps {
124 let r = pipeline
125 .exec_mut(pipeline_map)
126 .inspect_err(|_| {
127 METRIC_HTTP_LOGS_TRANSFORM_ELAPSED
128 .with_label_values(&[db.as_str(), METRIC_FAILURE_VALUE])
129 .observe(transform_timer.elapsed().as_secs_f64());
130 })
131 .context(PipelineSnafu)?;
132
133 match r {
134 PipelineExecOutput::Transformed(TransformedOutput {
135 opt,
136 row,
137 table_suffix,
138 pipeline_map: _val,
139 }) => {
140 let act_table_name = table_suffix_to_table_name(&table_name, table_suffix);
141 push_to_map!(transformed_map, (opt, act_table_name), row, arr_len);
142 }
143 PipelineExecOutput::AutoTransform(AutoTransformOutput {
144 table_suffix,
145 ts_unit_map,
146 pipeline_map,
147 }) => {
148 let act_table_name = table_suffix_to_table_name(&table_name, table_suffix);
149 push_to_map!(auto_map, act_table_name.clone(), pipeline_map, arr_len);
150 auto_map_ts_keys
151 .entry(act_table_name)
152 .or_insert_with(HashMap::new)
153 .extend(ts_unit_map);
154 }
155 PipelineExecOutput::DispatchedTo(dispatched_to, val) => {
156 push_to_map!(dispatched, dispatched_to, val, arr_len);
157 }
158 }
159 }
160
161 let mut results = ContextReq::default();
162
163 if let Some(s) = pipeline.schemas() {
164 for ((opt, table_name), rows) in transformed_map {
170 results.add_rows(
171 opt,
172 RowInsertRequest {
173 rows: Some(Rows {
174 rows,
175 schema: s.clone(),
176 }),
177 table_name,
178 },
179 );
180 }
181 } else {
182 for (table_name, pipeline_maps) in auto_map {
184 if pipeline_maps.is_empty() {
185 continue;
186 }
187
188 let ts_unit_map = auto_map_ts_keys
189 .remove(&table_name)
190 .context(AutoTransformOneTimestampSnafu)
191 .context(PipelineSnafu)?;
192 let (ts_key, unit) = ts_unit_map
195 .into_iter()
196 .exactly_one()
197 .map_err(|_| AutoTransformOneTimestampSnafu.build())
198 .context(PipelineSnafu)?;
199
200 let ident_ts_index = IdentityTimeIndex::Epoch(ts_key.to_string(), unit, false);
201 let new_def = PipelineDefinition::GreptimeIdentityPipeline(Some(ident_ts_index));
202 let next_pipeline_ctx =
203 PipelineContext::new(&new_def, pipeline_ctx.pipeline_param, pipeline_ctx.channel);
204
205 let reqs = run_identity_pipeline(
206 handler,
207 &next_pipeline_ctx,
208 PipelineIngestRequest {
209 table: table_name,
210 values: pipeline_maps,
211 },
212 query_ctx,
213 )
214 .await?;
215
216 results.merge(reqs);
217 }
218 }
219
220 for (dispatched_to, coll) in dispatched {
223 let table_name = dispatched_to.dispatched_to_table_name(&table_name);
226 let next_pipeline_name = dispatched_to
227 .pipeline
228 .as_deref()
229 .unwrap_or(GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME);
230
231 let next_pipeline_def =
233 PipelineDefinition::from_name(next_pipeline_name, None, None).context(PipelineSnafu)?;
234 let next_pipeline_ctx = PipelineContext::new(
235 &next_pipeline_def,
236 pipeline_ctx.pipeline_param,
237 pipeline_ctx.channel,
238 );
239 let requests = Box::pin(run_pipeline(
240 handler,
241 &next_pipeline_ctx,
242 PipelineIngestRequest {
243 table: table_name,
244 values: coll,
245 },
246 query_ctx,
247 false,
248 ))
249 .await?;
250
251 results.merge(requests);
252 }
253
254 if is_top_level {
255 METRIC_HTTP_LOGS_TRANSFORM_ELAPSED
256 .with_label_values(&[db.as_str(), METRIC_SUCCESS_VALUE])
257 .observe(transform_timer.elapsed().as_secs_f64());
258 }
259
260 Ok(results)
261}
262
/// Builds the effective table name for a pipeline output.
///
/// Returns `table_name` with `table_suffix` appended when a suffix is present, and an owned
/// copy of `table_name` otherwise. The base name is taken as `&str` so that both `&String`
/// and string slices can be passed.
#[inline]
fn table_suffix_to_table_name(table_name: &str, table_suffix: Option<String>) -> String {
    match table_suffix {
        Some(suffix) => format!("{table_name}{suffix}"),
        None => table_name.to_owned(),
    }
}