1use std::collections::BTreeMap;
16use std::sync::Arc;
17
18use ahash::{HashMap, HashMapExt};
19use api::greptime_proto;
20use api::v1::helper::time_index_column_schema;
21use api::v1::{ColumnDataType, RowInsertRequest, Rows};
22use common_time::timestamp::TimeUnit;
23use pipeline::{
24 identity_pipeline, unwrap_or_continue_if_err, ContextReq, DispatchedTo, Pipeline,
25 PipelineContext, PipelineDefinition, PipelineExecOutput, SchemaInfo, TransformedOutput,
26 TransformerMode, GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME,
27};
28use session::context::{Channel, QueryContextRef};
29use snafu::ResultExt;
30use vrl::value::Value as VrlValue;
31
32use crate::error::{CatalogSnafu, PipelineSnafu, Result};
33use crate::http::event::PipelineIngestRequest;
34use crate::metrics::{
35 METRIC_FAILURE_VALUE, METRIC_HTTP_LOGS_TRANSFORM_ELAPSED, METRIC_SUCCESS_VALUE,
36};
37use crate::query_handler::PipelineHandlerRef;
38
/// Pushes `$value` into the `Vec` stored under `$key` in `$map`, creating the
/// entry on first use with `Vec::with_capacity($capacity)` so repeated pushes
/// for the same key avoid incremental reallocation.
macro_rules! push_to_map {
    ($map:expr, $key:expr, $value:expr, $capacity:expr) => {
        $map.entry($key)
            .or_insert_with(|| Vec::with_capacity($capacity))
            .push($value);
    };
}
46
47pub async fn get_pipeline(
49 pipeline_def: &PipelineDefinition,
50 handler: &PipelineHandlerRef,
51 query_ctx: &QueryContextRef,
52) -> Result<Arc<Pipeline>> {
53 match pipeline_def {
54 PipelineDefinition::Resolved(pipeline) => Ok(pipeline.clone()),
55 PipelineDefinition::ByNameAndValue((name, version)) => {
56 handler
57 .get_pipeline(name, *version, query_ctx.clone())
58 .await
59 }
60 _ => {
61 unreachable!("Never call get_pipeline on identity.")
62 }
63 }
64}
65
66pub(crate) async fn run_pipeline(
67 handler: &PipelineHandlerRef,
68 pipeline_ctx: &PipelineContext<'_>,
69 pipeline_req: PipelineIngestRequest,
70 query_ctx: &QueryContextRef,
71 is_top_level: bool,
72) -> Result<ContextReq> {
73 if pipeline_ctx.pipeline_definition.is_identity() {
74 run_identity_pipeline(handler, pipeline_ctx, pipeline_req, query_ctx).await
75 } else {
76 run_custom_pipeline(handler, pipeline_ctx, pipeline_req, query_ctx, is_top_level).await
77 }
78}
79
80async fn run_identity_pipeline(
81 handler: &PipelineHandlerRef,
82 pipeline_ctx: &PipelineContext<'_>,
83 pipeline_req: PipelineIngestRequest,
84 query_ctx: &QueryContextRef,
85) -> Result<ContextReq> {
86 let PipelineIngestRequest {
87 table: table_name,
88 values: data_array,
89 } = pipeline_req;
90 let table = if pipeline_ctx.channel == Channel::Prometheus {
91 None
92 } else {
93 handler
94 .get_table(&table_name, query_ctx)
95 .await
96 .context(CatalogSnafu)?
97 };
98 identity_pipeline(data_array, table, pipeline_ctx)
99 .map(|opt_map| ContextReq::from_opt_map(opt_map, table_name))
100 .context(PipelineSnafu)
101}
102
/// Runs a user-defined (non-identity) pipeline over every value in
/// `pipeline_req`, collecting transformed rows into insert requests and
/// recursively executing pipelines that rows were dispatched to.
///
/// `is_top_level` is true only for the outermost invocation: the success
/// transform-latency metric is recorded once there, while failure latency is
/// recorded at whichever level the row failed.
async fn run_custom_pipeline(
    handler: &PipelineHandlerRef,
    pipeline_ctx: &PipelineContext<'_>,
    pipeline_req: PipelineIngestRequest,
    query_ctx: &QueryContextRef,
    is_top_level: bool,
) -> Result<ContextReq> {
    let skip_error = pipeline_ctx.pipeline_param.skip_error();
    let db = query_ctx.get_db_string();
    // Resolve the concrete pipeline (pre-resolved, or fetched by name/version).
    let pipeline = get_pipeline(pipeline_ctx.pipeline_definition, handler, query_ctx).await?;

    let transform_timer = std::time::Instant::now();

    let PipelineIngestRequest {
        table: table_name,
        values: pipeline_maps,
    } = pipeline_req;
    let arr_len = pipeline_maps.len();
    // (insert options, resolved table name) -> rows produced by the transform.
    let mut transformed_map = HashMap::new();
    // Dispatch target -> raw values to re-run through the target's pipeline.
    let mut dispatched: BTreeMap<DispatchedTo, Vec<VrlValue>> = BTreeMap::new();

    // Seed the row schema: either the transformer's declared column list, or
    // (auto-transform) just a time-index column of the configured unit.
    let mut schema_info = match pipeline.transformer() {
        TransformerMode::GreptimeTransformer(greptime_transformer) => {
            SchemaInfo::from_schema_list(greptime_transformer.schemas().clone())
        }
        TransformerMode::AutoTransform(ts_name, timeunit) => {
            // Map the pipeline's time unit onto the proto timestamp type.
            let timeunit = match timeunit {
                TimeUnit::Second => ColumnDataType::TimestampSecond,
                TimeUnit::Millisecond => ColumnDataType::TimestampMillisecond,
                TimeUnit::Microsecond => ColumnDataType::TimestampMicrosecond,
                TimeUnit::Nanosecond => ColumnDataType::TimestampNanosecond,
            };

            let mut schema_info = SchemaInfo::default();
            schema_info
                .schema
                .push(time_index_column_schema(ts_name, timeunit));

            schema_info
        }
    };

    // Execute the pipeline once per input value. `schema_info` is mutable
    // here, so the schema can grow while rows are being processed.
    for pipeline_map in pipeline_maps {
        let result = pipeline
            .exec_mut(pipeline_map, pipeline_ctx, &mut schema_info)
            .inspect_err(|_| {
                // Record failure latency per failed row.
                METRIC_HTTP_LOGS_TRANSFORM_ELAPSED
                    .with_label_values(&[db.as_str(), METRIC_FAILURE_VALUE])
                    .observe(transform_timer.elapsed().as_secs_f64());
            })
            .context(PipelineSnafu);

        // With `skip_error` set, a failed row is skipped instead of aborting
        // the whole batch.
        let r = unwrap_or_continue_if_err!(result, skip_error);
        match r {
            PipelineExecOutput::Transformed(TransformedOutput {
                opt,
                row,
                table_suffix,
            }) => {
                // Group rows by (options, final table name); the optional
                // suffix is appended to the request's base table name.
                let act_table_name = table_suffix_to_table_name(&table_name, table_suffix);
                push_to_map!(transformed_map, (opt, act_table_name), row, arr_len);
            }
            PipelineExecOutput::DispatchedTo(dispatched_to, val) => {
                // Deferred: handled below by re-running the target pipeline.
                push_to_map!(dispatched, dispatched_to, val, arr_len);
            }
            PipelineExecOutput::Filtered => {
                // Row was dropped by the pipeline; produce nothing for it.
                continue;
            }
        }
    }

    let mut results = ContextReq::default();

    let s_len = schema_info.schema.len();

    // Build one insert request per (options, table) group.
    for ((opt, table_name), mut rows) in transformed_map {
        // The schema may have grown after earlier rows were produced, so pad
        // every row with default (null) values up to the final column count.
        for row in rows.iter_mut() {
            row.values
                .resize(s_len, greptime_proto::v1::Value::default());
        }
        results.add_row(
            opt,
            RowInsertRequest {
                rows: Some(Rows {
                    rows,
                    schema: schema_info.schema.clone(),
                }),
                table_name,
            },
        );
    }

    // Re-run each dispatched batch through its target pipeline (identity by
    // default) and merge the resulting requests into ours.
    for (dispatched_to, coll) in dispatched {
        // Target table name is derived from the dispatch rule and the
        // original request's table name.
        let table_name = dispatched_to.dispatched_to_table_name(&table_name);
        let next_pipeline_name = dispatched_to
            .pipeline
            .as_deref()
            .unwrap_or(GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME);

        let next_pipeline_def =
            PipelineDefinition::from_name(next_pipeline_name, None, None).context(PipelineSnafu)?;
        let next_pipeline_ctx = PipelineContext::new(
            &next_pipeline_def,
            pipeline_ctx.pipeline_param,
            pipeline_ctx.channel,
        );
        // Box::pin is required: async recursion needs a heap-allocated future.
        // Recursive calls pass `is_top_level = false` so only the outermost
        // call records the success metric.
        let requests = Box::pin(run_pipeline(
            handler,
            &next_pipeline_ctx,
            PipelineIngestRequest {
                table: table_name,
                values: coll,
            },
            query_ctx,
            false,
        ))
        .await?;

        results.merge(requests);
    }

    if is_top_level {
        METRIC_HTTP_LOGS_TRANSFORM_ELAPSED
            .with_label_values(&[db.as_str(), METRIC_SUCCESS_VALUE])
            .observe(transform_timer.elapsed().as_secs_f64());
    }

    Ok(results)
}
238
/// Builds the final table name by appending the optional per-row suffix
/// produced by the pipeline to the request's base table name.
///
/// Takes `&str` rather than `&String` (clippy `ptr_arg`): strictly more
/// general, and existing `&String` call sites coerce transparently.
#[inline]
fn table_suffix_to_table_name(table_name: &str, table_suffix: Option<String>) -> String {
    match table_suffix {
        Some(suffix) => format!("{}{}", table_name, suffix),
        None => table_name.to_string(),
    }
}