use std::collections::{BTreeSet, HashSet, VecDeque};
use std::sync::Arc;
use std::time::UNIX_EPOCH;

use arrow::datatypes::IntervalDayTime;
use async_recursion::async_recursion;
use catalog::table_source::DfTableSourceProvider;
use common_query::prelude::GREPTIME_VALUE;
use datafusion::common::DFSchemaRef;
use datafusion::datasource::DefaultTableSource;
use datafusion::execution::context::SessionState;
use datafusion::functions_aggregate::average::avg_udaf;
use datafusion::functions_aggregate::count::count_udaf;
use datafusion::functions_aggregate::grouping::grouping_udaf;
use datafusion::functions_aggregate::min_max::{max_udaf, min_udaf};
use datafusion::functions_aggregate::stddev::stddev_pop_udaf;
use datafusion::functions_aggregate::sum::sum_udaf;
use datafusion::functions_aggregate::variance::var_pop_udaf;
use datafusion::functions_window::row_number::RowNumber;
use datafusion::logical_expr::expr::{Alias, ScalarFunction, WindowFunction};
use datafusion::logical_expr::expr_rewriter::normalize_cols;
use datafusion::logical_expr::{
    BinaryExpr, Cast, Extension, LogicalPlan, LogicalPlanBuilder, Operator,
    ScalarUDF as ScalarUdfDef, WindowFrame, WindowFunctionDefinition,
};
use datafusion::prelude as df_prelude;
use datafusion::prelude::{Column, Expr as DfExpr, JoinType};
use datafusion::scalar::ScalarValue;
use datafusion::sql::TableReference;
use datafusion_expr::utils::conjunction;
use datafusion_expr::{col, lit, SortExpr};
use datatypes::arrow::datatypes::{DataType as ArrowDataType, TimeUnit as ArrowTimeUnit};
use datatypes::data_type::ConcreteDataType;
use itertools::Itertools;
use promql::extension_plan::{
    build_special_time_expr, EmptyMetric, HistogramFold, InstantManipulate, Millisecond,
    RangeManipulate, ScalarCalculate, SeriesDivide, SeriesNormalize, UnionDistinctOn,
};
use promql::functions::{
    quantile_udaf, AbsentOverTime, AvgOverTime, Changes, CountOverTime, Delta, Deriv, HoltWinters,
    IDelta, Increase, LastOverTime, MaxOverTime, MinOverTime, PredictLinear, PresentOverTime,
    QuantileOverTime, Rate, Resets, Round, StddevOverTime, StdvarOverTime, SumOverTime,
};
use promql_parser::label::{MatchOp, Matcher, Matchers, METRIC_NAME};
use promql_parser::parser::token::TokenType;
use promql_parser::parser::{
    token, AggregateExpr, BinModifier, BinaryExpr as PromBinaryExpr, Call, EvalStmt,
    Expr as PromExpr, Function, FunctionArgs as PromFunctionArgs, LabelModifier, MatrixSelector,
    NumberLiteral, Offset, ParenExpr, StringLiteral, SubqueryExpr, UnaryExpr,
    VectorMatchCardinality, VectorSelector,
};
use snafu::{ensure, OptionExt, ResultExt};
use store_api::metric_engine_consts::{
    DATA_SCHEMA_TABLE_ID_COLUMN_NAME, DATA_SCHEMA_TSID_COLUMN_NAME,
};
use table::table::adapter::DfTableProviderAdapter;

use crate::promql::error::{
    CatalogSnafu, ColumnNotFoundSnafu, CombineTableColumnMismatchSnafu, DataFusionPlanningSnafu,
    ExpectRangeSelectorSnafu, FunctionInvalidArgumentSnafu, InvalidTimeRangeSnafu,
    MultiFieldsNotSupportedSnafu, MultipleMetricMatchersSnafu, MultipleVectorSnafu,
    NoMetricMatcherSnafu, PromqlPlanNodeSnafu, Result, TableNameNotFoundSnafu,
    TimeIndexNotFoundSnafu, UnexpectedPlanExprSnafu, UnexpectedTokenSnafu, UnknownTableSnafu,
    UnsupportedExprSnafu, UnsupportedMatcherOpSnafu, UnsupportedVectorMatchSnafu,
    ValueNotFoundSnafu, ZeroRangeSelectorSnafu,
};
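
// Special PromQL function names, column names, and matcher labels that this
// planner handles explicitly, plus limits used when building time filters.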
const SPECIAL_TIME_FUNCTION: &str = "time";
const SCALAR_FUNCTION: &str = "scalar";
const SPECIAL_HISTOGRAM_QUANTILE: &str = "histogram_quantile";
const SPECIAL_VECTOR_FUNCTION: &str = "vector";
const LE_COLUMN_NAME: &str = "le";

const DEFAULT_TIME_INDEX_COLUMN: &str = "time";

const DEFAULT_FIELD_COLUMN: &str = "value";

const FIELD_COLUMN_MATCHER: &str = "__field__";

const SCHEMA_COLUMN_MATCHER: &str = "__schema__";
const DB_COLUMN_MATCHER: &str = "__database__";

const MAX_SCATTER_POINTS: i64 = 400;

const INTERVAL_1H: i64 = 60 * 60 * 1000;
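
/// Planning context carried through one query. The time parameters come from
/// the `EvalStmt`; the table-related fields are populated by `setup_context`
/// and cleared again for each new selector.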
#[derive(Default, Debug, Clone)]
struct PromPlannerContext {
    start: Millisecond,
    end: Millisecond,
    interval: Millisecond,
    lookback_delta: Millisecond,

    table_name: Option<String>,
    time_index_column: Option<String>,
    field_columns: Vec<String>,
    tag_columns: Vec<String>,
    field_column_matcher: Option<Vec<Matcher>>,
    schema_name: Option<String>,
    range: Option<Millisecond>,
}

impl PromPlannerContext {
    fn from_eval_stmt(stmt: &EvalStmt) -> Self {
        Self {
            start: stmt.start.duration_since(UNIX_EPOCH).unwrap().as_millis() as _,
            end: stmt.end.duration_since(UNIX_EPOCH).unwrap().as_millis() as _,
            interval: stmt.interval.as_millis() as _,
            lookback_delta: stmt.lookback_delta.as_millis() as _,
            ..Default::default()
        }
    }
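
    /// Clear all table-related states so the next selector starts clean.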
    fn reset(&mut self) {
        self.table_name = None;
        self.time_index_column = None;
        self.field_columns = vec![];
        self.tag_columns = vec![];
        self.field_column_matcher = None;
        self.schema_name = None;
        self.range = None;
    }
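
    /// Set the table name to an empty string (rather than `None`) and unset
    /// the schema, so plans without a backing table still resolve a table ref.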
    fn reset_table_name_and_schema(&mut self) {
        self.table_name = Some(String::new());
        self.schema_name = None;
    }
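
    /// Check whether the `le` tag column (histogram bucket upper bound) is
    /// present.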
    fn has_le_tag(&self) -> bool {
        self.tag_columns.iter().any(|c| c.eq(&LE_COLUMN_NAME))
    }
}
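
/// Translates a PromQL [`EvalStmt`] into a DataFusion [`LogicalPlan`].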
pub struct PromPlanner {
    table_provider: DfTableSourceProvider,
    ctx: PromPlannerContext,
}
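
/// Remove escape characters from a matcher's value; for example the two
/// characters `\n` become a literal newline. Values that fail to unescape are
/// kept unchanged.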
pub fn normalize_matcher(mut matcher: Matcher) -> Matcher {
    if let Ok(unescaped_value) = unescaper::unescape(&matcher.value) {
        matcher.value = unescaped_value;
    }
    matcher
}

impl PromPlanner {
    pub async fn stmt_to_plan(
        table_provider: DfTableSourceProvider,
        stmt: &EvalStmt,
        session_state: &SessionState,
    ) -> Result<LogicalPlan> {
        let mut planner = Self {
            table_provider,
            ctx: PromPlannerContext::from_eval_stmt(stmt),
        };

        planner.prom_expr_to_plan(&stmt.expr, session_state).await
    }
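
    /// Recursively dispatch on a PromQL AST node and build the corresponding
    /// logical plan.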
    #[async_recursion]
    pub async fn prom_expr_to_plan(
        &mut self,
        prom_expr: &PromExpr,
        session_state: &SessionState,
    ) -> Result<LogicalPlan> {
        let res = match prom_expr {
            PromExpr::Aggregate(expr) => self.prom_aggr_expr_to_plan(session_state, expr).await?,
            PromExpr::Unary(expr) => self.prom_unary_expr_to_plan(session_state, expr).await?,
            PromExpr::Binary(expr) => self.prom_binary_expr_to_plan(session_state, expr).await?,
            PromExpr::Paren(ParenExpr { expr }) => {
                self.prom_expr_to_plan(expr, session_state).await?
            }
            PromExpr::Subquery(expr) => {
                self.prom_subquery_expr_to_plan(session_state, expr).await?
            }
            PromExpr::NumberLiteral(lit) => self.prom_number_lit_to_plan(lit)?,
            PromExpr::StringLiteral(lit) => self.prom_string_lit_to_plan(lit)?,
            PromExpr::VectorSelector(selector) => {
                self.prom_vector_selector_to_plan(selector).await?
            }
            PromExpr::MatrixSelector(selector) => {
                self.prom_matrix_selector_to_plan(selector).await?
            }
            PromExpr::Call(expr) => self.prom_call_expr_to_plan(session_state, expr).await?,
            PromExpr::Extension(expr) => self.prom_ext_expr_to_plan(session_state, expr).await?,
        };
        Ok(res)
    }

    async fn prom_subquery_expr_to_plan(
        &mut self,
        session_state: &SessionState,
        subquery_expr: &SubqueryExpr,
    ) -> Result<LogicalPlan> {
        let SubqueryExpr {
            expr, range, step, ..
        } = subquery_expr;

        let current_interval = self.ctx.interval;
        if let Some(step) = step {
            self.ctx.interval = step.as_millis() as _;
        }
        let current_start = self.ctx.start;
        self.ctx.start -= range.as_millis() as i64 - self.ctx.interval;
        let input = self.prom_expr_to_plan(expr, session_state).await?;
        self.ctx.interval = current_interval;
        self.ctx.start = current_start;

        ensure!(!range.is_zero(), ZeroRangeSelectorSnafu);
        let range_ms = range.as_millis() as _;
        self.ctx.range = Some(range_ms);

        let manipulate = RangeManipulate::new(
            self.ctx.start,
            self.ctx.end,
            self.ctx.interval,
            range_ms,
            self.ctx
                .time_index_column
                .clone()
                .expect("time index should be set in `setup_context`"),
            self.ctx.field_columns.clone(),
            input,
        )
        .context(DataFusionPlanningSnafu)?;

        Ok(LogicalPlan::Extension(Extension {
            node: Arc::new(manipulate),
        }))
    }
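
    /// Plan an aggregate expression. `topk`/`bottomk` take a window-based path;
    /// every other aggregation becomes a DataFusion aggregate grouped by the
    /// `by`/`without` modifier columns, followed by a sort on those columns.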
    async fn prom_aggr_expr_to_plan(
        &mut self,
        session_state: &SessionState,
        aggr_expr: &AggregateExpr,
    ) -> Result<LogicalPlan> {
        let AggregateExpr {
            op,
            expr,
            modifier,
            param,
        } = aggr_expr;

        let input = self.prom_expr_to_plan(expr, session_state).await?;
        match (*op).id() {
            token::T_TOPK | token::T_BOTTOMK => {
                self.prom_topk_bottomk_to_plan(aggr_expr, input).await
            }
            _ => {
                let mut group_exprs = self.agg_modifier_to_col(input.schema(), modifier, true)?;
                let (aggr_exprs, prev_field_exprs) =
                    self.create_aggregate_exprs(*op, param, &input)?;

                let builder = LogicalPlanBuilder::from(input);
                let builder = if op.id() == token::T_COUNT_VALUES {
                    let label = Self::get_param_value_as_str(*op, param)?;
                    group_exprs.extend(prev_field_exprs.clone());
                    let project_fields = self
                        .create_field_column_exprs()?
                        .into_iter()
                        .chain(self.create_tag_column_exprs()?)
                        .chain(Some(self.create_time_index_column_expr()?))
                        .chain(prev_field_exprs.into_iter().map(|expr| expr.alias(label)));

                    builder
                        .aggregate(group_exprs.clone(), aggr_exprs)
                        .context(DataFusionPlanningSnafu)?
                        .project(project_fields)
                        .context(DataFusionPlanningSnafu)?
                } else {
                    builder
                        .aggregate(group_exprs.clone(), aggr_exprs)
                        .context(DataFusionPlanningSnafu)?
                };

                let sort_expr = group_exprs.into_iter().map(|expr| expr.sort(true, false));

                builder
                    .sort(sort_expr)
                    .context(DataFusionPlanningSnafu)?
                    .build()
                    .context(DataFusionPlanningSnafu)
            }
        }
    }
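
    /// Plan `topk`/`bottomk`: attach a `row_number` window per group, keep rows
    /// whose rank is within the requested k, then sort and project.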
    async fn prom_topk_bottomk_to_plan(
        &mut self,
        aggr_expr: &AggregateExpr,
        input: LogicalPlan,
    ) -> Result<LogicalPlan> {
        let AggregateExpr {
            op,
            param,
            modifier,
            ..
        } = aggr_expr;

        let group_exprs = self.agg_modifier_to_col(input.schema(), modifier, false)?;

        let val = Self::get_param_value_as_f64(*op, param)?;

        let window_exprs = self.create_window_exprs(*op, group_exprs.clone(), &input)?;

        let rank_columns: Vec<_> = window_exprs
            .iter()
            .map(|expr| expr.schema_name().to_string())
            .collect();

        let filter: DfExpr = rank_columns
            .iter()
            .fold(None, |expr, rank| {
                let predicate = DfExpr::BinaryExpr(BinaryExpr {
                    left: Box::new(col(rank)),
                    op: Operator::LtEq,
                    right: Box::new(lit(val)),
                });

                match expr {
                    None => Some(predicate),
                    Some(expr) => Some(DfExpr::BinaryExpr(BinaryExpr {
                        left: Box::new(expr),
                        op: Operator::Or,
                        right: Box::new(predicate),
                    })),
                }
            })
            .unwrap();

        let rank_columns: Vec<_> = rank_columns.into_iter().map(col).collect();

        let mut new_group_exprs = group_exprs.clone();
        new_group_exprs.extend(rank_columns);

        let group_sort_expr = new_group_exprs
            .into_iter()
            .map(|expr| expr.sort(true, false));

        let project_fields = self
            .create_field_column_exprs()?
            .into_iter()
            .chain(self.create_tag_column_exprs()?)
            .chain(Some(self.create_time_index_column_expr()?));

        LogicalPlanBuilder::from(input)
            .window(window_exprs)
            .context(DataFusionPlanningSnafu)?
            .filter(filter)
            .context(DataFusionPlanningSnafu)?
            .sort(group_sort_expr)
            .context(DataFusionPlanningSnafu)?
            .project(project_fields)
            .context(DataFusionPlanningSnafu)?
            .build()
            .context(DataFusionPlanningSnafu)
    }
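
    /// Plan a unary expression. PromQL only has unary minus, so every field
    /// column is negated.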
    async fn prom_unary_expr_to_plan(
        &mut self,
        session_state: &SessionState,
        unary_expr: &UnaryExpr,
    ) -> Result<LogicalPlan> {
        let UnaryExpr { expr } = unary_expr;
        let input = self.prom_expr_to_plan(expr, session_state).await?;
        self.projection_for_each_field_column(input, |col| {
            Ok(DfExpr::Negative(Box::new(DfExpr::Column(col.into()))))
        })
    }
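
    /// Plan a binary expression. Four shapes are handled separately:
    /// literal-literal folds into an `EmptyMetric` carrying the computed
    /// expression; literal-vector (either side) becomes a projection, or a
    /// filter for comparison operators without the `bool` modifier;
    /// vector-vector becomes a set operation for `and`/`or`/`unless`, and a
    /// join on non-field columns otherwise.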
    async fn prom_binary_expr_to_plan(
        &mut self,
        session_state: &SessionState,
        binary_expr: &PromBinaryExpr,
    ) -> Result<LogicalPlan> {
        let PromBinaryExpr {
            lhs,
            rhs,
            op,
            modifier,
        } = binary_expr;

        let should_return_bool = if let Some(m) = modifier {
            m.return_bool
        } else {
            false
        };
        let is_comparison_op = Self::is_token_a_comparison_op(*op);

        match (
            Self::try_build_literal_expr(lhs),
            Self::try_build_literal_expr(rhs),
        ) {
            (Some(lhs), Some(rhs)) => {
                self.ctx.time_index_column = Some(DEFAULT_TIME_INDEX_COLUMN.to_string());
                self.ctx.field_columns = vec![DEFAULT_FIELD_COLUMN.to_string()];
                self.ctx.reset_table_name_and_schema();
                let field_expr_builder = Self::prom_token_to_binary_expr_builder(*op)?;
                let mut field_expr = field_expr_builder(lhs, rhs)?;

                if is_comparison_op && should_return_bool {
                    field_expr = DfExpr::Cast(Cast {
                        expr: Box::new(field_expr),
                        data_type: ArrowDataType::Float64,
                    });
                }

                Ok(LogicalPlan::Extension(Extension {
                    node: Arc::new(
                        EmptyMetric::new(
                            self.ctx.start,
                            self.ctx.end,
                            self.ctx.interval,
                            SPECIAL_TIME_FUNCTION.to_string(),
                            DEFAULT_FIELD_COLUMN.to_string(),
                            Some(field_expr),
                        )
                        .context(DataFusionPlanningSnafu)?,
                    ),
                }))
            }
            (Some(mut expr), None) => {
                let input = self.prom_expr_to_plan(rhs, session_state).await?;
                if let Some(time_expr) = Self::try_build_special_time_expr(
                    lhs,
                    self.ctx.time_index_column.as_ref().unwrap(),
                ) {
                    expr = time_expr
                }
                let bin_expr_builder = |col: &String| {
                    let binary_expr_builder = Self::prom_token_to_binary_expr_builder(*op)?;
                    let mut binary_expr =
                        binary_expr_builder(expr.clone(), DfExpr::Column(col.into()))?;

                    if is_comparison_op && should_return_bool {
                        binary_expr = DfExpr::Cast(Cast {
                            expr: Box::new(binary_expr),
                            data_type: ArrowDataType::Float64,
                        });
                    }
                    Ok(binary_expr)
                };
                if is_comparison_op && !should_return_bool {
                    self.filter_on_field_column(input, bin_expr_builder)
                } else {
                    self.projection_for_each_field_column(input, bin_expr_builder)
                }
            }
            (None, Some(mut expr)) => {
                let input = self.prom_expr_to_plan(lhs, session_state).await?;
                if let Some(time_expr) = Self::try_build_special_time_expr(
                    rhs,
                    self.ctx.time_index_column.as_ref().unwrap(),
                ) {
                    expr = time_expr
                }
                let bin_expr_builder = |col: &String| {
                    let binary_expr_builder = Self::prom_token_to_binary_expr_builder(*op)?;
                    let mut binary_expr =
                        binary_expr_builder(DfExpr::Column(col.into()), expr.clone())?;

                    if is_comparison_op && should_return_bool {
                        binary_expr = DfExpr::Cast(Cast {
                            expr: Box::new(binary_expr),
                            data_type: ArrowDataType::Float64,
                        });
                    }
                    Ok(binary_expr)
                };
                if is_comparison_op && !should_return_bool {
                    self.filter_on_field_column(input, bin_expr_builder)
                } else {
                    self.projection_for_each_field_column(input, bin_expr_builder)
                }
            }
            (None, None) => {
                let left_input = self.prom_expr_to_plan(lhs, session_state).await?;
                let left_field_columns = self.ctx.field_columns.clone();
                let left_time_index_column = self.ctx.time_index_column.clone();
                let mut left_table_ref = self
                    .table_ref()
                    .unwrap_or_else(|_| TableReference::bare(""));
                let left_context = self.ctx.clone();

                let right_input = self.prom_expr_to_plan(rhs, session_state).await?;
                let right_field_columns = self.ctx.field_columns.clone();
                let right_time_index_column = self.ctx.time_index_column.clone();
                let mut right_table_ref = self
                    .table_ref()
                    .unwrap_or_else(|_| TableReference::bare(""));
                let right_context = self.ctx.clone();

                if Self::is_token_a_set_op(*op) {
                    return self.set_op_on_non_field_columns(
                        left_input,
                        right_input,
                        left_context,
                        right_context,
                        *op,
                        modifier,
                    );
                }

                if left_table_ref == right_table_ref {
                    left_table_ref = TableReference::bare("lhs");
                    right_table_ref = TableReference::bare("rhs");
                    if self.ctx.tag_columns.is_empty() {
                        self.ctx = left_context.clone();
                        self.ctx.table_name = Some("lhs".to_string());
                    } else {
                        self.ctx.table_name = Some("rhs".to_string());
                    }
                }
                let mut field_columns = left_field_columns.iter().zip(right_field_columns.iter());

                let join_plan = self.join_on_non_field_columns(
                    left_input,
                    right_input,
                    left_table_ref.clone(),
                    right_table_ref.clone(),
                    left_time_index_column,
                    right_time_index_column,
                    left_context.tag_columns.is_empty() || right_context.tag_columns.is_empty(),
                    modifier,
                )?;
                let join_plan_schema = join_plan.schema().clone();

                let bin_expr_builder = |_: &String| {
                    let (left_col_name, right_col_name) = field_columns.next().unwrap();
                    let left_col = join_plan_schema
                        .qualified_field_with_name(Some(&left_table_ref), left_col_name)
                        .context(DataFusionPlanningSnafu)?
                        .into();
                    let right_col = join_plan_schema
                        .qualified_field_with_name(Some(&right_table_ref), right_col_name)
                        .context(DataFusionPlanningSnafu)?
                        .into();

                    let binary_expr_builder = Self::prom_token_to_binary_expr_builder(*op)?;
                    let mut binary_expr =
                        binary_expr_builder(DfExpr::Column(left_col), DfExpr::Column(right_col))?;
                    if is_comparison_op && should_return_bool {
                        binary_expr = DfExpr::Cast(Cast {
                            expr: Box::new(binary_expr),
                            data_type: ArrowDataType::Float64,
                        });
                    }
                    Ok(binary_expr)
                };
                if is_comparison_op && !should_return_bool {
                    self.filter_on_field_column(join_plan, bin_expr_builder)
                } else {
                    self.projection_for_each_field_column(join_plan, bin_expr_builder)
                }
            }
        }
    }
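
    /// Plan a bare number literal as an `EmptyMetric` emitting the value at
    /// every step of the query range.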
    fn prom_number_lit_to_plan(&mut self, number_literal: &NumberLiteral) -> Result<LogicalPlan> {
        let NumberLiteral { val } = number_literal;
        self.ctx.time_index_column = Some(DEFAULT_TIME_INDEX_COLUMN.to_string());
        self.ctx.field_columns = vec![DEFAULT_FIELD_COLUMN.to_string()];
        self.ctx.reset_table_name_and_schema();
        let literal_expr = df_prelude::lit(*val);

        let plan = LogicalPlan::Extension(Extension {
            node: Arc::new(
                EmptyMetric::new(
                    self.ctx.start,
                    self.ctx.end,
                    self.ctx.interval,
                    SPECIAL_TIME_FUNCTION.to_string(),
                    DEFAULT_FIELD_COLUMN.to_string(),
                    Some(literal_expr),
                )
                .context(DataFusionPlanningSnafu)?,
            ),
        });
        Ok(plan)
    }

    fn prom_string_lit_to_plan(&mut self, string_literal: &StringLiteral) -> Result<LogicalPlan> {
        let StringLiteral { val } = string_literal;
        self.ctx.time_index_column = Some(DEFAULT_TIME_INDEX_COLUMN.to_string());
        self.ctx.field_columns = vec![DEFAULT_FIELD_COLUMN.to_string()];
        self.ctx.reset_table_name_and_schema();
        let literal_expr = df_prelude::lit(val.to_string());

        let plan = LogicalPlan::Extension(Extension {
            node: Arc::new(
                EmptyMetric::new(
                    self.ctx.start,
                    self.ctx.end,
                    self.ctx.interval,
                    SPECIAL_TIME_FUNCTION.to_string(),
                    DEFAULT_FIELD_COLUMN.to_string(),
                    Some(literal_expr),
                )
                .context(DataFusionPlanningSnafu)?,
            ),
        });
        Ok(plan)
    }
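
    /// Plan an instant vector selector: normalize the matching series, then
    /// align samples to the query steps with `InstantManipulate`.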
    async fn prom_vector_selector_to_plan(
        &mut self,
        vector_selector: &VectorSelector,
    ) -> Result<LogicalPlan> {
        let VectorSelector {
            name,
            offset,
            matchers,
            at: _,
        } = vector_selector;
        let matchers = self.preprocess_label_matchers(matchers, name)?;
        self.setup_context().await?;
        let normalize = self
            .selector_to_series_normalize_plan(offset, matchers, false)
            .await?;
        let manipulate = InstantManipulate::new(
            self.ctx.start,
            self.ctx.end,
            self.ctx.lookback_delta,
            self.ctx.interval,
            self.ctx
                .time_index_column
                .clone()
                .expect("time index should be set in `setup_context`"),
            self.ctx.field_columns.first().cloned(),
            normalize,
        );
        Ok(LogicalPlan::Extension(Extension {
            node: Arc::new(manipulate),
        }))
    }
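
    /// Plan a range (matrix) selector: like an instant selector, but validates
    /// a non-zero range and wraps the result in `RangeManipulate`.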
    async fn prom_matrix_selector_to_plan(
        &mut self,
        matrix_selector: &MatrixSelector,
    ) -> Result<LogicalPlan> {
        let MatrixSelector { vs, range } = matrix_selector;
        let VectorSelector {
            name,
            offset,
            matchers,
            ..
        } = vs;
        let matchers = self.preprocess_label_matchers(matchers, name)?;
        self.setup_context().await?;

        ensure!(!range.is_zero(), ZeroRangeSelectorSnafu);
        let range_ms = range.as_millis() as _;
        self.ctx.range = Some(range_ms);

        let normalize = self
            .selector_to_series_normalize_plan(offset, matchers, true)
            .await?;
        let manipulate = RangeManipulate::new(
            self.ctx.start,
            self.ctx.end,
            self.ctx.interval,
            range_ms,
            self.ctx
                .time_index_column
                .clone()
                .expect("time index should be set in `setup_context`"),
            self.ctx.field_columns.clone(),
            normalize,
        )
        .context(DataFusionPlanningSnafu)?;

        Ok(LogicalPlan::Extension(Extension {
            node: Arc::new(manipulate),
        }))
    }
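
    /// Plan a function call. `histogram_quantile`, `vector` and `scalar` get
    /// dedicated plans; everything else is planned as a projection over the
    /// input (or an `EmptyMetric` when there is no vector input), with an
    /// extra sort stage for the `sort*` family.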
    async fn prom_call_expr_to_plan(
        &mut self,
        session_state: &SessionState,
        call_expr: &Call,
    ) -> Result<LogicalPlan> {
        let Call { func, args } = call_expr;
        match func.name {
            SPECIAL_HISTOGRAM_QUANTILE => {
                return self.create_histogram_plan(args, session_state).await
            }
            SPECIAL_VECTOR_FUNCTION => return self.create_vector_plan(args).await,
            SCALAR_FUNCTION => return self.create_scalar_plan(args, session_state).await,
            _ => {}
        }

        let args = self.create_function_args(&args.args)?;
        let input = if let Some(prom_expr) = &args.input {
            self.prom_expr_to_plan(prom_expr, session_state).await?
        } else {
            self.ctx.time_index_column = Some(SPECIAL_TIME_FUNCTION.to_string());
            self.ctx.reset_table_name_and_schema();
            self.ctx.tag_columns = vec![];
            self.ctx.field_columns = vec![DEFAULT_FIELD_COLUMN.to_string()];
            LogicalPlan::Extension(Extension {
                node: Arc::new(
                    EmptyMetric::new(
                        self.ctx.start,
                        self.ctx.end,
                        self.ctx.interval,
                        SPECIAL_TIME_FUNCTION.to_string(),
                        DEFAULT_FIELD_COLUMN.to_string(),
                        None,
                    )
                    .context(DataFusionPlanningSnafu)?,
                ),
            })
        };
        let mut func_exprs =
            self.create_function_expr(func, args.literals.clone(), session_state)?;
        func_exprs.insert(0, self.create_time_index_column_expr()?);
        func_exprs.extend_from_slice(&self.create_tag_column_exprs()?);

        let builder = LogicalPlanBuilder::from(input)
            .project(func_exprs)
            .context(DataFusionPlanningSnafu)?
            .filter(self.create_empty_values_filter_expr()?)
            .context(DataFusionPlanningSnafu)?;

        let builder = match func.name {
            "sort" => builder
                .sort(self.create_field_columns_sort_exprs(true))
                .context(DataFusionPlanningSnafu)?,
            "sort_desc" => builder
                .sort(self.create_field_columns_sort_exprs(false))
                .context(DataFusionPlanningSnafu)?,
            "sort_by_label" => builder
                .sort(Self::create_sort_exprs_by_tags(
                    func.name,
                    args.literals,
                    true,
                )?)
                .context(DataFusionPlanningSnafu)?,
            "sort_by_label_desc" => builder
                .sort(Self::create_sort_exprs_by_tags(
                    func.name,
                    args.literals,
                    false,
                )?)
                .context(DataFusionPlanningSnafu)?,

            _ => builder,
        };

        builder.build().context(DataFusionPlanningSnafu)
    }

    async fn prom_ext_expr_to_plan(
        &mut self,
        session_state: &SessionState,
        ext_expr: &promql_parser::parser::ast::Extension,
    ) -> Result<LogicalPlan> {
        let expr = &ext_expr.expr;
        let children = expr.children();
        let plan = self.prom_expr_to_plan(&children[0], session_state).await?;
        match expr.name() {
            "ANALYZE" => LogicalPlanBuilder::from(plan)
                .explain(false, true)
                .unwrap()
                .build()
                .context(DataFusionPlanningSnafu),
            "ANALYZE VERBOSE" => LogicalPlanBuilder::from(plan)
                .explain(true, true)
                .unwrap()
                .build()
                .context(DataFusionPlanningSnafu),
            "EXPLAIN" => LogicalPlanBuilder::from(plan)
                .explain(false, false)
                .unwrap()
                .build()
                .context(DataFusionPlanningSnafu),
            "EXPLAIN VERBOSE" => LogicalPlanBuilder::from(plan)
                .explain(true, false)
                .unwrap()
                .build()
                .context(DataFusionPlanningSnafu),
            _ => LogicalPlanBuilder::empty(true)
                .build()
                .context(DataFusionPlanningSnafu),
        }
    }
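
    /// Extract the metric name (from the selector name or the `__name__`
    /// matcher) and pull out the special matchers: `__field__` selects value
    /// columns, while `__schema__`/`__database__` (equality only) select the
    /// schema. The remaining matchers are unescaped via [`normalize_matcher`]
    /// and returned. For example, `up{__schema__="public", job="api"}`
    /// resolves to table `up` in schema `public` with only the `job` matcher
    /// left over.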
    #[allow(clippy::mutable_key_type)]
    fn preprocess_label_matchers(
        &mut self,
        label_matchers: &Matchers,
        name: &Option<String>,
    ) -> Result<Matchers> {
        self.ctx.reset();

        let metric_name;
        if let Some(name) = name.clone() {
            metric_name = Some(name);
            ensure!(
                label_matchers.find_matchers(METRIC_NAME).is_empty(),
                MultipleMetricMatchersSnafu
            );
        } else {
            let mut matches = label_matchers.find_matchers(METRIC_NAME);
            ensure!(!matches.is_empty(), NoMetricMatcherSnafu);
            ensure!(matches.len() == 1, MultipleMetricMatchersSnafu);
            ensure!(
                matches[0].op == MatchOp::Equal,
                UnsupportedMatcherOpSnafu {
                    matcher_op: matches[0].op.to_string(),
                    matcher: METRIC_NAME
                }
            );
            metric_name = matches.pop().map(|m| m.value);
        }

        self.ctx.table_name = metric_name;

        let mut matchers = HashSet::new();
        for matcher in &label_matchers.matchers {
            if matcher.name == FIELD_COLUMN_MATCHER {
                self.ctx
                    .field_column_matcher
                    .get_or_insert_default()
                    .push(matcher.clone());
            } else if matcher.name == SCHEMA_COLUMN_MATCHER || matcher.name == DB_COLUMN_MATCHER {
                ensure!(
                    matcher.op == MatchOp::Equal,
                    UnsupportedMatcherOpSnafu {
                        matcher: matcher.name.to_string(),
                        matcher_op: matcher.op.to_string(),
                    }
                );
                self.ctx.schema_name = Some(matcher.value.clone());
            } else if matcher.name != METRIC_NAME {
                let _ = matchers.insert(matcher.clone());
            }
        }

        Ok(Matchers::new(
            matchers.into_iter().map(normalize_matcher).collect(),
        ))
    }
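
    /// Build the selector pipeline: scan the table, apply label and time-range
    /// filters, optionally project the `__field__`-selected columns, sort by
    /// tags and time index, split per series with `SeriesDivide`, and finally
    /// add `SeriesNormalize` when an offset or a range selector requires it.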
    async fn selector_to_series_normalize_plan(
        &mut self,
        offset: &Option<Offset>,
        label_matchers: Matchers,
        is_range_selector: bool,
    ) -> Result<LogicalPlan> {
        let table_ref = self.table_ref()?;
        let mut table_scan = self.create_table_scan_plan(table_ref.clone()).await?;
        let table_schema = table_scan.schema();

        let offset_duration = match offset {
            Some(Offset::Pos(duration)) => duration.as_millis() as Millisecond,
            Some(Offset::Neg(duration)) => -(duration.as_millis() as Millisecond),
            None => 0,
        };
        let mut scan_filters = Self::matchers_to_expr(label_matchers.clone(), table_schema)?;
        if let Some(time_index_filter) = self.build_time_index_filter(offset_duration)? {
            scan_filters.push(time_index_filter);
        }
        table_scan = LogicalPlanBuilder::from(table_scan)
            .filter(conjunction(scan_filters).unwrap())
            .context(DataFusionPlanningSnafu)?
            .build()
            .context(DataFusionPlanningSnafu)?;

        if let Some(field_matchers) = &self.ctx.field_column_matcher {
            let col_set = self.ctx.field_columns.iter().collect::<HashSet<_>>();
            let mut result_set = HashSet::new();
            let mut reverse_set = HashSet::new();
            for matcher in field_matchers {
                match &matcher.op {
                    MatchOp::Equal => {
                        if col_set.contains(&matcher.value) {
                            let _ = result_set.insert(matcher.value.clone());
                        } else {
                            return Err(ColumnNotFoundSnafu {
                                col: matcher.value.clone(),
                            }
                            .build());
                        }
                    }
                    MatchOp::NotEqual => {
                        if col_set.contains(&matcher.value) {
                            let _ = reverse_set.insert(matcher.value.clone());
                        } else {
                            return Err(ColumnNotFoundSnafu {
                                col: matcher.value.clone(),
                            }
                            .build());
                        }
                    }
                    MatchOp::Re(regex) => {
                        for col in &self.ctx.field_columns {
                            if regex.is_match(col) {
                                let _ = result_set.insert(col.clone());
                            }
                        }
                    }
                    MatchOp::NotRe(regex) => {
                        for col in &self.ctx.field_columns {
                            if regex.is_match(col) {
                                let _ = reverse_set.insert(col.clone());
                            }
                        }
                    }
                }
            }
            if result_set.is_empty() {
                result_set = col_set.into_iter().cloned().collect();
            }
            for col in reverse_set {
                let _ = result_set.remove(&col);
            }

            self.ctx.field_columns = self
                .ctx
                .field_columns
                .drain(..)
                .filter(|col| result_set.contains(col))
                .collect();

            let exprs = result_set
                .into_iter()
                .map(|col| DfExpr::Column(Column::new_unqualified(col)))
                .chain(self.create_tag_column_exprs()?)
                .chain(Some(self.create_time_index_column_expr()?))
                .collect::<Vec<_>>();

            table_scan = LogicalPlanBuilder::from(table_scan)
                .project(exprs)
                .context(DataFusionPlanningSnafu)?
                .build()
                .context(DataFusionPlanningSnafu)?;
        }

        let sort_plan = LogicalPlanBuilder::from(table_scan)
            .sort(self.create_tag_and_time_index_column_sort_exprs()?)
            .context(DataFusionPlanningSnafu)?
            .build()
            .context(DataFusionPlanningSnafu)?;

        let time_index_column =
            self.ctx
                .time_index_column
                .clone()
                .with_context(|| TimeIndexNotFoundSnafu {
                    table: table_ref.to_string(),
                })?;
        let divide_plan = LogicalPlan::Extension(Extension {
            node: Arc::new(SeriesDivide::new(
                self.ctx.tag_columns.clone(),
                time_index_column,
                sort_plan,
            )),
        });

        if !is_range_selector && offset_duration == 0 {
            return Ok(divide_plan);
        }
        let series_normalize = SeriesNormalize::new(
            offset_duration,
            self.ctx
                .time_index_column
                .clone()
                .with_context(|| TimeIndexNotFoundSnafu {
                    table: table_ref.to_quoted_string(),
                })?,
            is_range_selector,
            self.ctx.tag_columns.clone(),
            divide_plan,
        );
        let logical_plan = LogicalPlan::Extension(Extension {
            node: Arc::new(series_normalize),
        });

        Ok(logical_plan)
    }
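
    /// Convert an aggregate modifier (`by`/`without`) into group-by column
    /// expressions against `input_schema`. The time index column is always
    /// appended, and `ctx.tag_columns` is updated when `update_ctx` is set.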
    fn agg_modifier_to_col(
        &mut self,
        input_schema: &DFSchemaRef,
        modifier: &Option<LabelModifier>,
        update_ctx: bool,
    ) -> Result<Vec<DfExpr>> {
        match modifier {
            None => {
                if update_ctx {
                    self.ctx.tag_columns = vec![];
                }
                Ok(vec![self.create_time_index_column_expr()?])
            }
            Some(LabelModifier::Include(labels)) => {
                let mut exprs = Vec::with_capacity(labels.labels.len());
                for label in &labels.labels {
                    if let Ok(field) = input_schema.field_with_unqualified_name(label) {
                        exprs.push(DfExpr::Column(Column::from(field.name())));
                    }
                }

                if update_ctx {
                    self.ctx.tag_columns.clone_from(&labels.labels);
                }

                exprs.push(self.create_time_index_column_expr()?);

                Ok(exprs)
            }
            Some(LabelModifier::Exclude(labels)) => {
                let mut all_fields = input_schema
                    .fields()
                    .iter()
                    .map(|f| f.name())
                    .collect::<BTreeSet<_>>();

                for label in &labels.labels {
                    let _ = all_fields.remove(label);
                }

                if let Some(time_index) = &self.ctx.time_index_column {
                    let _ = all_fields.remove(time_index);
                }
                for value in &self.ctx.field_columns {
                    let _ = all_fields.remove(value);
                }

                if update_ctx {
                    self.ctx.tag_columns = all_fields.iter().map(|col| (*col).clone()).collect();
                }

                let mut exprs = all_fields
                    .into_iter()
                    .map(|c| DfExpr::Column(Column::from(c)))
                    .collect::<Vec<_>>();

                exprs.push(self.create_time_index_column_expr()?);

                Ok(exprs)
            }
        }
    }
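
    /// Convert label matchers into DataFusion filter expressions. A matcher on
    /// a column absent from the schema compares against an aliased
    /// empty-string literal, and the always-true regex `.*` is skipped
    /// entirely.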
    pub fn matchers_to_expr(
        label_matchers: Matchers,
        table_schema: &DFSchemaRef,
    ) -> Result<Vec<DfExpr>> {
        let mut exprs = Vec::with_capacity(label_matchers.matchers.len());
        for matcher in label_matchers.matchers {
            let col = if table_schema
                .field_with_unqualified_name(&matcher.name)
                .is_err()
            {
                DfExpr::Literal(ScalarValue::Utf8(Some(String::new()))).alias(matcher.name)
            } else {
                DfExpr::Column(Column::from_name(matcher.name))
            };
            let lit = DfExpr::Literal(ScalarValue::Utf8(Some(matcher.value)));
            let expr = match matcher.op {
                MatchOp::Equal => col.eq(lit),
                MatchOp::NotEqual => col.not_eq(lit),
                MatchOp::Re(re) => {
                    if re.as_str() == ".*" {
                        continue;
                    }
                    DfExpr::BinaryExpr(BinaryExpr {
                        left: Box::new(col),
                        op: Operator::RegexMatch,
                        right: Box::new(DfExpr::Literal(ScalarValue::Utf8(Some(
                            re.as_str().to_string(),
                        )))),
                    })
                }
                MatchOp::NotRe(re) => DfExpr::BinaryExpr(BinaryExpr {
                    left: Box::new(col),
                    op: Operator::RegexNotMatch,
                    right: Box::new(DfExpr::Literal(ScalarValue::Utf8(Some(
                        re.as_str().to_string(),
                    )))),
                }),
            };
            exprs.push(expr);
        }

        Ok(exprs)
    }

    fn table_ref(&self) -> Result<TableReference> {
        let table_name = self
            .ctx
            .table_name
            .clone()
            .context(TableNameNotFoundSnafu)?;

        let table_ref = if let Some(schema_name) = &self.ctx.schema_name {
            TableReference::partial(schema_name.as_str(), table_name.as_str())
        } else {
            TableReference::bare(table_name.as_str())
        };

        Ok(table_ref)
    }
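
    /// Build the filter on the time index column. When the query is dense
    /// (more than `MAX_SCATTER_POINTS` steps, or an interval of at most one
    /// hour) a single covering range is used; otherwise one small window per
    /// step is OR-ed together so the scan can skip data between distant
    /// points.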
    fn build_time_index_filter(&self, offset_duration: i64) -> Result<Option<DfExpr>> {
        let start = self.ctx.start;
        let end = self.ctx.end;
        if end < start {
            return InvalidTimeRangeSnafu { start, end }.fail();
        }
        let lookback_delta = self.ctx.lookback_delta;
        let range = self.ctx.range.unwrap_or_default();
        let interval = self.ctx.interval;
        let time_index_expr = self.create_time_index_column_expr()?;
        let num_points = (end - start) / interval;

        if (end - start) / interval > MAX_SCATTER_POINTS || interval <= INTERVAL_1H {
            let single_time_range = time_index_expr
                .clone()
                .gt_eq(DfExpr::Literal(ScalarValue::TimestampMillisecond(
                    Some(self.ctx.start - offset_duration - self.ctx.lookback_delta - range),
                    None,
                )))
                .and(
                    time_index_expr.lt_eq(DfExpr::Literal(ScalarValue::TimestampMillisecond(
                        Some(self.ctx.end - offset_duration + self.ctx.lookback_delta),
                        None,
                    ))),
                );
            return Ok(Some(single_time_range));
        }

        let mut filters = Vec::with_capacity(num_points as usize);
        for timestamp in (start..end).step_by(interval as usize) {
            filters.push(
                time_index_expr
                    .clone()
                    .gt_eq(DfExpr::Literal(ScalarValue::TimestampMillisecond(
                        Some(timestamp - offset_duration - lookback_delta - range),
                        None,
                    )))
                    .and(time_index_expr.clone().lt_eq(DfExpr::Literal(
                        ScalarValue::TimestampMillisecond(
                            Some(timestamp - offset_duration + lookback_delta),
                            None,
                        ),
                    ))),
            )
        }

        Ok(filters.into_iter().reduce(DfExpr::or))
    }
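
    /// Create a table scan plan for `table_ref`, adding a projection that
    /// casts the time index to millisecond precision when necessary, since the
    /// PromQL plan nodes here operate on millisecond timestamps.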
    async fn create_table_scan_plan(&mut self, table_ref: TableReference) -> Result<LogicalPlan> {
        let provider = self
            .table_provider
            .resolve_table(table_ref.clone())
            .await
            .context(CatalogSnafu)?;

        let is_time_index_ms = provider
            .as_any()
            .downcast_ref::<DefaultTableSource>()
            .context(UnknownTableSnafu)?
            .table_provider
            .as_any()
            .downcast_ref::<DfTableProviderAdapter>()
            .context(UnknownTableSnafu)?
            .table()
            .schema()
            .timestamp_column()
            .with_context(|| TimeIndexNotFoundSnafu {
                table: table_ref.to_quoted_string(),
            })?
            .data_type
            == ConcreteDataType::timestamp_millisecond_datatype();

        let mut scan_plan = LogicalPlanBuilder::scan(table_ref.clone(), provider, None)
            .context(DataFusionPlanningSnafu)?
            .build()
            .context(DataFusionPlanningSnafu)?;

        if !is_time_index_ms {
            let expr: Vec<_> = self
                .ctx
                .field_columns
                .iter()
                .map(|col| DfExpr::Column(Column::new(Some(table_ref.clone()), col.clone())))
                .chain(self.create_tag_column_exprs()?)
                .chain(Some(DfExpr::Alias(Alias {
                    expr: Box::new(DfExpr::Cast(Cast {
                        expr: Box::new(self.create_time_index_column_expr()?),
                        data_type: ArrowDataType::Timestamp(ArrowTimeUnit::Millisecond, None),
                    })),
                    relation: Some(table_ref.clone()),
                    name: self
                        .ctx
                        .time_index_column
                        .as_ref()
                        .with_context(|| TimeIndexNotFoundSnafu {
                            table: table_ref.to_quoted_string(),
                        })?
                        .clone(),
                })))
                .collect::<Vec<_>>();
            scan_plan = LogicalPlanBuilder::from(scan_plan)
                .project(expr)
                .context(DataFusionPlanningSnafu)?
                .build()
                .context(DataFusionPlanningSnafu)?;
        }

        let result = LogicalPlanBuilder::from(scan_plan)
            .build()
            .context(DataFusionPlanningSnafu)?;
        Ok(result)
    }
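
    /// Populate the planner context (time index, field and tag columns) from
    /// the resolved table's schema, filtering out the metric engine's internal
    /// table-id and tsid columns.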
    async fn setup_context(&mut self) -> Result<()> {
        let table_ref = self.table_ref()?;
        let table = self
            .table_provider
            .resolve_table(table_ref.clone())
            .await
            .context(CatalogSnafu)?
            .as_any()
            .downcast_ref::<DefaultTableSource>()
            .context(UnknownTableSnafu)?
            .table_provider
            .as_any()
            .downcast_ref::<DfTableProviderAdapter>()
            .context(UnknownTableSnafu)?
            .table();

        let time_index = table
            .schema()
            .timestamp_column()
            .with_context(|| TimeIndexNotFoundSnafu {
                table: table_ref.to_quoted_string(),
            })?
            .name
            .clone();
        self.ctx.time_index_column = Some(time_index);

        let values = table
            .table_info()
            .meta
            .field_column_names()
            .cloned()
            .collect();
        self.ctx.field_columns = values;

        let tags = table
            .table_info()
            .meta
            .row_key_column_names()
            .filter(|col| {
                col != &DATA_SCHEMA_TABLE_ID_COLUMN_NAME && col != &DATA_SCHEMA_TSID_COLUMN_NAME
            })
            .cloned()
            .collect();
        self.ctx.tag_columns = tags;

        Ok(())
    }
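
    /// Split function arguments into at most one vector input plus a list of
    /// literal arguments; a second vector argument is rejected.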
    fn create_function_args(&self, args: &[Box<PromExpr>]) -> Result<FunctionArgs> {
        let mut result = FunctionArgs::default();

        for arg in args {
            match *arg.clone() {
                PromExpr::Aggregate(_)
                | PromExpr::Unary(_)
                | PromExpr::Binary(_)
                | PromExpr::Paren(_)
                | PromExpr::Subquery(_)
                | PromExpr::VectorSelector(_)
                | PromExpr::MatrixSelector(_)
                | PromExpr::Extension(_)
                | PromExpr::Call(_) => {
                    if result.input.replace(*arg.clone()).is_some() {
                        MultipleVectorSnafu { expr: *arg.clone() }.fail()?;
                    }
                }

                PromExpr::NumberLiteral(NumberLiteral { val, .. }) => {
                    let scalar_value = ScalarValue::Float64(Some(val));
                    result.literals.push(DfExpr::Literal(scalar_value));
                }
                PromExpr::StringLiteral(StringLiteral { val, .. }) => {
                    let scalar_value = ScalarValue::Utf8(Some(val));
                    result.literals.push(DfExpr::Literal(scalar_value));
                }
            }
        }

        Ok(result)
    }
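
    /// Create DataFusion expressions for a PromQL function call, one per field
    /// column. Range functions additionally receive the timestamp-range
    /// column, and the extrapolating `rate`/`increase`/`delta` also get the
    /// time index and range length. `ctx.field_columns` is updated to the new
    /// output names at the end.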
    fn create_function_expr(
        &mut self,
        func: &Function,
        other_input_exprs: Vec<DfExpr>,
        session_state: &SessionState,
    ) -> Result<Vec<DfExpr>> {
        let mut other_input_exprs: VecDeque<DfExpr> = other_input_exprs.into();

        let field_column_pos = 0;
        let mut exprs = Vec::with_capacity(self.ctx.field_columns.len());
        let scalar_func = match func.name {
            "increase" => ScalarFunc::ExtrapolateUdf(
                Arc::new(Increase::scalar_udf()),
                self.ctx.range.context(ExpectRangeSelectorSnafu)?,
            ),
            "rate" => ScalarFunc::ExtrapolateUdf(
                Arc::new(Rate::scalar_udf()),
                self.ctx.range.context(ExpectRangeSelectorSnafu)?,
            ),
            "delta" => ScalarFunc::ExtrapolateUdf(
                Arc::new(Delta::scalar_udf()),
                self.ctx.range.context(ExpectRangeSelectorSnafu)?,
            ),
            "idelta" => ScalarFunc::Udf(Arc::new(IDelta::<false>::scalar_udf())),
            "irate" => ScalarFunc::Udf(Arc::new(IDelta::<true>::scalar_udf())),
            "resets" => ScalarFunc::Udf(Arc::new(Resets::scalar_udf())),
            "changes" => ScalarFunc::Udf(Arc::new(Changes::scalar_udf())),
            "deriv" => ScalarFunc::Udf(Arc::new(Deriv::scalar_udf())),
            "avg_over_time" => ScalarFunc::Udf(Arc::new(AvgOverTime::scalar_udf())),
            "min_over_time" => ScalarFunc::Udf(Arc::new(MinOverTime::scalar_udf())),
            "max_over_time" => ScalarFunc::Udf(Arc::new(MaxOverTime::scalar_udf())),
            "sum_over_time" => ScalarFunc::Udf(Arc::new(SumOverTime::scalar_udf())),
            "count_over_time" => ScalarFunc::Udf(Arc::new(CountOverTime::scalar_udf())),
            "last_over_time" => ScalarFunc::Udf(Arc::new(LastOverTime::scalar_udf())),
            "absent_over_time" => ScalarFunc::Udf(Arc::new(AbsentOverTime::scalar_udf())),
            "present_over_time" => ScalarFunc::Udf(Arc::new(PresentOverTime::scalar_udf())),
            "stddev_over_time" => ScalarFunc::Udf(Arc::new(StddevOverTime::scalar_udf())),
            "stdvar_over_time" => ScalarFunc::Udf(Arc::new(StdvarOverTime::scalar_udf())),
            "quantile_over_time" => ScalarFunc::Udf(Arc::new(QuantileOverTime::scalar_udf())),
            "predict_linear" => ScalarFunc::Udf(Arc::new(PredictLinear::scalar_udf())),
            "holt_winters" => ScalarFunc::Udf(Arc::new(HoltWinters::scalar_udf())),
            "time" => {
                exprs.push(build_special_time_expr(
                    self.ctx.time_index_column.as_ref().unwrap(),
                ));
                ScalarFunc::GeneratedExpr
            }
            "minute" => {
                let expr = self.date_part_on_time_index("minute")?;
                exprs.push(expr);
                ScalarFunc::GeneratedExpr
            }
            "hour" => {
                let expr = self.date_part_on_time_index("hour")?;
                exprs.push(expr);
                ScalarFunc::GeneratedExpr
            }
            "month" => {
                let expr = self.date_part_on_time_index("month")?;
                exprs.push(expr);
                ScalarFunc::GeneratedExpr
            }
            "year" => {
                let expr = self.date_part_on_time_index("year")?;
                exprs.push(expr);
                ScalarFunc::GeneratedExpr
            }
            "day_of_month" => {
                let expr = self.date_part_on_time_index("day")?;
                exprs.push(expr);
                ScalarFunc::GeneratedExpr
            }
            "day_of_week" => {
                let expr = self.date_part_on_time_index("dow")?;
                exprs.push(expr);
                ScalarFunc::GeneratedExpr
            }
            "day_of_year" => {
                let expr = self.date_part_on_time_index("doy")?;
                exprs.push(expr);
                ScalarFunc::GeneratedExpr
            }
            "days_in_month" => {
                let day_lit_expr = DfExpr::Literal(ScalarValue::Utf8(Some("day".to_string())));
                let month_lit_expr = DfExpr::Literal(ScalarValue::Utf8(Some("month".to_string())));
                let interval_1month_lit_expr =
                    DfExpr::Literal(ScalarValue::IntervalYearMonth(Some(1)));
                let interval_1day_lit_expr = DfExpr::Literal(ScalarValue::IntervalDayTime(Some(
                    IntervalDayTime::new(1, 0),
                )));
                let the_1month_minus_1day_expr = DfExpr::BinaryExpr(BinaryExpr {
                    left: Box::new(interval_1month_lit_expr),
                    op: Operator::Minus,
                    right: Box::new(interval_1day_lit_expr),
                });
                let date_trunc_expr = DfExpr::ScalarFunction(ScalarFunction {
                    func: datafusion_functions::datetime::date_trunc(),
                    args: vec![month_lit_expr, self.create_time_index_column_expr()?],
                });
                let date_trunc_plus_interval_expr = DfExpr::BinaryExpr(BinaryExpr {
                    left: Box::new(date_trunc_expr),
                    op: Operator::Plus,
                    right: Box::new(the_1month_minus_1day_expr),
                });
                let date_part_expr = DfExpr::ScalarFunction(ScalarFunction {
                    func: datafusion_functions::datetime::date_part(),
                    args: vec![day_lit_expr, date_trunc_plus_interval_expr],
                });

                exprs.push(date_part_expr);
                ScalarFunc::GeneratedExpr
            }

            "label_join" => {
                let (concat_expr, dst_label) =
                    Self::build_concat_labels_expr(&mut other_input_exprs, session_state)?;

                for value in &self.ctx.field_columns {
                    if *value != dst_label {
                        let expr = DfExpr::Column(Column::from_name(value));
                        exprs.push(expr);
                    }
                }

                self.ctx.tag_columns.retain(|tag| *tag != dst_label);

                exprs.push(concat_expr);

                ScalarFunc::GeneratedExpr
            }
            "label_replace" => {
                let (replace_expr, dst_label) =
                    Self::build_regexp_replace_label_expr(&mut other_input_exprs, session_state)?;

                for value in &self.ctx.field_columns {
                    if *value != dst_label {
                        let expr = DfExpr::Column(Column::from_name(value));
                        exprs.push(expr);
                    }
                }

                self.ctx.tag_columns.retain(|tag| *tag != dst_label);

                exprs.push(replace_expr);

                ScalarFunc::GeneratedExpr
            }
            "sort" | "sort_desc" | "sort_by_label" | "sort_by_label_desc" => {
                for value in &self.ctx.field_columns {
                    let expr = DfExpr::Column(Column::from_name(value));
                    exprs.push(expr);
                }

                ScalarFunc::GeneratedExpr
            }
            "round" => {
                if other_input_exprs.is_empty() {
                    other_input_exprs.push_front(DfExpr::Literal(ScalarValue::Float64(Some(0.0))));
                }
                ScalarFunc::DataFusionUdf(Arc::new(Round::scalar_udf()))
            }

            _ => {
                if let Some(f) = session_state.scalar_functions().get(func.name) {
                    ScalarFunc::DataFusionBuiltin(f.clone())
                } else if let Some(f) = datafusion_functions::math::functions()
                    .iter()
                    .find(|f| f.name() == func.name)
                {
                    ScalarFunc::DataFusionUdf(f.clone())
                } else {
                    return UnsupportedExprSnafu {
                        name: func.name.to_string(),
                    }
                    .fail();
                }
            }
        };

        for value in &self.ctx.field_columns {
            let col_expr = DfExpr::Column(Column::from_name(value));

            match scalar_func.clone() {
                ScalarFunc::DataFusionBuiltin(func) => {
                    other_input_exprs.insert(field_column_pos, col_expr);
                    let fn_expr = DfExpr::ScalarFunction(ScalarFunction {
                        func,
                        args: other_input_exprs.clone().into(),
                    });
                    exprs.push(fn_expr);
                    let _ = other_input_exprs.remove(field_column_pos);
                }
                ScalarFunc::DataFusionUdf(func) => {
                    let args = itertools::chain!(
                        other_input_exprs.iter().take(field_column_pos).cloned(),
                        std::iter::once(col_expr),
                        other_input_exprs.iter().skip(field_column_pos).cloned()
                    )
                    .collect_vec();
                    exprs.push(DfExpr::ScalarFunction(ScalarFunction { func, args }))
                }
                ScalarFunc::Udf(func) => {
                    let ts_range_expr = DfExpr::Column(Column::from_name(
                        RangeManipulate::build_timestamp_range_name(
                            self.ctx.time_index_column.as_ref().unwrap(),
                        ),
                    ));
                    other_input_exprs.insert(field_column_pos, ts_range_expr);
                    other_input_exprs.insert(field_column_pos + 1, col_expr);
                    let fn_expr = DfExpr::ScalarFunction(ScalarFunction {
                        func,
                        args: other_input_exprs.clone().into(),
                    });
                    exprs.push(fn_expr);
                    let _ = other_input_exprs.remove(field_column_pos + 1);
                    let _ = other_input_exprs.remove(field_column_pos);
                }
                ScalarFunc::ExtrapolateUdf(func, range_length) => {
                    let ts_range_expr = DfExpr::Column(Column::from_name(
                        RangeManipulate::build_timestamp_range_name(
                            self.ctx.time_index_column.as_ref().unwrap(),
                        ),
                    ));
                    other_input_exprs.insert(field_column_pos, ts_range_expr);
                    other_input_exprs.insert(field_column_pos + 1, col_expr);
                    other_input_exprs
                        .insert(field_column_pos + 2, self.create_time_index_column_expr()?);
                    other_input_exprs.push_back(lit(range_length));
                    let fn_expr = DfExpr::ScalarFunction(ScalarFunction {
                        func,
                        args: other_input_exprs.clone().into(),
                    });
                    exprs.push(fn_expr);
                    let _ = other_input_exprs.pop_back();
                    let _ = other_input_exprs.remove(field_column_pos + 2);
                    let _ = other_input_exprs.remove(field_column_pos + 1);
                    let _ = other_input_exprs.remove(field_column_pos);
                }
                ScalarFunc::GeneratedExpr => {}
            }
        }

        let mut new_field_columns = Vec::with_capacity(exprs.len());

        exprs = exprs
            .into_iter()
            .map(|expr| {
                let display_name = expr.schema_name().to_string();
                new_field_columns.push(display_name.clone());
                Ok(expr.alias(display_name))
            })
            .collect::<std::result::Result<Vec<_>, _>>()
            .context(DataFusionPlanningSnafu)?;

        self.ctx.field_columns = new_field_columns;

        Ok(exprs)
    }
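
    /// Build the `regexp_replace` expression backing `label_replace`. Literals
    /// are popped in PromQL argument order: dst_label, replacement, src_label,
    /// regex.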
    fn build_regexp_replace_label_expr(
        other_input_exprs: &mut VecDeque<DfExpr>,
        session_state: &SessionState,
    ) -> Result<(DfExpr, String)> {
        let dst_label = match other_input_exprs.pop_front() {
            Some(DfExpr::Literal(ScalarValue::Utf8(Some(d)))) => d,
            other => UnexpectedPlanExprSnafu {
                desc: format!("expected dst_label string literal, but found {:?}", other),
            }
            .fail()?,
        };
        let replacement = match other_input_exprs.pop_front() {
            Some(DfExpr::Literal(ScalarValue::Utf8(Some(r)))) => r,
            other => UnexpectedPlanExprSnafu {
                desc: format!("expected replacement string literal, but found {:?}", other),
            }
            .fail()?,
        };
        let src_label = match other_input_exprs.pop_front() {
            Some(DfExpr::Literal(ScalarValue::Utf8(Some(s)))) => s,
            other => UnexpectedPlanExprSnafu {
                desc: format!("expected src_label string literal, but found {:?}", other),
            }
            .fail()?,
        };
        let regex = match other_input_exprs.pop_front() {
            Some(DfExpr::Literal(ScalarValue::Utf8(Some(r)))) => r,
            other => UnexpectedPlanExprSnafu {
                desc: format!("expected regex string literal, but found {:?}", other),
            }
            .fail()?,
        };

        let func = session_state
            .scalar_functions()
            .get("regexp_replace")
            .context(UnsupportedExprSnafu {
                name: "regexp_replace",
            })?;

        let args = vec![
            if src_label.is_empty() {
                DfExpr::Literal(ScalarValue::Utf8(Some(String::new())))
            } else {
                DfExpr::Column(Column::from_name(src_label))
            },
            DfExpr::Literal(ScalarValue::Utf8(Some(regex))),
            DfExpr::Literal(ScalarValue::Utf8(Some(replacement))),
        ];

        Ok((
            DfExpr::ScalarFunction(ScalarFunction {
                func: func.clone(),
                args,
            })
            .alias(&dst_label),
            dst_label,
        ))
    }
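
    /// Build the `concat_ws` expression backing `label_join`: dst_label and
    /// the separator are popped first, the remaining literals name the source
    /// label columns.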
    fn build_concat_labels_expr(
        other_input_exprs: &mut VecDeque<DfExpr>,
        session_state: &SessionState,
    ) -> Result<(DfExpr, String)> {
        let dst_label = match other_input_exprs.pop_front() {
            Some(DfExpr::Literal(ScalarValue::Utf8(Some(d)))) => d,
            other => UnexpectedPlanExprSnafu {
                desc: format!("expected dst_label string literal, but found {:?}", other),
            }
            .fail()?,
        };
        let separator = match other_input_exprs.pop_front() {
            Some(DfExpr::Literal(ScalarValue::Utf8(Some(d)))) => d,
            other => UnexpectedPlanExprSnafu {
                desc: format!("expected separator string literal, but found {:?}", other),
            }
            .fail()?,
        };
        let src_labels = other_input_exprs
            .clone()
            .into_iter()
            .map(|expr| {
                match expr {
                    DfExpr::Literal(ScalarValue::Utf8(Some(label))) => {
                        if label.is_empty() {
                            Ok(DfExpr::Literal(ScalarValue::Null))
                        } else {
                            Ok(DfExpr::Column(Column::from_name(label)))
                        }
                    }
                    other => UnexpectedPlanExprSnafu {
                        desc: format!(
                            "expected source label string literal, but found {:?}",
                            other
                        ),
                    }
                    .fail(),
                }
            })
            .collect::<Result<Vec<_>>>()?;
        ensure!(
            !src_labels.is_empty(),
            FunctionInvalidArgumentSnafu {
                fn_name: "label_join"
            }
        );

        let func = session_state
            .scalar_functions()
            .get("concat_ws")
            .context(UnsupportedExprSnafu { name: "concat_ws" })?;

        let mut args = Vec::with_capacity(1 + src_labels.len());
        args.push(DfExpr::Literal(ScalarValue::Utf8(Some(separator))));
        args.extend(src_labels);

        Ok((
            DfExpr::ScalarFunction(ScalarFunction {
                func: func.clone(),
                args,
            })
            .alias(&dst_label),
            dst_label,
        ))
    }

    fn create_time_index_column_expr(&self) -> Result<DfExpr> {
        Ok(DfExpr::Column(Column::from_name(
            self.ctx
                .time_index_column
                .clone()
                .with_context(|| TimeIndexNotFoundSnafu { table: "unknown" })?,
        )))
    }

    fn create_tag_column_exprs(&self) -> Result<Vec<DfExpr>> {
        let mut result = Vec::with_capacity(self.ctx.tag_columns.len());
        for tag in &self.ctx.tag_columns {
            let expr = DfExpr::Column(Column::from_name(tag));
            result.push(expr);
        }
        Ok(result)
    }

    fn create_field_column_exprs(&self) -> Result<Vec<DfExpr>> {
        let mut result = Vec::with_capacity(self.ctx.field_columns.len());
        for field in &self.ctx.field_columns {
            let expr = DfExpr::Column(Column::from_name(field));
            result.push(expr);
        }
        Ok(result)
    }

    fn create_tag_and_time_index_column_sort_exprs(&self) -> Result<Vec<SortExpr>> {
        let mut result = self
            .ctx
            .tag_columns
            .iter()
            .map(|col| DfExpr::Column(Column::from_name(col)).sort(true, true))
            .collect::<Vec<_>>();
        result.push(self.create_time_index_column_expr()?.sort(true, true));
        Ok(result)
    }

    fn create_field_columns_sort_exprs(&self, asc: bool) -> Vec<SortExpr> {
        self.ctx
            .field_columns
            .iter()
            .map(|col| DfExpr::Column(Column::from_name(col)).sort(asc, true))
            .collect::<Vec<_>>()
    }

    fn create_sort_exprs_by_tags(
        func: &str,
        tags: Vec<DfExpr>,
        asc: bool,
    ) -> Result<Vec<SortExpr>> {
        ensure!(
            !tags.is_empty(),
            FunctionInvalidArgumentSnafu { fn_name: func }
        );

        tags.iter()
            .map(|col| match col {
                DfExpr::Literal(ScalarValue::Utf8(Some(label))) => {
                    Ok(DfExpr::Column(Column::from_name(label)).sort(asc, false))
                }
                other => UnexpectedPlanExprSnafu {
                    desc: format!("expected label string literal, but found {:?}", other),
                }
                .fail(),
            })
            .collect::<Result<Vec<_>>>()
    }

    fn create_empty_values_filter_expr(&self) -> Result<DfExpr> {
        let mut exprs = Vec::with_capacity(self.ctx.field_columns.len());
        for value in &self.ctx.field_columns {
            let expr = DfExpr::Column(Column::from_name(value)).is_not_null();
            exprs.push(expr);
        }

        conjunction(exprs).context(ValueNotFoundSnafu {
            table: self.table_ref()?.to_quoted_string(),
        })
    }
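
    /// Create aggregate expressions for the given token, one call per field
    /// column (with a leading quantile literal for `quantile`). For
    /// `count_values` the original single field column expression is also
    /// returned so it can join the grouping. `ctx.field_columns` is renamed to
    /// the aggregate output names.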
1924 fn create_aggregate_exprs(
1940 &mut self,
1941 op: TokenType,
1942 param: &Option<Box<PromExpr>>,
1943 input_plan: &LogicalPlan,
1944 ) -> Result<(Vec<DfExpr>, Vec<DfExpr>)> {
1945 let mut non_col_args = Vec::new();
1946 let aggr = match op.id() {
1947 token::T_SUM => sum_udaf(),
1948 token::T_QUANTILE => {
1949 let q = Self::get_param_value_as_f64(op, param)?;
1950 non_col_args.push(lit(q));
1951 quantile_udaf()
1952 }
1953 token::T_AVG => avg_udaf(),
1954 token::T_COUNT_VALUES | token::T_COUNT => count_udaf(),
1955 token::T_MIN => min_udaf(),
1956 token::T_MAX => max_udaf(),
1957 token::T_GROUP => grouping_udaf(),
1958 token::T_STDDEV => stddev_pop_udaf(),
1959 token::T_STDVAR => var_pop_udaf(),
1960 token::T_TOPK | token::T_BOTTOMK => UnsupportedExprSnafu {
1961 name: format!("{op:?}"),
1962 }
1963 .fail()?,
1964 _ => UnexpectedTokenSnafu { token: op }.fail()?,
1965 };
1966
1967 let exprs: Vec<DfExpr> = self
1969 .ctx
1970 .field_columns
1971 .iter()
1972 .map(|col| {
1973 non_col_args.push(DfExpr::Column(Column::from_name(col)));
1974 let expr = aggr.call(non_col_args.clone());
1975 non_col_args.pop();
1976 expr
1977 })
1978 .collect::<Vec<_>>();
1979
1980 let prev_field_exprs = if op.id() == token::T_COUNT_VALUES {
1982 let prev_field_exprs: Vec<_> = self
1983 .ctx
1984 .field_columns
1985 .iter()
1986 .map(|col| DfExpr::Column(Column::from_name(col)))
1987 .collect();
1988
1989 ensure!(
1990 self.ctx.field_columns.len() == 1,
1991 UnsupportedExprSnafu {
1992 name: "count_values on multi-value input"
1993 }
1994 );
1995
1996 prev_field_exprs
1997 } else {
1998 vec![]
1999 };
2000
2001 let mut new_field_columns = Vec::with_capacity(self.ctx.field_columns.len());
2003
2004 let normalized_exprs =
2005 normalize_cols(exprs.iter().cloned(), input_plan).context(DataFusionPlanningSnafu)?;
2006 for expr in normalized_exprs {
2007 new_field_columns.push(expr.schema_name().to_string());
2008 }
2009 self.ctx.field_columns = new_field_columns;
2010
2011 Ok((exprs, prev_field_exprs))
2012 }
2013
    fn get_param_value_as_str(op: TokenType, param: &Option<Box<PromExpr>>) -> Result<&str> {
        let param = param
            .as_deref()
            .with_context(|| FunctionInvalidArgumentSnafu {
                fn_name: op.to_string(),
            })?;
        let PromExpr::StringLiteral(StringLiteral { val }) = param else {
            return FunctionInvalidArgumentSnafu {
                fn_name: op.to_string(),
            }
            .fail();
        };

        Ok(val)
    }

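    /// Extract an aggregate's parameter as a float literal, e.g. the quantile given
    /// to `quantile`.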
    fn get_param_value_as_f64(op: TokenType, param: &Option<Box<PromExpr>>) -> Result<f64> {
        let param = param
            .as_deref()
            .with_context(|| FunctionInvalidArgumentSnafu {
                fn_name: op.to_string(),
            })?;
        let PromExpr::NumberLiteral(NumberLiteral { val }) = param else {
            return FunctionInvalidArgumentSnafu {
                fn_name: op.to_string(),
            }
            .fail();
        };

        Ok(*val)
    }

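    /// Create a `row_number()` window expr per value column for `topk`/`bottomk`,
    /// partitioned by the grouping labels and ordered by the value column
    /// (descending for `topk`, ascending for `bottomk`).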
    fn create_window_exprs(
        &mut self,
        op: TokenType,
        group_exprs: Vec<DfExpr>,
        input_plan: &LogicalPlan,
    ) -> Result<Vec<DfExpr>> {
        ensure!(
            self.ctx.field_columns.len() == 1,
            UnsupportedExprSnafu {
                name: "topk or bottomk on multi-value input"
            }
        );

        assert!(matches!(op.id(), token::T_TOPK | token::T_BOTTOMK));

        let asc = matches!(op.id(), token::T_BOTTOMK);

        let tag_sort_exprs = self
            .create_tag_column_exprs()?
            .into_iter()
            .map(|expr| expr.sort(asc, true));

        let exprs: Vec<DfExpr> = self
            .ctx
            .field_columns
            .iter()
            .map(|col| {
                let mut sort_exprs = Vec::with_capacity(self.ctx.tag_columns.len() + 1);
                // order by the value column first, then by tags as tie-breakers
                sort_exprs.push(DfExpr::Column(Column::from(col)).sort(asc, true));
                sort_exprs.extend(tag_sort_exprs.clone());

                DfExpr::WindowFunction(WindowFunction {
                    fun: WindowFunctionDefinition::WindowUDF(Arc::new(RowNumber::new().into())),
                    args: vec![],
                    partition_by: group_exprs.clone(),
                    order_by: sort_exprs,
                    window_frame: WindowFrame::new(Some(true)),
                    null_treatment: None,
                })
            })
            .collect();

        let normalized_exprs =
            normalize_cols(exprs.iter().cloned(), input_plan).context(DataFusionPlanningSnafu)?;
        Ok(normalized_exprs)
    }

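    /// Plan `histogram_quantile(phi, buckets)`. The input must carry an `le` tag;
    /// the buckets are folded by the [HistogramFold] plan node.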
    async fn create_histogram_plan(
        &mut self,
        args: &PromFunctionArgs,
        session_state: &SessionState,
    ) -> Result<LogicalPlan> {
        if args.args.len() != 2 {
            return FunctionInvalidArgumentSnafu {
                fn_name: SPECIAL_HISTOGRAM_QUANTILE.to_string(),
            }
            .fail();
        }
        let phi = Self::try_build_float_literal(&args.args[0]).with_context(|| {
            FunctionInvalidArgumentSnafu {
                fn_name: SPECIAL_HISTOGRAM_QUANTILE.to_string(),
            }
        })?;
        let input = args.args[1].as_ref().clone();
        let input_plan = self.prom_expr_to_plan(&input, session_state).await?;

        if !self.ctx.has_le_tag() {
            return ColumnNotFoundSnafu {
                col: LE_COLUMN_NAME.to_string(),
            }
            .fail();
        }
        let time_index_column =
            self.ctx
                .time_index_column
                .clone()
                .with_context(|| TimeIndexNotFoundSnafu {
                    table: self.ctx.table_name.clone().unwrap_or_default(),
                })?;
        // the first field column holds the bucket counts
        let field_column = self
            .ctx
            .field_columns
            .first()
            .with_context(|| FunctionInvalidArgumentSnafu {
                fn_name: SPECIAL_HISTOGRAM_QUANTILE.to_string(),
            })?
            .clone();
        // remove the `le` tag from the context, as HistogramFold folds it away
        self.ctx.tag_columns.retain(|col| col != LE_COLUMN_NAME);

        Ok(LogicalPlan::Extension(Extension {
            node: Arc::new(
                HistogramFold::new(
                    LE_COLUMN_NAME.to_string(),
                    field_column,
                    time_index_column,
                    phi,
                    input_plan,
                )
                .context(DataFusionPlanningSnafu)?,
            ),
        }))
    }

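    /// Plan `vector(literal)` as an [EmptyMetric] node that emits the literal at
    /// every step of the query range.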
    async fn create_vector_plan(&mut self, args: &PromFunctionArgs) -> Result<LogicalPlan> {
        if args.args.len() != 1 {
            return FunctionInvalidArgumentSnafu {
                fn_name: SPECIAL_VECTOR_FUNCTION.to_string(),
            }
            .fail();
        }
        let lit = Self::try_build_float_literal(&args.args[0]).with_context(|| {
            FunctionInvalidArgumentSnafu {
                fn_name: SPECIAL_VECTOR_FUNCTION.to_string(),
            }
        })?;

        // reset the planner context: `vector()` produces a brand-new series
        self.ctx.time_index_column = Some(SPECIAL_TIME_FUNCTION.to_string());
        self.ctx.reset_table_name_and_schema();
        self.ctx.tag_columns = vec![];
        self.ctx.field_columns = vec![GREPTIME_VALUE.to_string()];
        Ok(LogicalPlan::Extension(Extension {
            node: Arc::new(
                EmptyMetric::new(
                    self.ctx.start,
                    self.ctx.end,
                    self.ctx.interval,
                    SPECIAL_TIME_FUNCTION.to_string(),
                    GREPTIME_VALUE.to_string(),
                    Some(DfExpr::Literal(ScalarValue::Float64(Some(lit)))),
                )
                .context(DataFusionPlanningSnafu)?,
            ),
        }))
    }

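    /// Plan `scalar(vector)` by wrapping the single-field input in a
    /// [ScalarCalculate] node.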
    async fn create_scalar_plan(
        &mut self,
        args: &PromFunctionArgs,
        session_state: &SessionState,
    ) -> Result<LogicalPlan> {
        ensure!(
            args.len() == 1,
            FunctionInvalidArgumentSnafu {
                fn_name: SCALAR_FUNCTION
            }
        );
        let input = self.prom_expr_to_plan(&args.args[0], session_state).await?;
        ensure!(
            self.ctx.field_columns.len() == 1,
            MultiFieldsNotSupportedSnafu {
                operator: SCALAR_FUNCTION
            },
        );
        let scalar_plan = LogicalPlan::Extension(Extension {
            node: Arc::new(
                ScalarCalculate::new(
                    self.ctx.start,
                    self.ctx.end,
                    self.ctx.interval,
                    input,
                    self.ctx.time_index_column.as_ref().unwrap(),
                    &self.ctx.tag_columns,
                    &self.ctx.field_columns[0],
                    self.ctx.table_name.as_deref(),
                )
                .context(PromqlPlanNodeSnafu)?,
            ),
        });
        // the scalar plan has no tag columns; its value column is the second field
        self.ctx.tag_columns.clear();
        self.ctx.field_columns.clear();
        self.ctx
            .field_columns
            .push(scalar_plan.schema().field(1).name().clone());
        Ok(scalar_plan)
    }

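    /// Try to build a DataFusion literal expression from a PromQL expression.
    /// Returns `None` when the expression involves a selector or otherwise can't be
    /// folded to a literal.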
    fn try_build_literal_expr(expr: &PromExpr) -> Option<DfExpr> {
        match expr {
            PromExpr::NumberLiteral(NumberLiteral { val }) => {
                let scalar_value = ScalarValue::Float64(Some(*val));
                Some(DfExpr::Literal(scalar_value))
            }
            PromExpr::StringLiteral(StringLiteral { val }) => {
                let scalar_value = ScalarValue::Utf8(Some(val.to_string()));
                Some(DfExpr::Literal(scalar_value))
            }
            PromExpr::VectorSelector(_)
            | PromExpr::MatrixSelector(_)
            | PromExpr::Extension(_)
            | PromExpr::Aggregate(_)
            | PromExpr::Subquery(_) => None,
            PromExpr::Call(Call { func, .. }) => {
                if func.name == SPECIAL_TIME_FUNCTION {
                    Some(build_special_time_expr(SPECIAL_TIME_FUNCTION))
                } else {
                    None
                }
            }
            PromExpr::Paren(ParenExpr { expr }) => Self::try_build_literal_expr(expr),
            PromExpr::Unary(UnaryExpr { expr, .. }) => Self::try_build_literal_expr(expr),
            PromExpr::Binary(PromBinaryExpr {
                lhs,
                rhs,
                op,
                modifier,
            }) => {
                let lhs = Self::try_build_literal_expr(lhs)?;
                let rhs = Self::try_build_literal_expr(rhs)?;
                let is_comparison_op = Self::is_token_a_comparison_op(*op);
                let expr_builder = Self::prom_token_to_binary_expr_builder(*op).ok()?;
                let expr = expr_builder(lhs, rhs).ok()?;

                let should_return_bool = if let Some(m) = modifier {
                    m.return_bool
                } else {
                    false
                };
                if is_comparison_op && should_return_bool {
                    Some(DfExpr::Cast(Cast {
                        expr: Box::new(expr),
                        data_type: ArrowDataType::Float64,
                    }))
                } else {
                    Some(expr)
                }
            }
        }
    }

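    /// Recognize a bare `time()` call and build the corresponding expression over
    /// the given time index column.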
    fn try_build_special_time_expr(expr: &PromExpr, time_index_col: &str) -> Option<DfExpr> {
        match expr {
            PromExpr::Call(Call { func, .. }) => {
                if func.name == SPECIAL_TIME_FUNCTION {
                    Some(build_special_time_expr(time_index_col))
                } else {
                    None
                }
            }
            _ => None,
        }
    }

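    /// Try to fold a PromQL expression into an `f64` literal, looking through parens
    /// and unary negation.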
    fn try_build_float_literal(expr: &PromExpr) -> Option<f64> {
        match expr {
            PromExpr::NumberLiteral(NumberLiteral { val }) => Some(*val),
            PromExpr::Paren(ParenExpr { expr }) => Self::try_build_float_literal(expr),
            PromExpr::Unary(UnaryExpr { expr, .. }) => {
                Self::try_build_float_literal(expr).map(|f| -f)
            }
            PromExpr::StringLiteral(_)
            | PromExpr::Binary(_)
            | PromExpr::VectorSelector(_)
            | PromExpr::MatrixSelector(_)
            | PromExpr::Call(_)
            | PromExpr::Extension(_)
            | PromExpr::Aggregate(_)
            | PromExpr::Subquery(_) => None,
        }
    }

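    /// Map a PromQL binary-operator token to a closure that builds the corresponding
    /// DataFusion expression.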
    #[allow(clippy::type_complexity)]
    fn prom_token_to_binary_expr_builder(
        token: TokenType,
    ) -> Result<Box<dyn Fn(DfExpr, DfExpr) -> Result<DfExpr>>> {
        match token.id() {
            token::T_ADD => Ok(Box::new(|lhs, rhs| Ok(lhs + rhs))),
            token::T_SUB => Ok(Box::new(|lhs, rhs| Ok(lhs - rhs))),
            token::T_MUL => Ok(Box::new(|lhs, rhs| Ok(lhs * rhs))),
            token::T_DIV => Ok(Box::new(|lhs, rhs| Ok(lhs / rhs))),
            token::T_MOD => Ok(Box::new(|lhs: DfExpr, rhs| Ok(lhs % rhs))),
            token::T_EQLC => Ok(Box::new(|lhs, rhs| Ok(lhs.eq(rhs)))),
            token::T_NEQ => Ok(Box::new(|lhs, rhs| Ok(lhs.not_eq(rhs)))),
            token::T_GTR => Ok(Box::new(|lhs, rhs| Ok(lhs.gt(rhs)))),
            token::T_LSS => Ok(Box::new(|lhs, rhs| Ok(lhs.lt(rhs)))),
            token::T_GTE => Ok(Box::new(|lhs, rhs| Ok(lhs.gt_eq(rhs)))),
            token::T_LTE => Ok(Box::new(|lhs, rhs| Ok(lhs.lt_eq(rhs)))),
            token::T_POW => Ok(Box::new(|lhs, rhs| {
                Ok(DfExpr::ScalarFunction(ScalarFunction {
                    func: datafusion_functions::math::power(),
                    args: vec![lhs, rhs],
                }))
            })),
            token::T_ATAN2 => Ok(Box::new(|lhs, rhs| {
                Ok(DfExpr::ScalarFunction(ScalarFunction {
                    func: datafusion_functions::math::atan2(),
                    args: vec![lhs, rhs],
                }))
            })),
            _ => UnexpectedTokenSnafu { token }.fail(),
        }
    }

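    /// Whether the token is a PromQL comparison operator
    /// (`==`, `!=`, `>`, `<`, `>=`, `<=`).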
    fn is_token_a_comparison_op(token: TokenType) -> bool {
        matches!(
            token.id(),
            token::T_EQLC
                | token::T_NEQ
                | token::T_GTR
                | token::T_LSS
                | token::T_GTE
                | token::T_LTE
        )
    }

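    /// Whether the token is a PromQL set operator (`and`, `or`, `unless`).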
    fn is_token_a_set_op(token: TokenType) -> bool {
        matches!(
            token.id(),
            token::T_LAND | token::T_LOR | token::T_LUNLESS
        )
    }

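    /// Build an inner join between two plans on all shared tag columns (subject to
    /// the `on`/`ignoring` modifier) plus the time index. Used when planning binary
    /// operations between two vectors.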
    #[allow(clippy::too_many_arguments)]
    fn join_on_non_field_columns(
        &self,
        left: LogicalPlan,
        right: LogicalPlan,
        left_table_ref: TableReference,
        right_table_ref: TableReference,
        left_time_index_column: Option<String>,
        right_time_index_column: Option<String>,
        only_join_time_index: bool,
        modifier: &Option<BinModifier>,
    ) -> Result<LogicalPlan> {
        let mut left_tag_columns = if only_join_time_index {
            BTreeSet::new()
        } else {
            self.ctx
                .tag_columns
                .iter()
                .cloned()
                .collect::<BTreeSet<_>>()
        };
        let mut right_tag_columns = left_tag_columns.clone();

        // apply the join-key modifiers
        if let Some(modifier) = modifier {
            if let Some(matching) = &modifier.matching {
                match matching {
                    // `on` - keep only the listed labels as join keys
                    LabelModifier::Include(on) => {
                        let mask = on.labels.iter().cloned().collect::<BTreeSet<_>>();
                        left_tag_columns = left_tag_columns.intersection(&mask).cloned().collect();
                        right_tag_columns =
                            right_tag_columns.intersection(&mask).cloned().collect();
                    }
                    // `ignoring` - drop the listed labels from the join keys
                    LabelModifier::Exclude(ignoring) => {
                        for label in &ignoring.labels {
                            let _ = left_tag_columns.remove(label);
                            let _ = right_tag_columns.remove(label);
                        }
                    }
                }
            }
        }

        // push the time index column into the join keys
        if let (Some(left_time_index_column), Some(right_time_index_column)) =
            (left_time_index_column, right_time_index_column)
        {
            left_tag_columns.insert(left_time_index_column);
            right_tag_columns.insert(right_time_index_column);
        }

        let right = LogicalPlanBuilder::from(right)
            .alias(right_table_ref)
            .context(DataFusionPlanningSnafu)?
            .build()
            .context(DataFusionPlanningSnafu)?;

        // do the inner join on the computed keys
        LogicalPlanBuilder::from(left)
            .alias(left_table_ref)
            .context(DataFusionPlanningSnafu)?
            .join(
                right,
                JoinType::Inner,
                (
                    left_tag_columns
                        .into_iter()
                        .map(Column::from_name)
                        .collect::<Vec<_>>(),
                    right_tag_columns
                        .into_iter()
                        .map(Column::from_name)
                        .collect::<Vec<_>>(),
                ),
                None,
            )
            .context(DataFusionPlanningSnafu)?
            .build()
            .context(DataFusionPlanningSnafu)
    }

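    /// Plan the set operators: `and` becomes a LeftSemi join and `unless` a LeftAnti
    /// join on the shared tags plus time index; `or` is delegated to
    /// [Self::or_operator].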
    fn set_op_on_non_field_columns(
        &mut self,
        left: LogicalPlan,
        mut right: LogicalPlan,
        left_context: PromPlannerContext,
        right_context: PromPlannerContext,
        op: TokenType,
        modifier: &Option<BinModifier>,
    ) -> Result<LogicalPlan> {
        let mut left_tag_col_set = left_context
            .tag_columns
            .iter()
            .cloned()
            .collect::<HashSet<_>>();
        let mut right_tag_col_set = right_context
            .tag_columns
            .iter()
            .cloned()
            .collect::<HashSet<_>>();

        if matches!(op.id(), token::T_LOR) {
            return self.or_operator(
                left,
                right,
                left_tag_col_set,
                right_tag_col_set,
                left_context,
                right_context,
                modifier,
            );
        }

        // apply the vector-matching modifier
        if let Some(modifier) = modifier {
            ensure!(
                // only one-to-one and many-to-many matching is supported here
                matches!(
                    modifier.card,
                    VectorMatchCardinality::OneToOne | VectorMatchCardinality::ManyToMany
                ),
                UnsupportedVectorMatchSnafu {
                    name: modifier.card.clone(),
                },
            );
            if let Some(matching) = &modifier.matching {
                match matching {
                    // `on` - keep only the listed labels as join keys
                    LabelModifier::Include(on) => {
                        let mask = on.labels.iter().cloned().collect::<HashSet<_>>();
                        left_tag_col_set = left_tag_col_set.intersection(&mask).cloned().collect();
                        right_tag_col_set =
                            right_tag_col_set.intersection(&mask).cloned().collect();
                    }
                    // `ignoring` - drop the listed labels from the join keys
                    LabelModifier::Exclude(ignoring) => {
                        for label in &ignoring.labels {
                            let _ = left_tag_col_set.remove(label);
                            let _ = right_tag_col_set.remove(label);
                        }
                    }
                }
            }
        }
        // `and`/`unless` require both sides to share the same tag set (`or` is handled above)
        if !matches!(op.id(), token::T_LOR) {
            ensure!(
                left_tag_col_set == right_tag_col_set,
                CombineTableColumnMismatchSnafu {
                    left: left_tag_col_set.into_iter().collect::<Vec<_>>(),
                    right: right_tag_col_set.into_iter().collect::<Vec<_>>(),
                }
            )
        };
        let left_time_index = left_context.time_index_column.clone().unwrap();
        let right_time_index = right_context.time_index_column.clone().unwrap();
        let join_keys = left_tag_col_set
            .iter()
            .cloned()
            .chain([left_time_index.clone()])
            .collect::<Vec<_>>();
        self.ctx.time_index_column = Some(left_time_index.clone());

        // alias the right time index column to the left's name if they differ
        if left_context.time_index_column != right_context.time_index_column {
            let right_project_exprs = right
                .schema()
                .fields()
                .iter()
                .map(|field| {
                    if field.name() == &right_time_index {
                        DfExpr::Column(Column::from_name(&right_time_index)).alias(&left_time_index)
                    } else {
                        DfExpr::Column(Column::from_name(field.name()))
                    }
                })
                .collect::<Vec<_>>();

            right = LogicalPlanBuilder::from(right)
                .project(right_project_exprs)
                .context(DataFusionPlanningSnafu)?
                .build()
                .context(DataFusionPlanningSnafu)?;
        }

        ensure!(
            left_context.field_columns.len() == 1,
            MultiFieldsNotSupportedSnafu {
                operator: "AND operator"
            }
        );
        // the result uses the left side's field column
        let left_field_col = left_context.field_columns.first().unwrap();
        self.ctx.field_columns = vec![left_field_col.clone()];

        // `and` is planned as a LeftSemi join, `unless` as a LeftAnti join
        match op.id() {
            token::T_LAND => LogicalPlanBuilder::from(left)
                .distinct()
                .context(DataFusionPlanningSnafu)?
                .join_detailed(
                    right,
                    JoinType::LeftSemi,
                    (join_keys.clone(), join_keys),
                    None,
                    true,
                )
                .context(DataFusionPlanningSnafu)?
                .build()
                .context(DataFusionPlanningSnafu),
            token::T_LUNLESS => LogicalPlanBuilder::from(left)
                .distinct()
                .context(DataFusionPlanningSnafu)?
                .join_detailed(
                    right,
                    JoinType::LeftAnti,
                    (join_keys.clone(), join_keys),
                    None,
                    true,
                )
                .context(DataFusionPlanningSnafu)?
                .build()
                .context(DataFusionPlanningSnafu),
            token::T_LOR => {
                // `or` is dispatched to `or_operator` at the start of this function
                unreachable!()
            }
            _ => UnexpectedTokenSnafu { token: op }.fail(),
        }
    }

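    /// Plan the `or` set operator via the [UnionDistinctOn] node: align both schemas
    /// (tags missing on one side become NULL literals), then union the rows while
    /// deduplicating on the match columns and the time index.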
    #[allow(clippy::too_many_arguments)]
    fn or_operator(
        &mut self,
        left: LogicalPlan,
        right: LogicalPlan,
        left_tag_cols_set: HashSet<String>,
        right_tag_cols_set: HashSet<String>,
        left_context: PromPlannerContext,
        right_context: PromPlannerContext,
        modifier: &Option<BinModifier>,
    ) -> Result<LogicalPlan> {
        ensure!(
            // both sides must have the same number of field columns, and only one
            left_context.field_columns.len() == right_context.field_columns.len(),
            CombineTableColumnMismatchSnafu {
                left: left_context.field_columns.clone(),
                right: right_context.field_columns.clone()
            }
        );
        ensure!(
            left_context.field_columns.len() == 1,
            MultiFieldsNotSupportedSnafu {
                operator: "OR operator"
            }
        );

        // prepare the tag sets and column qualifiers
        let all_tags = left_tag_cols_set
            .union(&right_tag_cols_set)
            .cloned()
            .collect::<HashSet<_>>();
        let tags_not_in_left = all_tags
            .difference(&left_tag_cols_set)
            .cloned()
            .collect::<Vec<_>>();
        let tags_not_in_right = all_tags
            .difference(&right_tag_cols_set)
            .cloned()
            .collect::<Vec<_>>();
        let left_qualifier = left.schema().qualified_field(0).0.cloned();
        let right_qualifier = right.schema().qualified_field(0).0.cloned();
        let left_qualifier_string = left_qualifier
            .as_ref()
            .map(|l| l.to_string())
            .unwrap_or_default();
        let right_qualifier_string = right_qualifier
            .as_ref()
            .map(|r| r.to_string())
            .unwrap_or_default();
        let left_time_index_column =
            left_context
                .time_index_column
                .clone()
                .with_context(|| TimeIndexNotFoundSnafu {
                    table: left_qualifier_string.clone(),
                })?;
        let right_time_index_column =
            right_context
                .time_index_column
                .clone()
                .with_context(|| TimeIndexNotFoundSnafu {
                    table: right_qualifier_string.clone(),
                })?;
        // both sides have exactly one field column, as checked above
        let left_field_col = left_context.field_columns.first().unwrap();
        let right_field_col = right_context.field_columns.first().unwrap();

        // step 0: collect all columns for the output schema
        let mut all_columns_set = left
            .schema()
            .fields()
            .iter()
            .chain(right.schema().fields().iter())
            .map(|field| field.name().clone())
            .collect::<HashSet<_>>();
        // remove the time index columns
        all_columns_set.remove(&left_time_index_column);
        all_columns_set.remove(&right_time_index_column);
        // remove the right field column if it's named differently from the left one
        if left_field_col != right_field_col {
            all_columns_set.remove(right_field_col);
        }
        let mut all_columns = all_columns_set.into_iter().collect::<Vec<_>>();
        // sort to make the generated schema deterministic
        all_columns.sort_unstable();
        // the left time index column name is used as the result's time index name
        all_columns.insert(0, left_time_index_column.clone());

        // step 1: align the schemas of both sides; tags missing on a side become NULLs
        let left_proj_exprs = all_columns.iter().map(|col| {
            if tags_not_in_left.contains(col) {
                DfExpr::Literal(ScalarValue::Utf8(None)).alias(col.to_string())
            } else {
                DfExpr::Column(Column::new(None::<String>, col))
            }
        });
        let right_time_index_expr = DfExpr::Column(Column::new(
            right_qualifier.clone(),
            right_time_index_column,
        ))
        .alias(left_time_index_column.clone());
        // the right field column may be named differently; look up its qualifier
        let right_qualifier_for_field = right
            .schema()
            .iter()
            .find(|(_, f)| f.name() == right_field_col)
            .map(|(q, _)| q)
            .context(ColumnNotFoundSnafu {
                col: right_field_col.to_string(),
            })?
            .cloned();
        let right_proj_exprs_without_time_index = all_columns.iter().skip(1).map(|col| {
            // alias the right field column to the left's name when they differ
            if col == left_field_col && left_field_col != right_field_col {
                DfExpr::Column(Column::new(
                    right_qualifier_for_field.clone(),
                    right_field_col,
                ))
            } else if tags_not_in_right.contains(col) {
                DfExpr::Literal(ScalarValue::Utf8(None)).alias(col.to_string())
            } else {
                DfExpr::Column(Column::new(None::<String>, col))
            }
        });
        let right_proj_exprs = [right_time_index_expr]
            .into_iter()
            .chain(right_proj_exprs_without_time_index);

        let left_projected = LogicalPlanBuilder::from(left)
            .project(left_proj_exprs)
            .context(DataFusionPlanningSnafu)?
            .alias(left_qualifier_string.clone())
            .context(DataFusionPlanningSnafu)?
            .build()
            .context(DataFusionPlanningSnafu)?;
        let right_projected = LogicalPlanBuilder::from(right)
            .project(right_proj_exprs)
            .context(DataFusionPlanningSnafu)?
            .alias(right_qualifier_string.clone())
            .context(DataFusionPlanningSnafu)?
            .build()
            .context(DataFusionPlanningSnafu)?;

        // step 2: compute the columns to match (dedup) on
        let mut match_columns = if let Some(modifier) = modifier
            && let Some(matching) = &modifier.matching
        {
            match matching {
                // `on` - match only on the listed labels
                LabelModifier::Include(on) => on.labels.clone(),
                // `ignoring` - match on all labels except the listed ones
                LabelModifier::Exclude(ignoring) => {
                    let ignoring = ignoring.labels.iter().cloned().collect::<HashSet<_>>();
                    all_tags.difference(&ignoring).cloned().collect()
                }
            }
        } else {
            all_tags.iter().cloned().collect()
        };
        // sort to make the generated plan deterministic
        match_columns.sort_unstable();
        // step 3: build the UnionDistinctOn plan
        let schema = left_projected.schema().clone();
        let union_distinct_on = UnionDistinctOn::new(
            left_projected,
            right_projected,
            match_columns,
            left_time_index_column.clone(),
            schema,
        );
        let result = LogicalPlan::Extension(Extension {
            node: Arc::new(union_distinct_on),
        });

        // step 4: update the planner context
        self.ctx.time_index_column = Some(left_time_index_column);
        self.ctx.tag_columns = all_tags.into_iter().collect();
        self.ctx.field_columns = vec![left_field_col.to_string()];

        Ok(result)
    }

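    /// Project the input plan with `name_to_expr` applied to every field column,
    /// passing tag and time index columns through unchanged, and update the
    /// context's field column names accordingly.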
    fn projection_for_each_field_column<F>(
        &mut self,
        input: LogicalPlan,
        name_to_expr: F,
    ) -> Result<LogicalPlan>
    where
        F: FnMut(&String) -> Result<DfExpr>,
    {
        let non_field_columns_iter = self
            .ctx
            .tag_columns
            .iter()
            .chain(self.ctx.time_index_column.iter())
            .map(|col| {
                Ok(DfExpr::Column(Column::new(
                    self.ctx.table_name.clone().map(TableReference::bare),
                    col,
                )))
            });

        // apply the computation to each field column
        let result_field_columns = self
            .ctx
            .field_columns
            .iter()
            .map(name_to_expr)
            .collect::<Result<Vec<_>>>()?;

        // record the new field column names, and alias the exprs to drop qualifiers
        self.ctx.field_columns = result_field_columns
            .iter()
            .map(|expr| expr.schema_name().to_string())
            .collect();
        let field_columns_iter = result_field_columns
            .into_iter()
            .zip(self.ctx.field_columns.iter())
            .map(|(expr, name)| Ok(DfExpr::Alias(Alias::new(expr, None::<String>, name))));

        // chain the unchanged non-field columns with the computed field columns
        let project_fields = non_field_columns_iter
            .chain(field_columns_iter)
            .collect::<Result<Vec<_>>>()?;

        LogicalPlanBuilder::from(input)
            .project(project_fields)
            .context(DataFusionPlanningSnafu)?
            .build()
            .context(DataFusionPlanningSnafu)
    }

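    /// Filter the input plan with `name_to_expr` applied to the single field column.
    /// Errors on multi-field input.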
    fn filter_on_field_column<F>(
        &self,
        input: LogicalPlan,
        mut name_to_expr: F,
    ) -> Result<LogicalPlan>
    where
        F: FnMut(&String) -> Result<DfExpr>,
    {
        ensure!(
            self.ctx.field_columns.len() == 1,
            UnsupportedExprSnafu {
                name: "filter on multi-value input"
            }
        );

        let field_column_filter = name_to_expr(&self.ctx.field_columns[0])?;

        LogicalPlanBuilder::from(input)
            .filter(field_column_filter)
            .context(DataFusionPlanningSnafu)?
            .build()
            .context(DataFusionPlanningSnafu)
    }

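    /// Build a `date_part(<part>, <time index>)` expression, used by time-related
    /// functions such as `hour()` or `day_of_week()`.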
    fn date_part_on_time_index(&self, date_part: &str) -> Result<DfExpr> {
        let lit_expr = DfExpr::Literal(ScalarValue::Utf8(Some(date_part.to_string())));
        let input_expr = datafusion::logical_expr::col(
            self.ctx
                .time_index_column
                .as_ref()
                // the table name in this error is never surfaced, hence the placeholder
                .with_context(|| TimeIndexNotFoundSnafu {
                    table: "<doesn't matter>",
                })?,
        );
        let fn_expr = DfExpr::ScalarFunction(ScalarFunction {
            func: datafusion_functions::datetime::date_part(),
            args: vec![lit_expr, input_expr],
        });
        Ok(fn_expr)
    }
}

#[derive(Default, Debug)]
struct FunctionArgs {
    input: Option<PromExpr>,
    literals: Vec<DfExpr>,
}

#[derive(Debug, Clone)]
enum ScalarFunc {
    /// A scalar function built into DataFusion.
    DataFusionBuiltin(Arc<ScalarUdfDef>),
    /// A UDF resolved from DataFusion's function registry.
    DataFusionUdf(Arc<ScalarUdfDef>),
    /// A UDF provided by the `promql` crate, e.g. the `*_over_time` family.
    Udf(Arc<ScalarUdfDef>),
    /// A UDF that also receives the evaluation range length in milliseconds, used by
    /// extrapolation-style functions such as `increase` and `rate`.
    ExtrapolateUdf(Arc<ScalarUdfDef>, i64),
    /// The function is planned as a plain expression; no UDF call is involved.
    GeneratedExpr,
}

#[cfg(test)]
mod test {
    use std::time::{Duration, UNIX_EPOCH};

    use catalog::memory::MemoryCatalogManager;
    use catalog::RegisterTableRequest;
    use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
    use common_query::test_util::DummyDecoder;
    use datafusion::execution::SessionStateBuilder;
    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::{ColumnSchema, Schema};
    use promql_parser::label::Labels;
    use promql_parser::parser;
    use session::context::QueryContext;
    use table::metadata::{TableInfoBuilder, TableMetaBuilder};
    use table::test_util::EmptyTable;

    use super::*;

    fn build_session_state() -> SessionState {
        SessionStateBuilder::new().with_default_features().build()
    }

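    /// Build a [DfTableSourceProvider] over an in-memory catalog containing one empty
    /// table per `(schema, table)` tuple, with `num_tag` string tag columns, a
    /// millisecond time index named `timestamp`, and `num_field` float64 value columns.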
    async fn build_test_table_provider(
        table_name_tuples: &[(String, String)],
        num_tag: usize,
        num_field: usize,
    ) -> DfTableSourceProvider {
        let catalog_list = MemoryCatalogManager::with_default_setup();
        for (schema_name, table_name) in table_name_tuples {
            let mut columns = vec![];
            for i in 0..num_tag {
                columns.push(ColumnSchema::new(
                    format!("tag_{i}"),
                    ConcreteDataType::string_datatype(),
                    false,
                ));
            }
            columns.push(
                ColumnSchema::new(
                    "timestamp".to_string(),
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                )
                .with_time_index(true),
            );
            for i in 0..num_field {
                columns.push(ColumnSchema::new(
                    format!("field_{i}"),
                    ConcreteDataType::float64_datatype(),
                    true,
                ));
            }
            let schema = Arc::new(Schema::new(columns));
            let table_meta = TableMetaBuilder::empty()
                .schema(schema)
                .primary_key_indices((0..num_tag).collect())
                .value_indices((num_tag + 1..num_tag + 1 + num_field).collect())
                .next_column_id(1024)
                .build()
                .unwrap();
            let table_info = TableInfoBuilder::default()
                .name(table_name.to_string())
                .meta(table_meta)
                .build()
                .unwrap();
            let table = EmptyTable::from_table_info(&table_info);

            assert!(catalog_list
                .register_table_sync(RegisterTableRequest {
                    catalog: DEFAULT_CATALOG_NAME.to_string(),
                    schema: schema_name.to_string(),
                    table_name: table_name.to_string(),
                    table_id: 1024,
                    table,
                })
                .is_ok());
        }

        DfTableSourceProvider::new(
            catalog_list,
            false,
            QueryContext::arc(),
            DummyDecoder::arc(),
            false,
        )
    }

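    /// Like [build_test_table_provider], but with explicitly named tag columns and
    /// the conventional `greptime_timestamp`/`greptime_value` time index and value
    /// columns.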
    async fn build_test_table_provider_with_fields(
        table_name_tuples: &[(String, String)],
        tags: &[&str],
    ) -> DfTableSourceProvider {
        let catalog_list = MemoryCatalogManager::with_default_setup();
        for (schema_name, table_name) in table_name_tuples {
            let mut columns = vec![];
            let num_tag = tags.len();
            for tag in tags {
                columns.push(ColumnSchema::new(
                    tag.to_string(),
                    ConcreteDataType::string_datatype(),
                    false,
                ));
            }
            columns.push(
                ColumnSchema::new(
                    "greptime_timestamp".to_string(),
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                )
                .with_time_index(true),
            );
            columns.push(ColumnSchema::new(
                "greptime_value".to_string(),
                ConcreteDataType::float64_datatype(),
                true,
            ));
            let schema = Arc::new(Schema::new(columns));
            let table_meta = TableMetaBuilder::empty()
                .schema(schema)
                .primary_key_indices((0..num_tag).collect())
                .next_column_id(1024)
                .build()
                .unwrap();
            let table_info = TableInfoBuilder::default()
                .name(table_name.to_string())
                .meta(table_meta)
                .build()
                .unwrap();
            let table = EmptyTable::from_table_info(&table_info);

            assert!(catalog_list
                .register_table_sync(RegisterTableRequest {
                    catalog: DEFAULT_CATALOG_NAME.to_string(),
                    schema: schema_name.to_string(),
                    table_name: table_name.to_string(),
                    table_id: 1024,
                    table,
                })
                .is_ok());
        }

        DfTableSourceProvider::new(
            catalog_list,
            false,
            QueryContext::arc(),
            DummyDecoder::arc(),
            false,
        )
    }

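    /// Plan `{fn_name}(some_metric{tag_0!="bar"})` over a one-tag, one-field test
    /// table and assert the rendered plan, substituting `plan_name` for `TEMPLATE`
    /// in the expected output.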
    async fn do_single_instant_function_call(fn_name: &'static str, plan_name: &str) {
        let prom_expr =
            parser::parse(&format!("{fn_name}(some_metric{{tag_0!=\"bar\"}})")).unwrap();
        let eval_stmt = EvalStmt {
            expr: prom_expr,
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let table_provider = build_test_table_provider(
            &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
            1,
            1,
        )
        .await;
        let plan = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
            .await
            .unwrap();

        let expected = String::from(
            "Filter: TEMPLATE(field_0) IS NOT NULL [timestamp:Timestamp(Millisecond, None), TEMPLATE(field_0):Float64;N, tag_0:Utf8]\
            \n  Projection: some_metric.timestamp, TEMPLATE(some_metric.field_0) AS TEMPLATE(field_0), some_metric.tag_0 [timestamp:Timestamp(Millisecond, None), TEMPLATE(field_0):Float64;N, tag_0:Utf8]\
            \n    PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n      PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n        Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n          Filter: some_metric.tag_0 != Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n            TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
        ).replace("TEMPLATE", plan_name);

        assert_eq!(plan.display_indent_schema().to_string(), expected);
    }

    #[tokio::test]
    async fn single_abs() {
        do_single_instant_function_call("abs", "abs").await;
    }

    #[tokio::test]
    #[should_panic]
    async fn single_absent() {
        do_single_instant_function_call("absent", "").await;
    }

    #[tokio::test]
    async fn single_ceil() {
        do_single_instant_function_call("ceil", "ceil").await;
    }

    #[tokio::test]
    async fn single_exp() {
        do_single_instant_function_call("exp", "exp").await;
    }

    #[tokio::test]
    async fn single_ln() {
        do_single_instant_function_call("ln", "ln").await;
    }

    #[tokio::test]
    async fn single_log2() {
        do_single_instant_function_call("log2", "log2").await;
    }

    #[tokio::test]
    async fn single_log10() {
        do_single_instant_function_call("log10", "log10").await;
    }

    #[tokio::test]
    #[should_panic]
    async fn single_scalar() {
        do_single_instant_function_call("scalar", "").await;
    }

    #[tokio::test]
    #[should_panic]
    async fn single_sgn() {
        do_single_instant_function_call("sgn", "").await;
    }

    #[tokio::test]
    #[should_panic]
    async fn single_sort() {
        do_single_instant_function_call("sort", "").await;
    }

    #[tokio::test]
    #[should_panic]
    async fn single_sort_desc() {
        do_single_instant_function_call("sort_desc", "").await;
    }

    #[tokio::test]
    async fn single_sqrt() {
        do_single_instant_function_call("sqrt", "sqrt").await;
    }

    #[tokio::test]
    #[should_panic]
    async fn single_timestamp() {
        do_single_instant_function_call("timestamp", "").await;
    }

    #[tokio::test]
    async fn single_acos() {
        do_single_instant_function_call("acos", "acos").await;
    }

    #[tokio::test]
    #[should_panic]
    async fn single_acosh() {
        do_single_instant_function_call("acosh", "").await;
    }

    #[tokio::test]
    async fn single_asin() {
        do_single_instant_function_call("asin", "asin").await;
    }

    #[tokio::test]
    #[should_panic]
    async fn single_asinh() {
        do_single_instant_function_call("asinh", "").await;
    }

    #[tokio::test]
    async fn single_atan() {
        do_single_instant_function_call("atan", "atan").await;
    }

    #[tokio::test]
    #[should_panic]
    async fn single_atanh() {
        do_single_instant_function_call("atanh", "").await;
    }

    #[tokio::test]
    async fn single_cos() {
        do_single_instant_function_call("cos", "cos").await;
    }

    #[tokio::test]
    #[should_panic]
    async fn single_cosh() {
        do_single_instant_function_call("cosh", "").await;
    }

    #[tokio::test]
    async fn single_sin() {
        do_single_instant_function_call("sin", "sin").await;
    }

    #[tokio::test]
    #[should_panic]
    async fn single_sinh() {
        do_single_instant_function_call("sinh", "").await;
    }

    #[tokio::test]
    async fn single_tan() {
        do_single_instant_function_call("tan", "tan").await;
    }

    #[tokio::test]
    #[should_panic]
    async fn single_tanh() {
        do_single_instant_function_call("tanh", "").await;
    }

    #[tokio::test]
    #[should_panic]
    async fn single_deg() {
        do_single_instant_function_call("deg", "").await;
    }

    #[tokio::test]
    #[should_panic]
    async fn single_rad() {
        do_single_instant_function_call("rad", "").await;
    }

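    /// Plan `{fn_name} by (tag_1)(some_metric{tag_0!="bar"})` and the equivalent
    /// `without` form, asserting the rendered plan for both.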
    async fn do_aggregate_expr_plan(fn_name: &str, plan_name: &str) {
        let prom_expr = parser::parse(&format!(
            "{fn_name} by (tag_1)(some_metric{{tag_0!=\"bar\"}})",
        ))
        .unwrap();
        let mut eval_stmt = EvalStmt {
            expr: prom_expr,
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        // test the `by` (include) clause
        let table_provider = build_test_table_provider(
            &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
            2,
            2,
        )
        .await;
        let plan = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
            .await
            .unwrap();
        let expected_no_without = String::from(
            "Sort: some_metric.tag_1 ASC NULLS LAST, some_metric.timestamp ASC NULLS LAST [tag_1:Utf8, timestamp:Timestamp(Millisecond, None), TEMPLATE(some_metric.field_0):Float64;N, TEMPLATE(some_metric.field_1):Float64;N]\
            \n  Aggregate: groupBy=[[some_metric.tag_1, some_metric.timestamp]], aggr=[[TEMPLATE(some_metric.field_0), TEMPLATE(some_metric.field_1)]] [tag_1:Utf8, timestamp:Timestamp(Millisecond, None), TEMPLATE(some_metric.field_0):Float64;N, TEMPLATE(some_metric.field_1):Float64;N]\
            \n    PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
            \n      PromSeriesDivide: tags=[\"tag_0\", \"tag_1\"] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
            \n        Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.tag_1 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
            \n          Filter: some_metric.tag_0 != Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
            \n            TableScan: some_metric [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]"
        ).replace("TEMPLATE", plan_name);
        assert_eq!(
            plan.display_indent_schema().to_string(),
            expected_no_without
        );

        // test the `without` (exclude) clause
        if let PromExpr::Aggregate(AggregateExpr { modifier, .. }) = &mut eval_stmt.expr {
            *modifier = Some(LabelModifier::Exclude(Labels {
                labels: vec![String::from("tag_1")].into_iter().collect(),
            }));
        }
        let table_provider = build_test_table_provider(
            &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
            2,
            2,
        )
        .await;
        let plan = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
            .await
            .unwrap();
        let expected_without = String::from(
            "Sort: some_metric.tag_0 ASC NULLS LAST, some_metric.timestamp ASC NULLS LAST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), TEMPLATE(some_metric.field_0):Float64;N, TEMPLATE(some_metric.field_1):Float64;N]\
            \n  Aggregate: groupBy=[[some_metric.tag_0, some_metric.timestamp]], aggr=[[TEMPLATE(some_metric.field_0), TEMPLATE(some_metric.field_1)]] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), TEMPLATE(some_metric.field_0):Float64;N, TEMPLATE(some_metric.field_1):Float64;N]\
            \n    PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
            \n      PromSeriesDivide: tags=[\"tag_0\", \"tag_1\"] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
            \n        Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.tag_1 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
            \n          Filter: some_metric.tag_0 != Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
            \n            TableScan: some_metric [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]"
        ).replace("TEMPLATE", plan_name);
        assert_eq!(plan.display_indent_schema().to_string(), expected_without);
    }

    #[tokio::test]
    async fn aggregate_sum() {
        do_aggregate_expr_plan("sum", "sum").await;
    }

    #[tokio::test]
    async fn aggregate_avg() {
        do_aggregate_expr_plan("avg", "avg").await;
    }

    #[tokio::test]
    #[should_panic]
    async fn aggregate_count() {
        do_aggregate_expr_plan("count", "count").await;
    }

    #[tokio::test]
    async fn aggregate_min() {
        do_aggregate_expr_plan("min", "min").await;
    }

    #[tokio::test]
    async fn aggregate_max() {
        do_aggregate_expr_plan("max", "max").await;
    }

    #[tokio::test]
    #[should_panic]
    async fn aggregate_group() {
        do_aggregate_expr_plan("grouping", "GROUPING").await;
    }

    #[tokio::test]
    async fn aggregate_stddev() {
        do_aggregate_expr_plan("stddev", "stddev_pop").await;
    }

    #[tokio::test]
    async fn aggregate_stdvar() {
        do_aggregate_expr_plan("stdvar", "var_pop").await;
    }

    #[tokio::test]
    async fn binary_op_column_column() {
        let prom_expr =
            parser::parse(r#"some_metric{tag_0="foo"} + some_metric{tag_0="bar"}"#).unwrap();
        let eval_stmt = EvalStmt {
            expr: prom_expr,
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let table_provider = build_test_table_provider(
            &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
            1,
            1,
        )
        .await;
        let plan = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
            .await
            .unwrap();

        let expected = String::from(
            "Projection: rhs.tag_0, rhs.timestamp, lhs.field_0 + rhs.field_0 AS lhs.field_0 + rhs.field_0 [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), lhs.field_0 + rhs.field_0:Float64;N]\
            \n  Inner Join: lhs.tag_0 = rhs.tag_0, lhs.timestamp = rhs.timestamp [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n    SubqueryAlias: lhs [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n      PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n        PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n          Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n            Filter: some_metric.tag_0 = Utf8(\"foo\") AND some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n              TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n    SubqueryAlias: rhs [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n      PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n        PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n          Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n            Filter: some_metric.tag_0 = Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n              TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
        );

        assert_eq!(plan.display_indent_schema().to_string(), expected);
    }

    async fn indie_query_plan_compare(query: &str, expected: String) {
        let prom_expr = parser::parse(query).unwrap();
        let eval_stmt = EvalStmt {
            expr: prom_expr,
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let table_provider = build_test_table_provider(
            &[
                (DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string()),
                (
                    "greptime_private".to_string(),
                    "some_alt_metric".to_string(),
                ),
            ],
            1,
            1,
        )
        .await;
        let plan = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
            .await
            .unwrap();

        assert_eq!(plan.display_indent_schema().to_string(), expected);
    }

3510 #[tokio::test]
3511 async fn binary_op_literal_column() {
3512 let query = r#"1 + some_metric{tag_0="bar"}"#;
3513 let expected = String::from(
3514 "Projection: some_metric.tag_0, some_metric.timestamp, Float64(1) + some_metric.field_0 AS Float64(1) + field_0 [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), Float64(1) + field_0:Float64;N]\
3515 \n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3516 \n PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3517 \n Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3518 \n Filter: some_metric.tag_0 = Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3519 \n TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
3520 );
3521
3522 indie_query_plan_compare(query, expected).await;
3523 }
3524
3525 #[tokio::test]
3526 async fn binary_op_literal_literal() {
3527 let query = r#"1 + 1"#;
3528 let expected = String::from("EmptyMetric: range=[0..100000000], interval=[5000] [time:Timestamp(Millisecond, None), value:Float64;N]");
3529
3530 indie_query_plan_compare(query, expected).await;
3531 }
3532
3533 #[tokio::test]
3534 async fn simple_bool_grammar() {
3535 let query = "some_metric != bool 1.2345";
3536 let expected = String::from(
3537 "Projection: some_metric.tag_0, some_metric.timestamp, CAST(some_metric.field_0 != Float64(1.2345) AS Float64) AS field_0 != Float64(1.2345) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0 != Float64(1.2345):Float64;N]\
3538 \n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3539 \n PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3540 \n Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3541 \n Filter: some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3542 \n TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
3543 );
3544
3545 indie_query_plan_compare(query, expected).await;
3546 }
3547
3548 #[tokio::test]
3549 async fn bool_with_additional_arithmetic() {
3550 let query = "some_metric + (1 == bool 2)";
3551 let expected = String::from(
3552 "Projection: some_metric.tag_0, some_metric.timestamp, some_metric.field_0 + CAST(Float64(1) = Float64(2) AS Float64) AS field_0 + Float64(1) = Float64(2) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0 + Float64(1) = Float64(2):Float64;N]\
3553 \n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3554 \n PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3555 \n Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3556 \n Filter: some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3557 \n TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
3558 );
3559
3560 indie_query_plan_compare(query, expected).await;
3561 }
3562
3563 #[tokio::test]
3564 async fn simple_unary() {
3565 let query = "-some_metric";
3566 let expected = String::from(
3567 "Projection: some_metric.tag_0, some_metric.timestamp, (- some_metric.field_0) AS (- field_0) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), (- field_0):Float64;N]\
3568 \n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3569 \n PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3570 \n Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3571 \n Filter: some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3572 \n TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
3573 );
3574
3575 indie_query_plan_compare(query, expected).await;
3576 }
3577
3578 #[tokio::test]
3579 async fn increase_aggr() {
3580 let query = "increase(some_metric[5m])";
3581 let expected = String::from(
3582 "Filter: prom_increase(timestamp_range,field_0,timestamp,Int64(300000)) IS NOT NULL [timestamp:Timestamp(Millisecond, None), prom_increase(timestamp_range,field_0,timestamp,Int64(300000)):Float64;N, tag_0:Utf8]\
3583 \n Projection: some_metric.timestamp, prom_increase(timestamp_range, field_0, some_metric.timestamp, Int64(300000)) AS prom_increase(timestamp_range,field_0,timestamp,Int64(300000)), some_metric.tag_0 [timestamp:Timestamp(Millisecond, None), prom_increase(timestamp_range,field_0,timestamp,Int64(300000)):Float64;N, tag_0:Utf8]\
3584 \n PromRangeManipulate: req range=[0..100000000], interval=[5000], eval range=[300000], time index=[timestamp], values=[\"field_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Dictionary(Int64, Float64);N, timestamp_range:Dictionary(Int64, Timestamp(Millisecond, None))]\
3585 \n PromSeriesNormalize: offset=[0], time index=[timestamp], filter NaN: [true] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3586 \n PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3587 \n Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3588 \n Filter: some_metric.timestamp >= TimestampMillisecond(-301000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3589 \n TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
3590 );
3591
3592 indie_query_plan_compare(query, expected).await;
3593 }
3594
3595 #[tokio::test]
3596 async fn less_filter_on_value() {
3597 let query = "some_metric < 1.2345";
3598 let expected = String::from(
3599 "Filter: some_metric.field_0 < Float64(1.2345) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3600 \n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3601 \n PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3602 \n Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3603 \n Filter: some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3604 \n TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
3605 );
3606
3607 indie_query_plan_compare(query, expected).await;
3608 }
3609
3610 #[tokio::test]
3611 async fn count_over_time() {
3612 let query = "count_over_time(some_metric[5m])";
3613 let expected = String::from(
3614 "Filter: prom_count_over_time(timestamp_range,field_0) IS NOT NULL [timestamp:Timestamp(Millisecond, None), prom_count_over_time(timestamp_range,field_0):Float64;N, tag_0:Utf8]\
3615 \n Projection: some_metric.timestamp, prom_count_over_time(timestamp_range, field_0) AS prom_count_over_time(timestamp_range,field_0), some_metric.tag_0 [timestamp:Timestamp(Millisecond, None), prom_count_over_time(timestamp_range,field_0):Float64;N, tag_0:Utf8]\
3616 \n PromRangeManipulate: req range=[0..100000000], interval=[5000], eval range=[300000], time index=[timestamp], values=[\"field_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Dictionary(Int64, Float64);N, timestamp_range:Dictionary(Int64, Timestamp(Millisecond, None))]\
3617 \n PromSeriesNormalize: offset=[0], time index=[timestamp], filter NaN: [true] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3618 \n PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3619 \n Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3620 \n Filter: some_metric.timestamp >= TimestampMillisecond(-301000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3621 \n TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
3622 );
3623
3624 indie_query_plan_compare(query, expected).await;
3625 }
3626
    #[tokio::test]
    async fn test_hash_join() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let case = r#"http_server_requests_seconds_sum{uri="/accounts/login"} / ignoring(kubernetes_pod_name,kubernetes_namespace) http_server_requests_seconds_count{uri="/accounts/login"}"#;

        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;
        let table_provider = build_test_table_provider_with_fields(
            &[
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "http_server_requests_seconds_sum".to_string(),
                ),
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "http_server_requests_seconds_count".to_string(),
                ),
            ],
            &["uri", "kubernetes_namespace", "kubernetes_pod_name"],
        )
        .await;
        let plan = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
            .await
            .unwrap();
        let expected = "Projection: http_server_requests_seconds_count.uri, http_server_requests_seconds_count.kubernetes_namespace, http_server_requests_seconds_count.kubernetes_pod_name, http_server_requests_seconds_count.greptime_timestamp, http_server_requests_seconds_sum.greptime_value / http_server_requests_seconds_count.greptime_value AS http_server_requests_seconds_sum.greptime_value / http_server_requests_seconds_count.greptime_value\
        \n  Inner Join: http_server_requests_seconds_sum.greptime_timestamp = http_server_requests_seconds_count.greptime_timestamp, http_server_requests_seconds_sum.uri = http_server_requests_seconds_count.uri\
        \n    SubqueryAlias: http_server_requests_seconds_sum\
        \n      PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp]\
        \n        PromSeriesDivide: tags=[\"uri\", \"kubernetes_namespace\", \"kubernetes_pod_name\"]\
        \n          Sort: http_server_requests_seconds_sum.uri ASC NULLS FIRST, http_server_requests_seconds_sum.kubernetes_namespace ASC NULLS FIRST, http_server_requests_seconds_sum.kubernetes_pod_name ASC NULLS FIRST, http_server_requests_seconds_sum.greptime_timestamp ASC NULLS FIRST\
        \n            Filter: http_server_requests_seconds_sum.uri = Utf8(\"/accounts/login\") AND http_server_requests_seconds_sum.greptime_timestamp >= TimestampMillisecond(-1000, None) AND http_server_requests_seconds_sum.greptime_timestamp <= TimestampMillisecond(100001000, None)\
        \n              TableScan: http_server_requests_seconds_sum\
        \n    SubqueryAlias: http_server_requests_seconds_count\
        \n      PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp]\
        \n        PromSeriesDivide: tags=[\"uri\", \"kubernetes_namespace\", \"kubernetes_pod_name\"]\
        \n          Sort: http_server_requests_seconds_count.uri ASC NULLS FIRST, http_server_requests_seconds_count.kubernetes_namespace ASC NULLS FIRST, http_server_requests_seconds_count.kubernetes_pod_name ASC NULLS FIRST, http_server_requests_seconds_count.greptime_timestamp ASC NULLS FIRST\
        \n            Filter: http_server_requests_seconds_count.uri = Utf8(\"/accounts/login\") AND http_server_requests_seconds_count.greptime_timestamp >= TimestampMillisecond(-1000, None) AND http_server_requests_seconds_count.greptime_timestamp <= TimestampMillisecond(100001000, None)\
        \n              TableScan: http_server_requests_seconds_count";
        assert_eq!(plan.to_string(), expected);
    }

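    // histogram_quantile nested inside label_replace should plan successfully;
    // this only checks that planning does not error.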
    #[tokio::test]
    async fn test_nested_histogram_quantile() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let case = r#"label_replace(histogram_quantile(0.99, sum by(pod, le, path, code) (rate(greptime_servers_grpc_requests_elapsed_bucket{container="frontend"}[1m0s]))), "pod", "$1", "pod", "greptimedb-frontend-[0-9a-z]*-(.*)")"#;

        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;
        let table_provider = build_test_table_provider_with_fields(
            &[(
                DEFAULT_SCHEMA_NAME.to_string(),
                "greptime_servers_grpc_requests_elapsed_bucket".to_string(),
            )],
            &["pod", "le", "path", "code", "container"],
        )
        .await;
        let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
            .await
            .unwrap();
    }

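    // `and`/`unless` set operators combined with aggregations and a `vector(0)`
    // fallback should plan without error.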
    #[tokio::test]
    async fn test_parse_and_operator() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let cases = [
            r#"count (max by (persistentvolumeclaim,namespace) (kubelet_volume_stats_used_bytes{namespace=~".+"} ) and (max by (persistentvolumeclaim,namespace) (kubelet_volume_stats_used_bytes{namespace=~".+"} )) / (max by (persistentvolumeclaim,namespace) (kubelet_volume_stats_capacity_bytes{namespace=~".+"} )) >= (80 / 100)) or vector (0)"#,
            r#"count (max by (persistentvolumeclaim,namespace) (kubelet_volume_stats_used_bytes{namespace=~".+"} ) unless (max by (persistentvolumeclaim,namespace) (kubelet_volume_stats_used_bytes{namespace=~".+"} )) / (max by (persistentvolumeclaim,namespace) (kubelet_volume_stats_capacity_bytes{namespace=~".+"} )) >= (80 / 100)) or vector (0)"#,
        ];

        for case in cases {
            let prom_expr = parser::parse(case).unwrap();
            eval_stmt.expr = prom_expr;
            let table_provider = build_test_table_provider_with_fields(
                &[
                    (
                        DEFAULT_SCHEMA_NAME.to_string(),
                        "kubelet_volume_stats_used_bytes".to_string(),
                    ),
                    (
                        DEFAULT_SCHEMA_NAME.to_string(),
                        "kubelet_volume_stats_capacity_bytes".to_string(),
                    ),
                ],
                &["namespace", "persistentvolumeclaim"],
            )
            .await;
            let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
                .await
                .unwrap();
        }
    }

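    // A subtraction whose right-hand side is a parenthesized `or` with a
    // vector(0) fallback should plan without error.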
    #[tokio::test]
    async fn test_nested_binary_op() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let case = r#"sum(rate(nginx_ingress_controller_requests{job=~".*"}[2m])) -
        (
            sum(rate(nginx_ingress_controller_requests{namespace=~".*"}[2m]))
            or
            vector(0)
        )"#;

        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;
        let table_provider = build_test_table_provider_with_fields(
            &[(
                DEFAULT_SCHEMA_NAME.to_string(),
                "nginx_ingress_controller_requests".to_string(),
            )],
            &["namespace", "job"],
        )
        .await;
        let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
            .await
            .unwrap();
    }

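    // Chained `or` operators between aggregation and arithmetic sub-expressions
    // sharing `by` clauses should plan without error.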
    #[tokio::test]
    async fn test_parse_or_operator() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let case = r#"
        sum(rate(sysstat{tenant_name=~"tenant1",cluster_name=~"cluster1"}[120s])) by (cluster_name,tenant_name) /
        (sum(sysstat{tenant_name=~"tenant1",cluster_name=~"cluster1"}) by (cluster_name,tenant_name) * 100)
        or
        200 * sum(sysstat{tenant_name=~"tenant1",cluster_name=~"cluster1"}) by (cluster_name,tenant_name) /
        sum(sysstat{tenant_name=~"tenant1",cluster_name=~"cluster1"}) by (cluster_name,tenant_name)"#;

        let table_provider = build_test_table_provider_with_fields(
            &[(DEFAULT_SCHEMA_NAME.to_string(), "sysstat".to_string())],
            &["tenant_name", "cluster_name"],
        )
        .await;
        eval_stmt.expr = parser::parse(case).unwrap();
        let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
            .await
            .unwrap();

        let case = r#"sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) /
        (sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) *1000) +
        sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) /
        (sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) *1000) >= 0
        or
        sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) /
        (sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) *1000) >= 0
        or
        sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) /
        (sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) *1000) >= 0"#;
        let table_provider = build_test_table_provider_with_fields(
            &[(DEFAULT_SCHEMA_NAME.to_string(), "sysstat".to_string())],
            &["tenant_name", "cluster_name"],
        )
        .await;
        eval_stmt.expr = parser::parse(case).unwrap();
        let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
            .await
            .unwrap();

        let case = r#"(sum(background_waitevent_cnt{tenant_name=~"sys",cluster_name=~"cluster1"}) by (cluster_name,tenant_name) +
        sum(foreground_waitevent_cnt{tenant_name=~"sys",cluster_name=~"cluster1"}) by (cluster_name,tenant_name)) or
        (sum(background_waitevent_cnt{tenant_name=~"sys",cluster_name=~"cluster1"}) by (cluster_name,tenant_name)) or
        (sum(foreground_waitevent_cnt{tenant_name=~"sys",cluster_name=~"cluster1"}) by (cluster_name,tenant_name))"#;
        let table_provider = build_test_table_provider_with_fields(
            &[
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "background_waitevent_cnt".to_string(),
                ),
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "foreground_waitevent_cnt".to_string(),
                ),
            ],
            &["tenant_name", "cluster_name"],
        )
        .await;
        eval_stmt.expr = parser::parse(case).unwrap();
        let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
            .await
            .unwrap();

        let case = r#"avg(node_load1{cluster_name=~"cluster1"}) by (cluster_name,host_name) or max(container_cpu_load_average_10s{cluster_name=~"cluster1"}) by (cluster_name,host_name) * 100 / max(container_spec_cpu_quota{cluster_name=~"cluster1"}) by (cluster_name,host_name)"#;
        let table_provider = build_test_table_provider_with_fields(
            &[
                (DEFAULT_SCHEMA_NAME.to_string(), "node_load1".to_string()),
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "container_cpu_load_average_10s".to_string(),
                ),
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "container_spec_cpu_quota".to_string(),
                ),
            ],
            &["cluster_name", "host_name"],
        )
        .await;
        eval_stmt.expr = parser::parse(case).unwrap();
        let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
            .await
            .unwrap();
    }

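    // The special `__field__` matcher selects or excludes value columns. Each case
    // lists the expected output schema; matching a nonexistent field is a planning error.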
    #[tokio::test]
    async fn value_matcher() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let cases = [
            (
                r#"some_metric{__field__="field_1"}"#,
                vec![
                    "some_metric.field_1",
                    "some_metric.tag_0",
                    "some_metric.tag_1",
                    "some_metric.tag_2",
                    "some_metric.timestamp",
                ],
            ),
            (
                r#"some_metric{__field__="field_1", __field__="field_0"}"#,
                vec![
                    "some_metric.field_0",
                    "some_metric.field_1",
                    "some_metric.tag_0",
                    "some_metric.tag_1",
                    "some_metric.tag_2",
                    "some_metric.timestamp",
                ],
            ),
            (
                r#"some_metric{__field__!="field_1"}"#,
                vec![
                    "some_metric.field_0",
                    "some_metric.field_2",
                    "some_metric.tag_0",
                    "some_metric.tag_1",
                    "some_metric.tag_2",
                    "some_metric.timestamp",
                ],
            ),
            (
                r#"some_metric{__field__!="field_1", __field__!="field_2"}"#,
                vec![
                    "some_metric.field_0",
                    "some_metric.tag_0",
                    "some_metric.tag_1",
                    "some_metric.tag_2",
                    "some_metric.timestamp",
                ],
            ),
            (
                r#"some_metric{__field__="field_1", __field__!="field_0"}"#,
                vec![
                    "some_metric.field_1",
                    "some_metric.tag_0",
                    "some_metric.tag_1",
                    "some_metric.tag_2",
                    "some_metric.timestamp",
                ],
            ),
            (
                r#"some_metric{__field__="field_2", __field__!="field_2"}"#,
                vec![
                    "some_metric.tag_0",
                    "some_metric.tag_1",
                    "some_metric.tag_2",
                    "some_metric.timestamp",
                ],
            ),
            (
                r#"some_metric{__field__=~"field_1|field_2"}"#,
                vec![
                    "some_metric.field_1",
                    "some_metric.field_2",
                    "some_metric.tag_0",
                    "some_metric.tag_1",
                    "some_metric.tag_2",
                    "some_metric.timestamp",
                ],
            ),
            (
                r#"some_metric{__field__!~"field_1|field_2"}"#,
                vec![
                    "some_metric.field_0",
                    "some_metric.tag_0",
                    "some_metric.tag_1",
                    "some_metric.tag_2",
                    "some_metric.timestamp",
                ],
            ),
        ];

        for case in cases {
            let prom_expr = parser::parse(case.0).unwrap();
            eval_stmt.expr = prom_expr;
            let table_provider = build_test_table_provider(
                &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
                3,
                3,
            )
            .await;
            let plan =
                PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
                    .await
                    .unwrap();
            let mut fields = plan.schema().field_names();
            let mut expected = case.1.into_iter().map(String::from).collect::<Vec<_>>();
            fields.sort();
            expected.sort();
            assert_eq!(fields, expected, "case: {:?}", case.0);
        }

        let bad_cases = [
            r#"some_metric{__field__="nonexistent"}"#,
            r#"some_metric{__field__!="nonexistent"}"#,
        ];

        for case in bad_cases {
            let prom_expr = parser::parse(case).unwrap();
            eval_stmt.expr = prom_expr;
            let table_provider = build_test_table_provider(
                &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
                3,
                3,
            )
            .await;
            let plan =
                PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state()).await;
            assert!(plan.is_err(), "case: {:?}", case);
        }
    }

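    // The special `__schema__` and `__database__` matchers route the selector to
    // another schema: the TableScan targets `greptime_private.some_alt_metric`,
    // and joins across schemas still work.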
    #[tokio::test]
    async fn custom_schema() {
        let query = "some_alt_metric{__schema__=\"greptime_private\"}";
        let expected = String::from(
            "PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n  PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n    Sort: greptime_private.some_alt_metric.tag_0 ASC NULLS FIRST, greptime_private.some_alt_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n      Filter: greptime_private.some_alt_metric.timestamp >= TimestampMillisecond(-1000, None) AND greptime_private.some_alt_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n        TableScan: greptime_private.some_alt_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
        );

        indie_query_plan_compare(query, expected).await;

        let query = "some_alt_metric{__database__=\"greptime_private\"}";
        let expected = String::from(
            "PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n  PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n    Sort: greptime_private.some_alt_metric.tag_0 ASC NULLS FIRST, greptime_private.some_alt_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n      Filter: greptime_private.some_alt_metric.timestamp >= TimestampMillisecond(-1000, None) AND greptime_private.some_alt_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n        TableScan: greptime_private.some_alt_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
        );

        indie_query_plan_compare(query, expected).await;

        let query = "some_alt_metric{__schema__=\"greptime_private\"} / some_metric";
        let expected = String::from("Projection: some_metric.tag_0, some_metric.timestamp, greptime_private.some_alt_metric.field_0 / some_metric.field_0 AS greptime_private.some_alt_metric.field_0 / some_metric.field_0 [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), greptime_private.some_alt_metric.field_0 / some_metric.field_0:Float64;N]\
        \n  Inner Join: greptime_private.some_alt_metric.tag_0 = some_metric.tag_0, greptime_private.some_alt_metric.timestamp = some_metric.timestamp [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n    SubqueryAlias: greptime_private.some_alt_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n      PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n        PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n          Sort: greptime_private.some_alt_metric.tag_0 ASC NULLS FIRST, greptime_private.some_alt_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n            Filter: greptime_private.some_alt_metric.timestamp >= TimestampMillisecond(-1000, None) AND greptime_private.some_alt_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n              TableScan: greptime_private.some_alt_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n    SubqueryAlias: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n      PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n        PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n          Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n            Filter: some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n              TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]");

        indie_query_plan_compare(query, expected).await;
    }

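    // `__schema__`/`__database__` accept only the `=` matcher; `!=` and `=~`
    // must fail planning.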
    #[tokio::test]
    async fn only_equals_is_supported_for_special_matcher() {
        let queries = &[
            "some_alt_metric{__schema__!=\"greptime_private\"}",
            "some_alt_metric{__schema__=~\"lalala\"}",
            "some_alt_metric{__database__!=\"greptime_private\"}",
            "some_alt_metric{__database__=~\"lalala\"}",
        ];

        for query in queries {
            let prom_expr = parser::parse(query).unwrap();
            let eval_stmt = EvalStmt {
                expr: prom_expr,
                start: UNIX_EPOCH,
                end: UNIX_EPOCH
                    .checked_add(Duration::from_secs(100_000))
                    .unwrap(),
                interval: Duration::from_secs(5),
                lookback_delta: Duration::from_secs(1),
            };

            let table_provider = build_test_table_provider(
                &[
                    (DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string()),
                    (
                        "greptime_private".to_string(),
                        "some_alt_metric".to_string(),
                    ),
                ],
                1,
                1,
            )
            .await;

            let plan =
                PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state()).await;
            assert!(plan.is_err(), "query: {:?}", query);
        }
    }

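    // A table with a nanosecond time index gets an extra Projection that casts
    // the timestamp to millisecond precision before the Prom-specific plan nodes.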
    #[tokio::test]
    async fn test_non_ms_precision() {
        let catalog_list = MemoryCatalogManager::with_default_setup();
        let columns = vec![
            ColumnSchema::new(
                "tag".to_string(),
                ConcreteDataType::string_datatype(),
                false,
            ),
            ColumnSchema::new(
                "timestamp".to_string(),
                ConcreteDataType::timestamp_nanosecond_datatype(),
                false,
            )
            .with_time_index(true),
            ColumnSchema::new(
                "field".to_string(),
                ConcreteDataType::float64_datatype(),
                true,
            ),
        ];
        let schema = Arc::new(Schema::new(columns));
        let table_meta = TableMetaBuilder::empty()
            .schema(schema)
            .primary_key_indices(vec![0])
            .value_indices(vec![2])
            .next_column_id(1024)
            .build()
            .unwrap();
        let table_info = TableInfoBuilder::default()
            .name("metrics".to_string())
            .meta(table_meta)
            .build()
            .unwrap();
        let table = EmptyTable::from_table_info(&table_info);
        assert!(catalog_list
            .register_table_sync(RegisterTableRequest {
                catalog: DEFAULT_CATALOG_NAME.to_string(),
                schema: DEFAULT_SCHEMA_NAME.to_string(),
                table_name: "metrics".to_string(),
                table_id: 1024,
                table,
            })
            .is_ok());

        let plan = PromPlanner::stmt_to_plan(
            DfTableSourceProvider::new(
                catalog_list.clone(),
                false,
                QueryContext::arc(),
                DummyDecoder::arc(),
                true,
            ),
            &EvalStmt {
                expr: parser::parse("metrics{tag = \"1\"}").unwrap(),
                start: UNIX_EPOCH,
                end: UNIX_EPOCH
                    .checked_add(Duration::from_secs(100_000))
                    .unwrap(),
                interval: Duration::from_secs(5),
                lookback_delta: Duration::from_secs(1),
            },
            &build_session_state(),
        )
        .await
        .unwrap();
        assert_eq!(plan.display_indent_schema().to_string(),
        "PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
        \n  PromSeriesDivide: tags=[\"tag\"] [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
        \n    Sort: metrics.tag ASC NULLS FIRST, metrics.timestamp ASC NULLS FIRST [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
        \n      Filter: metrics.tag = Utf8(\"1\") AND metrics.timestamp >= TimestampMillisecond(-1000, None) AND metrics.timestamp <= TimestampMillisecond(100001000, None) [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
        \n        Projection: metrics.field, metrics.tag, CAST(metrics.timestamp AS Timestamp(Millisecond, None)) AS timestamp [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
        \n          TableScan: metrics [tag:Utf8, timestamp:Timestamp(Nanosecond, None), field:Float64;N]"
        );
        let plan = PromPlanner::stmt_to_plan(
            DfTableSourceProvider::new(
                catalog_list.clone(),
                false,
                QueryContext::arc(),
                DummyDecoder::arc(),
                true,
            ),
            &EvalStmt {
                expr: parser::parse("avg_over_time(metrics{tag = \"1\"}[5s])").unwrap(),
                start: UNIX_EPOCH,
                end: UNIX_EPOCH
                    .checked_add(Duration::from_secs(100_000))
                    .unwrap(),
                interval: Duration::from_secs(5),
                lookback_delta: Duration::from_secs(1),
            },
            &build_session_state(),
        )
        .await
        .unwrap();
        assert_eq!(plan.display_indent_schema().to_string(),
        "Filter: prom_avg_over_time(timestamp_range,field) IS NOT NULL [timestamp:Timestamp(Millisecond, None), prom_avg_over_time(timestamp_range,field):Float64;N, tag:Utf8]\
        \n  Projection: metrics.timestamp, prom_avg_over_time(timestamp_range, field) AS prom_avg_over_time(timestamp_range,field), metrics.tag [timestamp:Timestamp(Millisecond, None), prom_avg_over_time(timestamp_range,field):Float64;N, tag:Utf8]\
        \n    PromRangeManipulate: req range=[0..100000000], interval=[5000], eval range=[5000], time index=[timestamp], values=[\"field\"] [field:Dictionary(Int64, Float64);N, tag:Utf8, timestamp:Timestamp(Millisecond, None), timestamp_range:Dictionary(Int64, Timestamp(Millisecond, None))]\
        \n      PromSeriesNormalize: offset=[0], time index=[timestamp], filter NaN: [true] [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
        \n        PromSeriesDivide: tags=[\"tag\"] [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
        \n          Sort: metrics.tag ASC NULLS FIRST, metrics.timestamp ASC NULLS FIRST [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
        \n            Filter: metrics.tag = Utf8(\"1\") AND metrics.timestamp >= TimestampMillisecond(-6000, None) AND metrics.timestamp <= TimestampMillisecond(100001000, None) [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
        \n              Projection: metrics.field, metrics.tag, CAST(metrics.timestamp AS Timestamp(Millisecond, None)) AS timestamp [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
        \n                TableScan: metrics [tag:Utf8, timestamp:Timestamp(Nanosecond, None), field:Float64;N]"
        );
    }

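    // A matcher on a label absent from the table schema should still produce a
    // plan rather than erroring.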
    #[tokio::test]
    async fn test_nonexistent_label() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let case = r#"some_metric{nonexistent="hi"}"#;
        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;
        let table_provider = build_test_table_provider(
            &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
            3,
            3,
        )
        .await;
        let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
            .await
            .unwrap();
    }

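    // label_join plans as a concat_ws projection aliased to the destination label
    // `foo`, plus a not-null filter on the value and new label columns.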
    #[tokio::test]
    async fn test_label_join() {
        let prom_expr = parser::parse(
            "label_join(up{tag_0='api-server'}, 'foo', ',', 'tag_1', 'tag_2', 'tag_3')",
        )
        .unwrap();
        let eval_stmt = EvalStmt {
            expr: prom_expr,
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let table_provider =
            build_test_table_provider(&[(DEFAULT_SCHEMA_NAME.to_string(), "up".to_string())], 4, 1)
                .await;
        let plan = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
            .await
            .unwrap();

        let expected = "Filter: field_0 IS NOT NULL AND foo IS NOT NULL [timestamp:Timestamp(Millisecond, None), field_0:Float64;N, foo:Utf8;N, tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8]\
        \n  Projection: up.timestamp, up.field_0 AS field_0, concat_ws(Utf8(\",\"), up.tag_1, up.tag_2, up.tag_3) AS foo AS foo, up.tag_0, up.tag_1, up.tag_2, up.tag_3 [timestamp:Timestamp(Millisecond, None), field_0:Float64;N, foo:Utf8;N, tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8]\
        \n    PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n      PromSeriesDivide: tags=[\"tag_0\", \"tag_1\", \"tag_2\", \"tag_3\"] [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n        Sort: up.tag_0 ASC NULLS FIRST, up.tag_1 ASC NULLS FIRST, up.tag_2 ASC NULLS FIRST, up.tag_3 ASC NULLS FIRST, up.timestamp ASC NULLS FIRST [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n          Filter: up.tag_0 = Utf8(\"api-server\") AND up.timestamp >= TimestampMillisecond(-1000, None) AND up.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n            TableScan: up [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]";

        assert_eq!(plan.display_indent_schema().to_string(), expected);
    }

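    // label_replace plans as a regexp_replace projection aliased to the
    // destination label `foo`.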
    #[tokio::test]
    async fn test_label_replace() {
        let prom_expr = parser::parse(
            "label_replace(up{tag_0=\"a:c\"}, \"foo\", \"$1\", \"tag_0\", \"(.*):.*\")",
        )
        .unwrap();
        let eval_stmt = EvalStmt {
            expr: prom_expr,
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let table_provider =
            build_test_table_provider(&[(DEFAULT_SCHEMA_NAME.to_string(), "up".to_string())], 1, 1)
                .await;
        let plan = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
            .await
            .unwrap();

        let expected = "Filter: field_0 IS NOT NULL AND foo IS NOT NULL [timestamp:Timestamp(Millisecond, None), field_0:Float64;N, foo:Utf8;N, tag_0:Utf8]\
        \n  Projection: up.timestamp, up.field_0 AS field_0, regexp_replace(up.tag_0, Utf8(\"(.*):.*\"), Utf8(\"$1\")) AS foo AS foo, up.tag_0 [timestamp:Timestamp(Millisecond, None), field_0:Float64;N, foo:Utf8;N, tag_0:Utf8]\
        \n    PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n      PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n        Sort: up.tag_0 ASC NULLS FIRST, up.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n          Filter: up.tag_0 = Utf8(\"a:c\") AND up.timestamp >= TimestampMillisecond(-1000, None) AND up.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n            TableScan: up [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]";

        assert_eq!(plan.display_indent_schema().to_string(), expected);
    }

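    // A regex label matcher is compiled into an anchored `~` filter of the form
    // `^(?:...)$`.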
    #[tokio::test]
    async fn test_matchers_to_expr() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };
        let case =
            r#"sum(prometheus_tsdb_head_series{tag_1=~"(10.0.160.237:8080|10.0.160.237:9090)"})"#;

        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;
        let table_provider = build_test_table_provider(
            &[(
                DEFAULT_SCHEMA_NAME.to_string(),
                "prometheus_tsdb_head_series".to_string(),
            )],
            3,
            3,
        )
        .await;
        let plan = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
            .await
            .unwrap();
        let expected = "Sort: prometheus_tsdb_head_series.timestamp ASC NULLS LAST [timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.field_0):Float64;N, sum(prometheus_tsdb_head_series.field_1):Float64;N, sum(prometheus_tsdb_head_series.field_2):Float64;N]\
        \n  Aggregate: groupBy=[[prometheus_tsdb_head_series.timestamp]], aggr=[[sum(prometheus_tsdb_head_series.field_0), sum(prometheus_tsdb_head_series.field_1), sum(prometheus_tsdb_head_series.field_2)]] [timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.field_0):Float64;N, sum(prometheus_tsdb_head_series.field_1):Float64;N, sum(prometheus_tsdb_head_series.field_2):Float64;N]\
        \n    PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N]\
        \n      PromSeriesDivide: tags=[\"tag_0\", \"tag_1\", \"tag_2\"] [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N]\
        \n        Sort: prometheus_tsdb_head_series.tag_0 ASC NULLS FIRST, prometheus_tsdb_head_series.tag_1 ASC NULLS FIRST, prometheus_tsdb_head_series.tag_2 ASC NULLS FIRST, prometheus_tsdb_head_series.timestamp ASC NULLS FIRST [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N]\
        \n          Filter: prometheus_tsdb_head_series.tag_1 ~ Utf8(\"^(?:(10.0.160.237:8080|10.0.160.237:9090))$\") AND prometheus_tsdb_head_series.timestamp >= TimestampMillisecond(-1000, None) AND prometheus_tsdb_head_series.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N]\
        \n            TableScan: prometheus_tsdb_head_series [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N]";
        assert_eq!(plan.display_indent_schema().to_string(), expected);
    }

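    // topk plans as a row_number window partitioned by timestamp, keeping rows
    // whose rank is <= k.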
    #[tokio::test]
    async fn test_topk_expr() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };
        let case = r#"topk(10, sum(prometheus_tsdb_head_series{ip=~"(10.0.160.237:8080|10.0.160.237:9090)"}) by (ip))"#;

        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;
        let table_provider = build_test_table_provider_with_fields(
            &[
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "prometheus_tsdb_head_series".to_string(),
                ),
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "http_server_requests_seconds_count".to_string(),
                ),
            ],
            &["ip"],
        )
        .await;

        let plan = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
            .await
            .unwrap();
        let expected = "Projection: sum(prometheus_tsdb_head_series.greptime_value), prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp [sum(prometheus_tsdb_head_series.greptime_value):Float64;N, ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None)]\
        \n  Sort: prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST, row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS FIRST, prometheus_tsdb_head_series.ip DESC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW ASC NULLS LAST [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N, row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS FIRST, prometheus_tsdb_head_series.ip DESC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:UInt64]\
        \n    Filter: row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS FIRST, prometheus_tsdb_head_series.ip DESC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW <= Float64(10) [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N, row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS FIRST, prometheus_tsdb_head_series.ip DESC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:UInt64]\
        \n      WindowAggr: windowExpr=[[row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS FIRST, prometheus_tsdb_head_series.ip DESC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N, row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS FIRST, prometheus_tsdb_head_series.ip DESC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:UInt64]\
        \n        Sort: prometheus_tsdb_head_series.ip ASC NULLS LAST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N]\
        \n          Aggregate: groupBy=[[prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp]], aggr=[[sum(prometheus_tsdb_head_series.greptime_value)]] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N]\
        \n            PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n              PromSeriesDivide: tags=[\"ip\"] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n                Sort: prometheus_tsdb_head_series.ip ASC NULLS FIRST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS FIRST [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n                  Filter: prometheus_tsdb_head_series.ip ~ Utf8(\"^(?:(10.0.160.237:8080|10.0.160.237:9090))$\") AND prometheus_tsdb_head_series.greptime_timestamp >= TimestampMillisecond(-1000, None) AND prometheus_tsdb_head_series.greptime_timestamp <= TimestampMillisecond(100001000, None) [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n                    TableScan: prometheus_tsdb_head_series [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]";

        assert_eq!(plan.display_indent_schema().to_string(), expected);
    }

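    // count_values groups by the value column and re-exposes it under the
    // user-supplied label (`series`).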
    #[tokio::test]
    async fn test_count_values_expr() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };
        let case = r#"count_values('series', prometheus_tsdb_head_series{ip=~"(10.0.160.237:8080|10.0.160.237:9090)"}) by (ip)"#;

        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;
        let table_provider = build_test_table_provider_with_fields(
            &[
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "prometheus_tsdb_head_series".to_string(),
                ),
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "http_server_requests_seconds_count".to_string(),
                ),
            ],
            &["ip"],
        )
        .await;

        let plan = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
            .await
            .unwrap();
        let expected = "Projection: count(prometheus_tsdb_head_series.greptime_value), prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp, series [count(prometheus_tsdb_head_series.greptime_value):Int64, ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), series:Float64;N]\
        \n  Sort: prometheus_tsdb_head_series.ip ASC NULLS LAST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST, prometheus_tsdb_head_series.greptime_value ASC NULLS LAST [count(prometheus_tsdb_head_series.greptime_value):Int64, ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), series:Float64;N, greptime_value:Float64;N]\
        \n    Projection: count(prometheus_tsdb_head_series.greptime_value), prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp, prometheus_tsdb_head_series.greptime_value AS series, prometheus_tsdb_head_series.greptime_value [count(prometheus_tsdb_head_series.greptime_value):Int64, ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), series:Float64;N, greptime_value:Float64;N]\
        \n      Aggregate: groupBy=[[prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp, prometheus_tsdb_head_series.greptime_value]], aggr=[[count(prometheus_tsdb_head_series.greptime_value)]] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N, count(prometheus_tsdb_head_series.greptime_value):Int64]\
        \n        PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n          PromSeriesDivide: tags=[\"ip\"] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n            Sort: prometheus_tsdb_head_series.ip ASC NULLS FIRST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS FIRST [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n              Filter: prometheus_tsdb_head_series.ip ~ Utf8(\"^(?:(10.0.160.237:8080|10.0.160.237:9090))$\") AND prometheus_tsdb_head_series.greptime_timestamp >= TimestampMillisecond(-1000, None) AND prometheus_tsdb_head_series.greptime_timestamp <= TimestampMillisecond(100001000, None) [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n                TableScan: prometheus_tsdb_head_series [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]";

        assert_eq!(plan.display_indent_schema().to_string(), expected);
    }

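    // quantile maps to the custom quantile UDAF applied over the inner sum
    // aggregation.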
    #[tokio::test]
    async fn test_quantile_expr() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };
        let case = r#"quantile(0.3, sum(prometheus_tsdb_head_series{ip=~"(10.0.160.237:8080|10.0.160.237:9090)"}) by (ip))"#;

        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;
        let table_provider = build_test_table_provider_with_fields(
            &[
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "prometheus_tsdb_head_series".to_string(),
                ),
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "http_server_requests_seconds_count".to_string(),
                ),
            ],
            &["ip"],
        )
        .await;

        let plan = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
            .await
            .unwrap();
        let expected = "Sort: prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST [greptime_timestamp:Timestamp(Millisecond, None), quantile(Float64(0.3),sum(prometheus_tsdb_head_series.greptime_value)):Float64;N]\
        \n  Aggregate: groupBy=[[prometheus_tsdb_head_series.greptime_timestamp]], aggr=[[quantile(Float64(0.3), sum(prometheus_tsdb_head_series.greptime_value))]] [greptime_timestamp:Timestamp(Millisecond, None), quantile(Float64(0.3),sum(prometheus_tsdb_head_series.greptime_value)):Float64;N]\
        \n    Sort: prometheus_tsdb_head_series.ip ASC NULLS LAST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N]\
        \n      Aggregate: groupBy=[[prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp]], aggr=[[sum(prometheus_tsdb_head_series.greptime_value)]] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N]\
        \n        PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n          PromSeriesDivide: tags=[\"ip\"] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n            Sort: prometheus_tsdb_head_series.ip ASC NULLS FIRST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS FIRST [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n              Filter: prometheus_tsdb_head_series.ip ~ Utf8(\"^(?:(10.0.160.237:8080|10.0.160.237:9090))$\") AND prometheus_tsdb_head_series.greptime_timestamp >= TimestampMillisecond(-1000, None) AND prometheus_tsdb_head_series.greptime_timestamp <= TimestampMillisecond(100001000, None) [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n                TableScan: prometheus_tsdb_head_series [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]";

        assert_eq!(plan.display_indent_schema().to_string(), expected);
    }
}