1use std::collections::BTreeMap;
16use std::sync::Arc;
17
18use ahash::{HashMap, HashMapExt};
19use api::greptime_proto;
20use api::v1::{ColumnDataType, ColumnSchema, RowInsertRequest, Rows, SemanticType};
21use common_time::timestamp::TimeUnit;
22use pipeline::{
23 unwrap_or_continue_if_err, ContextReq, DispatchedTo, Pipeline, PipelineContext,
24 PipelineDefinition, PipelineExecOutput, SchemaInfo, TransformedOutput, TransformerMode, Value,
25 GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME,
26};
27use session::context::{Channel, QueryContextRef};
28use snafu::ResultExt;
29
30use crate::error::{CatalogSnafu, PipelineSnafu, Result};
31use crate::http::event::PipelineIngestRequest;
32use crate::metrics::{
33 METRIC_FAILURE_VALUE, METRIC_HTTP_LOGS_TRANSFORM_ELAPSED, METRIC_SUCCESS_VALUE,
34};
35use crate::query_handler::PipelineHandlerRef;
36
/// Pushes `$value` into the `Vec` stored under `$key` in `$map`, creating the
/// entry with `Vec::with_capacity($capacity)` on first insertion.
///
/// The capacity hint is the total batch size, so each bucket is sized for the
/// worst case (all rows landing in one bucket) and never reallocates.
macro_rules! push_to_map {
    ($map:expr, $key:expr, $value:expr, $capacity:expr) => {
        $map.entry($key)
            .or_insert_with(|| Vec::with_capacity($capacity))
            .push($value);
    };
}
44
45pub async fn get_pipeline(
47 pipeline_def: &PipelineDefinition,
48 handler: &PipelineHandlerRef,
49 query_ctx: &QueryContextRef,
50) -> Result<Arc<Pipeline>> {
51 match pipeline_def {
52 PipelineDefinition::Resolved(pipeline) => Ok(pipeline.clone()),
53 PipelineDefinition::ByNameAndValue((name, version)) => {
54 handler
55 .get_pipeline(name, *version, query_ctx.clone())
56 .await
57 }
58 _ => {
59 unreachable!("Never call get_pipeline on identity.")
60 }
61 }
62}
63
64pub(crate) async fn run_pipeline(
65 handler: &PipelineHandlerRef,
66 pipeline_ctx: &PipelineContext<'_>,
67 pipeline_req: PipelineIngestRequest,
68 query_ctx: &QueryContextRef,
69 is_top_level: bool,
70) -> Result<ContextReq> {
71 if pipeline_ctx.pipeline_definition.is_identity() {
72 run_identity_pipeline(handler, pipeline_ctx, pipeline_req, query_ctx).await
73 } else {
74 run_custom_pipeline(handler, pipeline_ctx, pipeline_req, query_ctx, is_top_level).await
75 }
76}
77
78async fn run_identity_pipeline(
79 handler: &PipelineHandlerRef,
80 pipeline_ctx: &PipelineContext<'_>,
81 pipeline_req: PipelineIngestRequest,
82 query_ctx: &QueryContextRef,
83) -> Result<ContextReq> {
84 let PipelineIngestRequest {
85 table: table_name,
86 values: data_array,
87 } = pipeline_req;
88 let table = if pipeline_ctx.channel == Channel::Prometheus {
89 None
90 } else {
91 handler
92 .get_table(&table_name, query_ctx)
93 .await
94 .context(CatalogSnafu)?
95 };
96 pipeline::identity_pipeline(data_array, table, pipeline_ctx)
97 .map(|opt_map| ContextReq::from_opt_map(opt_map, table_name))
98 .context(PipelineSnafu)
99}
100
/// Executes a user-defined pipeline over a batch of values and assembles the
/// resulting row-insert requests.
///
/// Each input value is run through the pipeline and either:
/// - transformed into a row, grouped by `(options, target table)`; or
/// - dispatched to another pipeline, grouped by dispatch target and re-run
///   recursively (with `is_top_level = false`).
///
/// Errors from individual values are skipped when the pipeline parameters
/// request `skip_error`; otherwise the first error aborts the batch.
async fn run_custom_pipeline(
    handler: &PipelineHandlerRef,
    pipeline_ctx: &PipelineContext<'_>,
    pipeline_req: PipelineIngestRequest,
    query_ctx: &QueryContextRef,
    is_top_level: bool,
) -> Result<ContextReq> {
    let skip_error = pipeline_ctx.pipeline_param.skip_error();
    let db = query_ctx.get_db_string();
    let pipeline = get_pipeline(pipeline_ctx.pipeline_definition, handler, query_ctx).await?;

    // Timer covers the whole transform; observed with a failure label on any
    // per-row exec error, and with a success label once at the end (top level only).
    let transform_timer = std::time::Instant::now();

    let PipelineIngestRequest {
        table: table_name,
        values: pipeline_maps,
    } = pipeline_req;
    let arr_len = pipeline_maps.len();
    // (options, table name) -> rows destined for that table.
    let mut transformed_map = HashMap::new();
    // dispatch target -> values to be re-run through the target pipeline.
    let mut dispatched: BTreeMap<DispatchedTo, Vec<Value>> = BTreeMap::new();

    // Seed the schema: either the transformer's declared schema, or (auto
    // mode) just the timestamp column — exec_mut may grow it per row.
    let mut schema_info = match pipeline.transformer() {
        TransformerMode::GreptimeTransformer(greptime_transformer) => {
            SchemaInfo::from_schema_list(greptime_transformer.schemas().clone())
        }
        TransformerMode::AutoTransform(ts_name, timeunit) => {
            // Map the pipeline's time unit onto the corresponding timestamp
            // column type.
            let timeunit = match timeunit {
                TimeUnit::Second => ColumnDataType::TimestampSecond,
                TimeUnit::Millisecond => ColumnDataType::TimestampMillisecond,
                TimeUnit::Microsecond => ColumnDataType::TimestampMicrosecond,
                TimeUnit::Nanosecond => ColumnDataType::TimestampNanosecond,
            };

            let mut schema_info = SchemaInfo::default();
            schema_info.schema.push(ColumnSchema {
                column_name: ts_name.clone(),
                datatype: timeunit.into(),
                semantic_type: SemanticType::Timestamp as i32,
                datatype_extension: None,
                options: None,
            });

            schema_info
        }
    };

    for pipeline_map in pipeline_maps {
        let result = pipeline
            .exec_mut(pipeline_map, pipeline_ctx, &mut schema_info)
            .inspect_err(|_| {
                // Record failed transform latency before the error is
                // (possibly) swallowed by skip_error below.
                METRIC_HTTP_LOGS_TRANSFORM_ELAPSED
                    .with_label_values(&[db.as_str(), METRIC_FAILURE_VALUE])
                    .observe(transform_timer.elapsed().as_secs_f64());
            })
            .context(PipelineSnafu);

        // Either continues the loop (skip_error) or propagates the error.
        let r = unwrap_or_continue_if_err!(result, skip_error);
        match r {
            PipelineExecOutput::Transformed(TransformedOutput {
                opt,
                row,
                table_suffix,
            }) => {
                // A per-row table suffix fans rows out to derived tables.
                let act_table_name = table_suffix_to_table_name(&table_name, table_suffix);
                push_to_map!(transformed_map, (opt, act_table_name), row, arr_len);
            }
            PipelineExecOutput::DispatchedTo(dispatched_to, val) => {
                push_to_map!(dispatched, dispatched_to, val, arr_len);
            }
        }
    }

    let mut results = ContextReq::default();

    let s_len = schema_info.schema.len();

    for ((opt, table_name), mut rows) in transformed_map {
        // The schema may have grown after early rows were produced; pad every
        // row to the final column count so rows and schema stay aligned.
        for row in rows.iter_mut() {
            row.values
                .resize(s_len, greptime_proto::v1::Value::default());
        }
        results.add_row(
            opt,
            RowInsertRequest {
                rows: Some(Rows {
                    rows,
                    schema: schema_info.schema.clone(),
                }),
                table_name,
            },
        );
    }

    // Re-run each dispatched group through its target pipeline (identity when
    // no pipeline name is given) and merge the resulting requests.
    for (dispatched_to, coll) in dispatched {
        let table_name = dispatched_to.dispatched_to_table_name(&table_name);
        let next_pipeline_name = dispatched_to
            .pipeline
            .as_deref()
            .unwrap_or(GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME);

        let next_pipeline_def =
            PipelineDefinition::from_name(next_pipeline_name, None, None).context(PipelineSnafu)?;
        let next_pipeline_ctx = PipelineContext::new(
            &next_pipeline_def,
            pipeline_ctx.pipeline_param,
            pipeline_ctx.channel,
        );
        // Box::pin breaks the infinitely-sized future created by async
        // recursion into run_pipeline.
        let requests = Box::pin(run_pipeline(
            handler,
            &next_pipeline_ctx,
            PipelineIngestRequest {
                table: table_name,
                values: coll,
            },
            query_ctx,
            false,
        ))
        .await?;

        results.merge(requests);
    }

    // Only the outermost call reports success latency, so recursive dispatch
    // runs don't double-count.
    if is_top_level {
        METRIC_HTTP_LOGS_TRANSFORM_ELAPSED
            .with_label_values(&[db.as_str(), METRIC_SUCCESS_VALUE])
            .observe(transform_timer.elapsed().as_secs_f64());
    }

    Ok(results)
}
237
/// Builds the effective table name by appending an optional per-row suffix.
///
/// Returns `table_name` unchanged when `table_suffix` is `None`, otherwise
/// the concatenation `table_name + suffix`.
#[inline]
fn table_suffix_to_table_name(table_name: &str, table_suffix: Option<String>) -> String {
    // `&str` instead of `&String`: strictly more general, and existing
    // `&String` call sites still work via deref coercion.
    match table_suffix {
        Some(suffix) => format!("{}{}", table_name, suffix),
        None => table_name.to_string(),
    }
}