use std::collections::BTreeMap;
use std::sync::Arc;

use ahash::{HashMap, HashMapExt};
use api::v1::helper::time_index_column_schema;
use api::v1::{ColumnDataType, RowInsertRequest, Rows, Value};
use common_time::timestamp::TimeUnit;
use pipeline::{
    ContextReq, DispatchedTo, GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME, Pipeline, PipelineContext,
    PipelineDefinition, PipelineExecOutput, SchemaInfo, TransformedOutput, TransformerMode,
    identity_pipeline, unwrap_or_continue_if_err,
};
use session::context::{Channel, QueryContextRef};
use snafu::ResultExt;
use vrl::value::Value as VrlValue;

use crate::error::{CatalogSnafu, PipelineSnafu, Result};
use crate::http::event::PipelineIngestRequest;
use crate::metrics::{
    METRIC_FAILURE_VALUE, METRIC_HTTP_LOGS_TRANSFORM_ELAPSED, METRIC_SUCCESS_VALUE,
};
use crate::query_handler::PipelineHandlerRef;

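/// Pushes `$value` into the `Vec` keyed by `$key` in `$map`, creating the
/// vector with `$capacity` reserved slots on first insert.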
macro_rules! push_to_map {
    ($map:expr, $key:expr, $value:expr, $capacity:expr) => {
        $map.entry($key)
            .or_insert_with(|| Vec::with_capacity($capacity))
            .push($value);
    };
}

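/// Resolves a [`PipelineDefinition`] to a concrete [`Pipeline`].
///
/// Already-resolved definitions are returned as-is; name/version references
/// are looked up through the handler. Identity definitions must never reach
/// this function, hence the `unreachable!` arm.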
pub async fn get_pipeline(
    pipeline_def: &PipelineDefinition,
    handler: &PipelineHandlerRef,
    query_ctx: &QueryContextRef,
) -> Result<Arc<Pipeline>> {
    match pipeline_def {
        PipelineDefinition::Resolved(pipeline) => Ok(pipeline.clone()),
        PipelineDefinition::ByNameAndValue((name, version)) => {
            handler
                .get_pipeline(name, *version, query_ctx.clone())
                .await
        }
        _ => {
            unreachable!("Never call get_pipeline on identity.")
        }
    }
}

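/// Runs a pipeline over the rows in `pipeline_req` and collects the resulting
/// insert requests.
///
/// Dispatches to the identity fast path or to a custom pipeline.
/// `is_top_level` marks the outermost call in the dispatch recursion so the
/// success metric is recorded exactly once.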
pub(crate) async fn run_pipeline(
    handler: &PipelineHandlerRef,
    pipeline_ctx: &PipelineContext<'_>,
    pipeline_req: PipelineIngestRequest,
    query_ctx: &QueryContextRef,
    is_top_level: bool,
) -> Result<ContextReq> {
    if pipeline_ctx.pipeline_definition.is_identity() {
        run_identity_pipeline(handler, pipeline_ctx, pipeline_req, query_ctx).await
    } else {
        run_custom_pipeline(handler, pipeline_ctx, pipeline_req, query_ctx, is_top_level).await
    }
}

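/// Maps rows to insert requests with the built-in identity pipeline.
///
/// The existing table, when found, is passed along so the identity transform
/// can take its schema into account; Prometheus-channel requests skip the
/// table lookup entirely.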
async fn run_identity_pipeline(
    handler: &PipelineHandlerRef,
    pipeline_ctx: &PipelineContext<'_>,
    pipeline_req: PipelineIngestRequest,
    query_ctx: &QueryContextRef,
) -> Result<ContextReq> {
    let PipelineIngestRequest {
        table: table_name,
        values: data_array,
    } = pipeline_req;
    let table = if pipeline_ctx.channel == Channel::Prometheus {
        None
    } else {
        handler
            .get_table(&table_name, query_ctx)
            .await
            .context(CatalogSnafu)?
    };
    identity_pipeline(data_array, table, pipeline_ctx)
        .map(|opt_map| ContextReq::from_opt_map(opt_map, table_name))
        .context(PipelineSnafu)
}

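/// Executes a user-defined pipeline over every incoming row.
///
/// Transformed rows are grouped per (context options, table) pair, rows
/// dispatched to other pipelines are re-run recursively, and filtered rows
/// are dropped. A failed row aborts the batch unless `skip_error` is set.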
async fn run_custom_pipeline(
    handler: &PipelineHandlerRef,
    pipeline_ctx: &PipelineContext<'_>,
    pipeline_req: PipelineIngestRequest,
    query_ctx: &QueryContextRef,
    is_top_level: bool,
) -> Result<ContextReq> {
    let skip_error = pipeline_ctx.pipeline_param.skip_error();
    let db = query_ctx.get_db_string();
    let pipeline = get_pipeline(pipeline_ctx.pipeline_definition, handler, query_ctx).await?;

    let transform_timer = std::time::Instant::now();

    let PipelineIngestRequest {
        table: table_name,
        values: pipeline_maps,
    } = pipeline_req;
    let arr_len = pipeline_maps.len();
    let mut transformed_map = HashMap::new();
    let mut dispatched: BTreeMap<DispatchedTo, Vec<VrlValue>> = BTreeMap::new();

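    // Seed the schema from the transformer mode: a Greptime transformer
    // carries an explicit schema, while auto-transform starts from just the
    // time-index column and grows as rows are processed.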
    let mut schema_info = match pipeline.transformer() {
        TransformerMode::GreptimeTransformer(greptime_transformer) => {
            SchemaInfo::from_schema_list(greptime_transformer.schemas().clone())
        }
        TransformerMode::AutoTransform(ts_name, timeunit) => {
            let timeunit = match timeunit {
                TimeUnit::Second => ColumnDataType::TimestampSecond,
                TimeUnit::Millisecond => ColumnDataType::TimestampMillisecond,
                TimeUnit::Microsecond => ColumnDataType::TimestampMicrosecond,
                TimeUnit::Nanosecond => ColumnDataType::TimestampNanosecond,
            };

            let mut schema_info = SchemaInfo::default();
            schema_info
                .schema
                .push(time_index_column_schema(ts_name, timeunit).into());

            schema_info
        }
    };

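    // Attach the target table (if it already exists) so execution can take
    // its current columns into account.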
    let table = handler
        .get_table(&table_name, query_ctx)
        .await
        .context(CatalogSnafu)?;
    schema_info.set_table(table);

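    // Run every row through the pipeline, bucketing the outputs: transformed
    // rows by (context options, target table), dispatched rows by destination.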
    for pipeline_map in pipeline_maps {
        let result = pipeline
            .exec_mut(pipeline_map, pipeline_ctx, &mut schema_info)
            .inspect_err(|_| {
                METRIC_HTTP_LOGS_TRANSFORM_ELAPSED
                    .with_label_values(&[db.as_str(), METRIC_FAILURE_VALUE])
                    .observe(transform_timer.elapsed().as_secs_f64());
            })
            .context(PipelineSnafu);

        let r = unwrap_or_continue_if_err!(result, skip_error);
        match r {
            PipelineExecOutput::Transformed(TransformedOutput { rows_by_context }) => {
                for (opt, rows_with_suffix) in rows_by_context {
                    for (row, table_suffix) in rows_with_suffix {
                        let act_table_name = table_suffix_to_table_name(&table_name, table_suffix);
                        push_to_map!(transformed_map, (opt.clone(), act_table_name), row, arr_len);
                    }
                }
            }
            PipelineExecOutput::DispatchedTo(dispatched_to, val) => {
                push_to_map!(dispatched, dispatched_to, val, arr_len);
            }
            PipelineExecOutput::Filtered => {
                continue;
            }
        }
    }

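    // Assemble the insert requests. Rows transformed before later columns were
    // added may be short, so pad each one with nulls up to the final schema width.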
    let mut results = ContextReq::default();

    let column_count = schema_info.schema.len();
    for ((opt, table_name), mut rows) in transformed_map {
        for row in &mut rows {
            let diff = column_count.saturating_sub(row.values.len());
            for _ in 0..diff {
                row.values.push(Value { value_data: None });
            }
        }

        results.add_row(
            &opt,
            RowInsertRequest {
                rows: Some(Rows {
                    rows,
                    schema: schema_info.column_schemas()?,
                }),
                table_name,
            },
        );
    }

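    // Re-run each dispatched bucket through its target pipeline (identity when
    // none is named) and fold the resulting requests into the output.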
    for (dispatched_to, coll) in dispatched {
        let table_name = dispatched_to.dispatched_to_table_name(&table_name);
        let next_pipeline_name = dispatched_to
            .pipeline
            .as_deref()
            .unwrap_or(GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME);

        let next_pipeline_def =
            PipelineDefinition::from_name(next_pipeline_name, None, None).context(PipelineSnafu)?;
        let next_pipeline_ctx = PipelineContext::new(
            &next_pipeline_def,
            pipeline_ctx.pipeline_param,
            pipeline_ctx.channel,
        );
        // `run_pipeline` is recursive through dispatch, so the nested future
        // must be boxed to give it a known size.
        let requests = Box::pin(run_pipeline(
            handler,
            &next_pipeline_ctx,
            PipelineIngestRequest {
                table: table_name,
                values: coll,
            },
            query_ctx,
            false,
        ))
        .await?;

        results.merge(requests);
    }

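    // Only the outermost invocation reports success timing, so recursive
    // dispatch runs are not double-counted.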
    if is_top_level {
        METRIC_HTTP_LOGS_TRANSFORM_ELAPSED
            .with_label_values(&[db.as_str(), METRIC_SUCCESS_VALUE])
            .observe(transform_timer.elapsed().as_secs_f64());
    }

    Ok(results)
}

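/// Appends the optional suffix to the base table name.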
#[inline]
fn table_suffix_to_table_name(table_name: &str, table_suffix: Option<String>) -> String {
    match table_suffix {
        Some(suffix) => format!("{}{}", table_name, suffix),
        None => table_name.to_string(),
    }
}