use std::collections::{BTreeSet, HashSet, VecDeque};
use std::sync::Arc;
use std::time::UNIX_EPOCH;

use arrow::datatypes::IntervalDayTime;
use async_recursion::async_recursion;
use catalog::table_source::DfTableSourceProvider;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_function::function::FunctionContext;
use common_query::prelude::GREPTIME_VALUE;
use datafusion::common::DFSchemaRef;
use datafusion::datasource::DefaultTableSource;
use datafusion::functions_aggregate::average::avg_udaf;
use datafusion::functions_aggregate::count::count_udaf;
use datafusion::functions_aggregate::expr_fn::first_value;
use datafusion::functions_aggregate::grouping::grouping_udaf;
use datafusion::functions_aggregate::min_max::{max_udaf, min_udaf};
use datafusion::functions_aggregate::stddev::stddev_pop_udaf;
use datafusion::functions_aggregate::sum::sum_udaf;
use datafusion::functions_aggregate::variance::var_pop_udaf;
use datafusion::functions_window::row_number::RowNumber;
use datafusion::logical_expr::expr::{Alias, ScalarFunction, WindowFunction};
use datafusion::logical_expr::expr_rewriter::normalize_cols;
use datafusion::logical_expr::{
    BinaryExpr, Cast, Extension, LogicalPlan, LogicalPlanBuilder, Operator,
    ScalarUDF as ScalarUdfDef, WindowFrame, WindowFunctionDefinition,
};
use datafusion::prelude as df_prelude;
use datafusion::prelude::{Column, Expr as DfExpr, JoinType};
use datafusion::scalar::ScalarValue;
use datafusion::sql::TableReference;
use datafusion_common::{DFSchema, NullEquality};
use datafusion_expr::expr::WindowFunctionParams;
use datafusion_expr::utils::conjunction;
use datafusion_expr::{ExprSchemable, Literal, SortExpr, col, lit};
use datatypes::arrow::datatypes::{DataType as ArrowDataType, TimeUnit as ArrowTimeUnit};
use datatypes::data_type::ConcreteDataType;
use itertools::Itertools;
use once_cell::sync::Lazy;
use promql::extension_plan::{
    Absent, EmptyMetric, HistogramFold, InstantManipulate, Millisecond, RangeManipulate,
    ScalarCalculate, SeriesDivide, SeriesNormalize, UnionDistinctOn, build_special_time_expr,
};
use promql::functions::{
    AbsentOverTime, AvgOverTime, Changes, CountOverTime, Delta, Deriv, HoltWinters, IDelta,
    Increase, LastOverTime, MaxOverTime, MinOverTime, PredictLinear, PresentOverTime,
    QuantileOverTime, Rate, Resets, Round, StddevOverTime, StdvarOverTime, SumOverTime,
    quantile_udaf,
};
use promql_parser::label::{METRIC_NAME, MatchOp, Matcher, Matchers};
use promql_parser::parser::token::TokenType;
use promql_parser::parser::{
    AggregateExpr, BinModifier, BinaryExpr as PromBinaryExpr, Call, EvalStmt, Expr as PromExpr,
    Function, FunctionArgs as PromFunctionArgs, LabelModifier, MatrixSelector, NumberLiteral,
    Offset, ParenExpr, StringLiteral, SubqueryExpr, UnaryExpr, VectorMatchCardinality,
    VectorSelector, token,
};
use regex::{self, Regex};
use snafu::{OptionExt, ResultExt, ensure};
use store_api::metric_engine_consts::{
    DATA_SCHEMA_TABLE_ID_COLUMN_NAME, DATA_SCHEMA_TSID_COLUMN_NAME,
};
use table::table::adapter::DfTableProviderAdapter;

use crate::promql::error::{
    CatalogSnafu, ColumnNotFoundSnafu, CombineTableColumnMismatchSnafu, DataFusionPlanningSnafu,
    ExpectRangeSelectorSnafu, FunctionInvalidArgumentSnafu, InvalidDestinationLabelNameSnafu,
    InvalidRegularExpressionSnafu, InvalidTimeRangeSnafu, MultiFieldsNotSupportedSnafu,
    MultipleMetricMatchersSnafu, MultipleVectorSnafu, NoMetricMatcherSnafu, PromqlPlanNodeSnafu,
    Result, SameLabelSetSnafu, TableNameNotFoundSnafu, TimeIndexNotFoundSnafu,
    UnexpectedPlanExprSnafu, UnexpectedTokenSnafu, UnknownTableSnafu, UnsupportedExprSnafu,
    UnsupportedMatcherOpSnafu, UnsupportedVectorMatchSnafu, ValueNotFoundSnafu,
    ZeroRangeSelectorSnafu,
};
use crate::query_engine::QueryEngineState;

/// `time()` function in PromQL.
const SPECIAL_TIME_FUNCTION: &str = "time";
/// `scalar()` function in PromQL.
const SCALAR_FUNCTION: &str = "scalar";
/// `absent()` function in PromQL.
const SPECIAL_ABSENT_FUNCTION: &str = "absent";
/// `histogram_quantile()` function in PromQL.
const SPECIAL_HISTOGRAM_QUANTILE: &str = "histogram_quantile";
/// `vector()` function in PromQL.
const SPECIAL_VECTOR_FUNCTION: &str = "vector";
/// `le` column for conventional histograms.
const LE_COLUMN_NAME: &str = "le";

/// Pattern for valid Prometheus label names.
static LABEL_NAME_REGEX: Lazy<Regex> =
    Lazy::new(|| Regex::new(r"^[a-zA-Z_][a-zA-Z0-9_]*$").unwrap());

const DEFAULT_TIME_INDEX_COLUMN: &str = "time";

/// Default value column name for empty metrics.
const DEFAULT_FIELD_COLUMN: &str = "value";

/// Special matcher to project field columns under multi-field mode.
const FIELD_COLUMN_MATCHER: &str = "__field__";

/// Special matchers for cross-schema queries.
const SCHEMA_COLUMN_MATCHER: &str = "__schema__";
const DB_COLUMN_MATCHER: &str = "__database__";

/// Threshold of evaluation points above which a single continuous time range is scanned.
const MAX_SCATTER_POINTS: i64 = 400;

/// One hour in milliseconds.
const INTERVAL_1H: i64 = 60 * 60 * 1000;

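/// Planning context that carries the query time range/step together with the
/// table-related states (time index, field and tag columns) of the metric
/// currently being planned.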
#[derive(Default, Debug, Clone)]
struct PromPlannerContext {
    // query parameters
    start: Millisecond,
    end: Millisecond,
    interval: Millisecond,
    lookback_delta: Millisecond,

    // planner states
    table_name: Option<String>,
    time_index_column: Option<String>,
    field_columns: Vec<String>,
    tag_columns: Vec<String>,
    field_column_matcher: Option<Vec<Matcher>>,
    selector_matcher: Vec<Matcher>,
    schema_name: Option<String>,
    /// The range in milliseconds of the range selector. `None` if there is no range selector.
    range: Option<Millisecond>,
}

impl PromPlannerContext {
    fn from_eval_stmt(stmt: &EvalStmt) -> Self {
        Self {
            start: stmt.start.duration_since(UNIX_EPOCH).unwrap().as_millis() as _,
            end: stmt.end.duration_since(UNIX_EPOCH).unwrap().as_millis() as _,
            interval: stmt.interval.as_millis() as _,
            lookback_delta: stmt.lookback_delta.as_millis() as _,
            ..Default::default()
        }
    }

    /// Reset all planner states except the query parameters.
    fn reset(&mut self) {
        self.table_name = None;
        self.time_index_column = None;
        self.field_columns = vec![];
        self.tag_columns = vec![];
        self.field_column_matcher = None;
        self.selector_matcher.clear();
        self.schema_name = None;
        self.range = None;
    }

    /// Reset the table name to an empty string (rather than `None`) and clear
    /// the schema, for expressions that don't reference any table.
    fn reset_table_name_and_schema(&mut self) {
        self.table_name = Some(String::new());
        self.schema_name = None;
    }

    /// Check if `le` is present in tag columns.
    fn has_le_tag(&self) -> bool {
        self.tag_columns.iter().any(|c| c.eq(&LE_COLUMN_NAME))
    }
}

pub struct PromPlanner {
    table_provider: DfTableSourceProvider,
    ctx: PromPlannerContext,
}

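/// Normalize a matcher by unescaping its value, so that escape sequences
/// written in the query text match the stored label values.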
pub fn normalize_matcher(mut matcher: Matcher) -> Matcher {
    if let Ok(unescaped_value) = unescaper::unescape(&matcher.value) {
        matcher.value = unescaped_value;
    }
    matcher
}

impl PromPlanner {
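    /// Plan a PromQL [`EvalStmt`] into a DataFusion [`LogicalPlan`].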
    pub async fn stmt_to_plan(
        table_provider: DfTableSourceProvider,
        stmt: &EvalStmt,
        query_engine_state: &QueryEngineState,
    ) -> Result<LogicalPlan> {
        let mut planner = Self {
            table_provider,
            ctx: PromPlannerContext::from_eval_stmt(stmt),
        };

        planner
            .prom_expr_to_plan(&stmt.expr, query_engine_state)
            .await
    }

    pub async fn prom_expr_to_plan(
        &mut self,
        prom_expr: &PromExpr,
        query_engine_state: &QueryEngineState,
    ) -> Result<LogicalPlan> {
        self.prom_expr_to_plan_inner(prom_expr, false, query_engine_state)
            .await
    }

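    /// Convert a PromQL expression to a logical plan.
    ///
    /// `timestamp_fn` is true when the expression is the argument of the
    /// `timestamp()` function, in which case the vector selector projects the
    /// time index as the value column (see `create_timestamp_func_plan`).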
    #[async_recursion]
    async fn prom_expr_to_plan_inner(
        &mut self,
        prom_expr: &PromExpr,
        timestamp_fn: bool,
        query_engine_state: &QueryEngineState,
    ) -> Result<LogicalPlan> {
        let res = match prom_expr {
            PromExpr::Aggregate(expr) => {
                self.prom_aggr_expr_to_plan(query_engine_state, expr)
                    .await?
            }
            PromExpr::Unary(expr) => {
                self.prom_unary_expr_to_plan(query_engine_state, expr)
                    .await?
            }
            PromExpr::Binary(expr) => {
                self.prom_binary_expr_to_plan(query_engine_state, expr)
                    .await?
            }
            PromExpr::Paren(ParenExpr { expr }) => {
                self.prom_expr_to_plan_inner(expr, timestamp_fn, query_engine_state)
                    .await?
            }
            PromExpr::Subquery(expr) => {
                self.prom_subquery_expr_to_plan(query_engine_state, expr)
                    .await?
            }
            PromExpr::NumberLiteral(lit) => self.prom_number_lit_to_plan(lit)?,
            PromExpr::StringLiteral(lit) => self.prom_string_lit_to_plan(lit)?,
            PromExpr::VectorSelector(selector) => {
                self.prom_vector_selector_to_plan(selector, timestamp_fn)
                    .await?
            }
            PromExpr::MatrixSelector(selector) => {
                self.prom_matrix_selector_to_plan(selector).await?
            }
            PromExpr::Call(expr) => {
                self.prom_call_expr_to_plan(query_engine_state, expr)
                    .await?
            }
            PromExpr::Extension(expr) => {
                self.prom_ext_expr_to_plan(query_engine_state, expr).await?
            }
        };

        Ok(res)
    }

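    /// Plan a subquery: evaluate the inner expression with the subquery step
    /// as the interval and a start extended backwards by the range, then wrap
    /// the result in a [`RangeManipulate`] node.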
    async fn prom_subquery_expr_to_plan(
        &mut self,
        query_engine_state: &QueryEngineState,
        subquery_expr: &SubqueryExpr,
    ) -> Result<LogicalPlan> {
        let SubqueryExpr {
            expr, range, step, ..
        } = subquery_expr;

        let current_interval = self.ctx.interval;
        if let Some(step) = step {
            self.ctx.interval = step.as_millis() as _;
        }
        let current_start = self.ctx.start;
        self.ctx.start -= range.as_millis() as i64 - self.ctx.interval;
        let input = self.prom_expr_to_plan(expr, query_engine_state).await?;
        self.ctx.interval = current_interval;
        self.ctx.start = current_start;

        ensure!(!range.is_zero(), ZeroRangeSelectorSnafu);
        let range_ms = range.as_millis() as _;
        self.ctx.range = Some(range_ms);

        let manipulate = RangeManipulate::new(
            self.ctx.start,
            self.ctx.end,
            self.ctx.interval,
            range_ms,
            self.ctx
                .time_index_column
                .clone()
                .expect("time index should be set in `setup_context`"),
            self.ctx.field_columns.clone(),
            input,
        )
        .context(DataFusionPlanningSnafu)?;

        Ok(LogicalPlan::Extension(Extension {
            node: Arc::new(manipulate),
        }))
    }

    async fn prom_aggr_expr_to_plan(
        &mut self,
        query_engine_state: &QueryEngineState,
        aggr_expr: &AggregateExpr,
    ) -> Result<LogicalPlan> {
        let AggregateExpr {
            op,
            expr,
            modifier,
            param,
        } = aggr_expr;

        let input = self.prom_expr_to_plan(expr, query_engine_state).await?;

        match (*op).id() {
            token::T_TOPK | token::T_BOTTOMK => {
                self.prom_topk_bottomk_to_plan(aggr_expr, input).await
            }
            _ => {
                // calculate columns to group by; the time index column is
                // always appended to the group-by list
                let mut group_exprs = self.agg_modifier_to_col(input.schema(), modifier, true)?;
                // convert op and value columns to aggregate exprs
                let (aggr_exprs, prev_field_exprs) =
                    self.create_aggregate_exprs(*op, param, &input)?;

                // create plan
                let builder = LogicalPlanBuilder::from(input);
                let builder = if op.id() == token::T_COUNT_VALUES {
                    let label = Self::get_param_value_as_str(*op, param)?;
                    // `count_values` must additionally group by the value
                    // itself, projected to the new label.
                    group_exprs.extend(prev_field_exprs.clone());
                    let project_fields = self
                        .create_field_column_exprs()?
                        .into_iter()
                        .chain(self.create_tag_column_exprs()?)
                        .chain(Some(self.create_time_index_column_expr()?))
                        .chain(prev_field_exprs.into_iter().map(|expr| expr.alias(label)));

                    builder
                        .aggregate(group_exprs.clone(), aggr_exprs)
                        .context(DataFusionPlanningSnafu)?
                        .project(project_fields)
                        .context(DataFusionPlanningSnafu)?
                } else {
                    builder
                        .aggregate(group_exprs.clone(), aggr_exprs)
                        .context(DataFusionPlanningSnafu)?
                };

                let sort_expr = group_exprs.into_iter().map(|expr| expr.sort(true, false));

                builder
                    .sort(sort_expr)
                    .context(DataFusionPlanningSnafu)?
                    .build()
                    .context(DataFusionPlanningSnafu)
            }
        }
    }

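    /// Create a plan for the `topk` and `bottomk` aggregations: rank rows with
    /// window exprs per group, filter the ranks against the parameter `k`,
    /// then sort and project the original columns.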
    async fn prom_topk_bottomk_to_plan(
        &mut self,
        aggr_expr: &AggregateExpr,
        input: LogicalPlan,
    ) -> Result<LogicalPlan> {
        let AggregateExpr {
            op,
            param,
            modifier,
            ..
        } = aggr_expr;

        let group_exprs = self.agg_modifier_to_col(input.schema(), modifier, false)?;

        let val = Self::get_param_as_literal_expr(param, Some(*op), Some(ArrowDataType::Float64))?;

        // convert op and value columns to window exprs.
        let window_exprs = self.create_window_exprs(*op, group_exprs.clone(), &input)?;

        let rank_columns: Vec<_> = window_exprs
            .iter()
            .map(|expr| expr.schema_name().to_string())
            .collect();

        // Create the rank filter predicates combined with `Operator::Or`.
        // Safety: there is at least one rank column.
        let filter: DfExpr = rank_columns
            .iter()
            .fold(None, |expr, rank| {
                let predicate = DfExpr::BinaryExpr(BinaryExpr {
                    left: Box::new(col(rank)),
                    op: Operator::LtEq,
                    right: Box::new(val.clone()),
                });

                match expr {
                    None => Some(predicate),
                    Some(expr) => Some(DfExpr::BinaryExpr(BinaryExpr {
                        left: Box::new(expr),
                        op: Operator::Or,
                        right: Box::new(predicate),
                    })),
                }
            })
            .unwrap();

        let rank_columns: Vec<_> = rank_columns.into_iter().map(col).collect();

        let mut new_group_exprs = group_exprs.clone();
        // order by the rank columns as well
        new_group_exprs.extend(rank_columns);

        let group_sort_expr = new_group_exprs
            .into_iter()
            .map(|expr| expr.sort(true, false));

        let project_fields = self
            .create_field_column_exprs()?
            .into_iter()
            .chain(self.create_tag_column_exprs()?)
            .chain(Some(self.create_time_index_column_expr()?));

        LogicalPlanBuilder::from(input)
            .window(window_exprs)
            .context(DataFusionPlanningSnafu)?
            .filter(filter)
            .context(DataFusionPlanningSnafu)?
            .sort(group_sort_expr)
            .context(DataFusionPlanningSnafu)?
            .project(project_fields)
            .context(DataFusionPlanningSnafu)?
            .build()
            .context(DataFusionPlanningSnafu)
    }

    async fn prom_unary_expr_to_plan(
        &mut self,
        query_engine_state: &QueryEngineState,
        unary_expr: &UnaryExpr,
    ) -> Result<LogicalPlan> {
        let UnaryExpr { expr } = unary_expr;
        let input = self.prom_expr_to_plan(expr, query_engine_state).await?;
        // a unary expression in PromQL implies the `-` operator
        self.projection_for_each_field_column(input, |col| {
            Ok(DfExpr::Negative(Box::new(DfExpr::Column(col.into()))))
        })
    }

    async fn prom_binary_expr_to_plan(
        &mut self,
        query_engine_state: &QueryEngineState,
        binary_expr: &PromBinaryExpr,
    ) -> Result<LogicalPlan> {
        let PromBinaryExpr {
            lhs,
            rhs,
            op,
            modifier,
        } = binary_expr;

        // if set to true, comparison operators return 0/1 (for false/true)
        // instead of filtering on the result column
        let should_return_bool = if let Some(m) = modifier {
            m.return_bool
        } else {
            false
        };
        let is_comparison_op = Self::is_token_a_comparison_op(*op);

        // build a filter plan if the op is a comparison op without `bool`
        // modifier; otherwise build a projection plan
        match (
            Self::try_build_literal_expr(lhs),
            Self::try_build_literal_expr(rhs),
        ) {
            (Some(lhs), Some(rhs)) => {
                self.ctx.time_index_column = Some(DEFAULT_TIME_INDEX_COLUMN.to_string());
                self.ctx.field_columns = vec![DEFAULT_FIELD_COLUMN.to_string()];
                self.ctx.reset_table_name_and_schema();
                let field_expr_builder = Self::prom_token_to_binary_expr_builder(*op)?;
                let mut field_expr = field_expr_builder(lhs, rhs)?;

                if is_comparison_op && should_return_bool {
                    field_expr = DfExpr::Cast(Cast {
                        expr: Box::new(field_expr),
                        data_type: ArrowDataType::Float64,
                    });
                }

                Ok(LogicalPlan::Extension(Extension {
                    node: Arc::new(
                        EmptyMetric::new(
                            self.ctx.start,
                            self.ctx.end,
                            self.ctx.interval,
                            SPECIAL_TIME_FUNCTION.to_string(),
                            DEFAULT_FIELD_COLUMN.to_string(),
                            Some(field_expr),
                        )
                        .context(DataFusionPlanningSnafu)?,
                    ),
                }))
            }
            // lhs is a literal, rhs is a column
            (Some(mut expr), None) => {
                let input = self.prom_expr_to_plan(rhs, query_engine_state).await?;
                // check if the literal is a special time expr
                if let Some(time_expr) = self.try_build_special_time_expr_with_context(lhs) {
                    expr = time_expr
                }
                let bin_expr_builder = |col: &String| {
                    let binary_expr_builder = Self::prom_token_to_binary_expr_builder(*op)?;
                    let mut binary_expr =
                        binary_expr_builder(expr.clone(), DfExpr::Column(col.into()))?;

                    if is_comparison_op && should_return_bool {
                        binary_expr = DfExpr::Cast(Cast {
                            expr: Box::new(binary_expr),
                            data_type: ArrowDataType::Float64,
                        });
                    }
                    Ok(binary_expr)
                };
                if is_comparison_op && !should_return_bool {
                    self.filter_on_field_column(input, bin_expr_builder)
                } else {
                    self.projection_for_each_field_column(input, bin_expr_builder)
                }
            }
            // lhs is a column, rhs is a literal
            (None, Some(mut expr)) => {
                let input = self.prom_expr_to_plan(lhs, query_engine_state).await?;
                // check if the literal is a special time expr
                if let Some(time_expr) = self.try_build_special_time_expr_with_context(rhs) {
                    expr = time_expr
                }
                let bin_expr_builder = |col: &String| {
                    let binary_expr_builder = Self::prom_token_to_binary_expr_builder(*op)?;
                    let mut binary_expr =
                        binary_expr_builder(DfExpr::Column(col.into()), expr.clone())?;

                    if is_comparison_op && should_return_bool {
                        binary_expr = DfExpr::Cast(Cast {
                            expr: Box::new(binary_expr),
                            data_type: ArrowDataType::Float64,
                        });
                    }
                    Ok(binary_expr)
                };
                if is_comparison_op && !should_return_bool {
                    self.filter_on_field_column(input, bin_expr_builder)
                } else {
                    self.projection_for_each_field_column(input, bin_expr_builder)
                }
            }
            // both are columns. join them on non-field columns
            (None, None) => {
                let left_input = self.prom_expr_to_plan(lhs, query_engine_state).await?;
                let left_field_columns = self.ctx.field_columns.clone();
                let left_time_index_column = self.ctx.time_index_column.clone();
                let mut left_table_ref = self
                    .table_ref()
                    .unwrap_or_else(|_| TableReference::bare(""));
                let left_context = self.ctx.clone();

                let right_input = self.prom_expr_to_plan(rhs, query_engine_state).await?;
                let right_field_columns = self.ctx.field_columns.clone();
                let right_time_index_column = self.ctx.time_index_column.clone();
                let mut right_table_ref = self
                    .table_ref()
                    .unwrap_or_else(|_| TableReference::bare(""));
                let right_context = self.ctx.clone();

                // set operations have "special" join semantics
                if Self::is_token_a_set_op(*op) {
                    return self.set_op_on_non_field_columns(
                        left_input,
                        right_input,
                        left_context,
                        right_context,
                        *op,
                        modifier,
                    );
                }

                // normal join
                if left_table_ref == right_table_ref {
                    // rename the table references to avoid ambiguity
                    left_table_ref = TableReference::bare("lhs");
                    right_table_ref = TableReference::bare("rhs");
                    // `self.ctx` currently holds the right-hand context. If it
                    // has no tag columns (e.g. `host + scalar(...)`), use the
                    // left context instead so that the tag columns survive the
                    // subsequent projection.
                    if self.ctx.tag_columns.is_empty() {
                        self.ctx = left_context.clone();
                        self.ctx.table_name = Some("lhs".to_string());
                    } else {
                        self.ctx.table_name = Some("rhs".to_string());
                    }
                }
                let mut field_columns = left_field_columns.iter().zip(right_field_columns.iter());

                let join_plan = self.join_on_non_field_columns(
                    left_input,
                    right_input,
                    left_table_ref.clone(),
                    right_table_ref.clone(),
                    left_time_index_column,
                    right_time_index_column,
                    // if either side has no tag columns (e.g. `scalar(...) + host`),
                    // join on the time index only
                    left_context.tag_columns.is_empty() || right_context.tag_columns.is_empty(),
                    modifier,
                )?;
                let join_plan_schema = join_plan.schema().clone();

                let bin_expr_builder = |_: &String| {
                    let (left_col_name, right_col_name) = field_columns.next().unwrap();
                    let left_col = join_plan_schema
                        .qualified_field_with_name(Some(&left_table_ref), left_col_name)
                        .context(DataFusionPlanningSnafu)?
                        .into();
                    let right_col = join_plan_schema
                        .qualified_field_with_name(Some(&right_table_ref), right_col_name)
                        .context(DataFusionPlanningSnafu)?
                        .into();

                    let binary_expr_builder = Self::prom_token_to_binary_expr_builder(*op)?;
                    let mut binary_expr =
                        binary_expr_builder(DfExpr::Column(left_col), DfExpr::Column(right_col))?;
                    if is_comparison_op && should_return_bool {
                        binary_expr = DfExpr::Cast(Cast {
                            expr: Box::new(binary_expr),
                            data_type: ArrowDataType::Float64,
                        });
                    }
                    Ok(binary_expr)
                };
                if is_comparison_op && !should_return_bool {
                    self.filter_on_field_column(join_plan, bin_expr_builder)
                } else {
                    self.projection_for_each_field_column(join_plan, bin_expr_builder)
                }
            }
        }
    }

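    /// Plan a number literal as an [`EmptyMetric`] node that emits the
    /// constant value at every time step.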
    fn prom_number_lit_to_plan(&mut self, number_literal: &NumberLiteral) -> Result<LogicalPlan> {
        let NumberLiteral { val } = number_literal;
        self.ctx.time_index_column = Some(DEFAULT_TIME_INDEX_COLUMN.to_string());
        self.ctx.field_columns = vec![DEFAULT_FIELD_COLUMN.to_string()];
        self.ctx.reset_table_name_and_schema();
        let literal_expr = df_prelude::lit(*val);

        let plan = LogicalPlan::Extension(Extension {
            node: Arc::new(
                EmptyMetric::new(
                    self.ctx.start,
                    self.ctx.end,
                    self.ctx.interval,
                    SPECIAL_TIME_FUNCTION.to_string(),
                    DEFAULT_FIELD_COLUMN.to_string(),
                    Some(literal_expr),
                )
                .context(DataFusionPlanningSnafu)?,
            ),
        });
        Ok(plan)
    }

    fn prom_string_lit_to_plan(&mut self, string_literal: &StringLiteral) -> Result<LogicalPlan> {
        let StringLiteral { val } = string_literal;
        self.ctx.time_index_column = Some(DEFAULT_TIME_INDEX_COLUMN.to_string());
        self.ctx.field_columns = vec![DEFAULT_FIELD_COLUMN.to_string()];
        self.ctx.reset_table_name_and_schema();
        let literal_expr = df_prelude::lit(val.to_string());

        let plan = LogicalPlan::Extension(Extension {
            node: Arc::new(
                EmptyMetric::new(
                    self.ctx.start,
                    self.ctx.end,
                    self.ctx.interval,
                    SPECIAL_TIME_FUNCTION.to_string(),
                    DEFAULT_FIELD_COLUMN.to_string(),
                    Some(literal_expr),
                )
                .context(DataFusionPlanningSnafu)?,
            ),
        });
        Ok(plan)
    }

    async fn prom_vector_selector_to_plan(
        &mut self,
        vector_selector: &VectorSelector,
        timestamp_fn: bool,
    ) -> Result<LogicalPlan> {
        let VectorSelector {
            name,
            offset,
            matchers,
            at: _,
        } = vector_selector;
        let matchers = self.preprocess_label_matchers(matchers, name)?;
        if let Some(empty_plan) = self.setup_context().await? {
            return Ok(empty_plan);
        }
        let normalize = self
            .selector_to_series_normalize_plan(offset, matchers, false)
            .await?;

        // if `timestamp()` is evaluated on this selector, project the time
        // index as the value column
        let normalize = if timestamp_fn {
            self.create_timestamp_func_plan(normalize)?
        } else {
            normalize
        };

        let manipulate = InstantManipulate::new(
            self.ctx.start,
            self.ctx.end,
            self.ctx.lookback_delta,
            self.ctx.interval,
            self.ctx
                .time_index_column
                .clone()
                .expect("time index should be set in `setup_context`"),
            self.ctx.field_columns.first().cloned(),
            normalize,
        );
        Ok(LogicalPlan::Extension(Extension {
            node: Arc::new(manipulate),
        }))
    }

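    /// Build the projection used by the `timestamp()` function: the time index
    /// is exposed (via `build_special_time_expr`) as the value column,
    /// alongside the original time index and tag columns.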
    fn create_timestamp_func_plan(&mut self, normalize: LogicalPlan) -> Result<LogicalPlan> {
        let time_expr = build_special_time_expr(self.ctx.time_index_column.as_ref().unwrap())
            .alias(DEFAULT_FIELD_COLUMN);
        self.ctx.field_columns = vec![time_expr.schema_name().to_string()];
        let mut project_exprs = Vec::with_capacity(self.ctx.tag_columns.len() + 2);
        project_exprs.push(self.create_time_index_column_expr()?);
        project_exprs.push(time_expr);
        project_exprs.extend(self.create_tag_column_exprs()?);

        LogicalPlanBuilder::from(normalize)
            .project(project_exprs)
            .context(DataFusionPlanningSnafu)?
            .build()
            .context(DataFusionPlanningSnafu)
    }

    async fn prom_matrix_selector_to_plan(
        &mut self,
        matrix_selector: &MatrixSelector,
    ) -> Result<LogicalPlan> {
        let MatrixSelector { vs, range } = matrix_selector;
        let VectorSelector {
            name,
            offset,
            matchers,
            ..
        } = vs;
        let matchers = self.preprocess_label_matchers(matchers, name)?;
        ensure!(!range.is_zero(), ZeroRangeSelectorSnafu);
        let range_ms = range.as_millis() as _;
        self.ctx.range = Some(range_ms);

        // some functions like `absent_over_time` can still work on a
        // non-existent metric, so an empty plan is a valid input here
        let normalize = match self.setup_context().await? {
            Some(empty_plan) => empty_plan,
            None => {
                self.selector_to_series_normalize_plan(offset, matchers, true)
                    .await?
            }
        };
        let manipulate = RangeManipulate::new(
            self.ctx.start,
            self.ctx.end,
            self.ctx.interval,
            range_ms,
            self.ctx
                .time_index_column
                .clone()
                .expect("time index should be set in `setup_context`"),
            self.ctx.field_columns.clone(),
            normalize,
        )
        .context(DataFusionPlanningSnafu)?;

        Ok(LogicalPlan::Extension(Extension {
            node: Arc::new(manipulate),
        }))
    }

    async fn prom_call_expr_to_plan(
        &mut self,
        query_engine_state: &QueryEngineState,
        call_expr: &Call,
    ) -> Result<LogicalPlan> {
        let Call { func, args } = call_expr;
        // some special functions are planned as whole plans rather than expressions
        match func.name {
            SPECIAL_HISTOGRAM_QUANTILE => {
                return self.create_histogram_plan(args, query_engine_state).await;
            }
            SPECIAL_VECTOR_FUNCTION => return self.create_vector_plan(args).await,
            SCALAR_FUNCTION => return self.create_scalar_plan(args, query_engine_state).await,
            SPECIAL_ABSENT_FUNCTION => {
                return self.create_absent_plan(args, query_engine_state).await;
            }
            _ => {}
        }

        // transform function arguments
        let args = self.create_function_args(&args.args)?;
        let input = if let Some(prom_expr) = &args.input {
            self.prom_expr_to_plan_inner(prom_expr, func.name == "timestamp", query_engine_state)
                .await?
        } else {
            self.ctx.time_index_column = Some(SPECIAL_TIME_FUNCTION.to_string());
            self.ctx.reset_table_name_and_schema();
            self.ctx.tag_columns = vec![];
            self.ctx.field_columns = vec![DEFAULT_FIELD_COLUMN.to_string()];
            LogicalPlan::Extension(Extension {
                node: Arc::new(
                    EmptyMetric::new(
                        self.ctx.start,
                        self.ctx.end,
                        self.ctx.interval,
                        SPECIAL_TIME_FUNCTION.to_string(),
                        DEFAULT_FIELD_COLUMN.to_string(),
                        None,
                    )
                    .context(DataFusionPlanningSnafu)?,
                ),
            })
        };
        let (mut func_exprs, new_tags) =
            self.create_function_expr(func, args.literals.clone(), query_engine_state)?;
        func_exprs.insert(0, self.create_time_index_column_expr()?);
        func_exprs.extend_from_slice(&self.create_tag_column_exprs()?);

        let builder = LogicalPlanBuilder::from(input)
            .project(func_exprs)
            .context(DataFusionPlanningSnafu)?
            .filter(self.create_empty_values_filter_expr()?)
            .context(DataFusionPlanningSnafu)?;

        let builder = match func.name {
            "sort" => builder
                .sort(self.create_field_columns_sort_exprs(true))
                .context(DataFusionPlanningSnafu)?,
            "sort_desc" => builder
                .sort(self.create_field_columns_sort_exprs(false))
                .context(DataFusionPlanningSnafu)?,
            "sort_by_label" => builder
                .sort(Self::create_sort_exprs_by_tags(
                    func.name,
                    args.literals,
                    true,
                )?)
                .context(DataFusionPlanningSnafu)?,
            "sort_by_label_desc" => builder
                .sort(Self::create_sort_exprs_by_tags(
                    func.name,
                    args.literals,
                    false,
                )?)
                .context(DataFusionPlanningSnafu)?,

            _ => builder,
        };

        // add new tags to the planner context
        for tag in new_tags {
            self.ctx.tag_columns.push(tag);
        }

        let plan = builder.build().context(DataFusionPlanningSnafu)?;
        common_telemetry::debug!("Created PromQL function plan: {plan:?} for {call_expr:?}");

        Ok(plan)
    }

    async fn prom_ext_expr_to_plan(
        &mut self,
        query_engine_state: &QueryEngineState,
        ext_expr: &promql_parser::parser::ast::Extension,
    ) -> Result<LogicalPlan> {
        let expr = &ext_expr.expr;
        let children = expr.children();
        let plan = self
            .prom_expr_to_plan(&children[0], query_engine_state)
            .await?;
        // `LogicalPlanBuilder::explain(verbose, analyze)` wraps the plan:
        // `analyze` executes the plan and reports runtime metrics, while
        // `verbose` prints additional details.
        match expr.name() {
            "ANALYZE" => LogicalPlanBuilder::from(plan)
                .explain(false, true)
                .unwrap()
                .build()
                .context(DataFusionPlanningSnafu),
            "ANALYZE VERBOSE" => LogicalPlanBuilder::from(plan)
                .explain(true, true)
                .unwrap()
                .build()
                .context(DataFusionPlanningSnafu),
            "EXPLAIN" => LogicalPlanBuilder::from(plan)
                .explain(false, false)
                .unwrap()
                .build()
                .context(DataFusionPlanningSnafu),
            "EXPLAIN VERBOSE" => LogicalPlanBuilder::from(plan)
                .explain(true, false)
                .unwrap()
                .build()
                .context(DataFusionPlanningSnafu),
            _ => LogicalPlanBuilder::empty(true)
                .build()
                .context(DataFusionPlanningSnafu),
        }
    }

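    /// Extract the metric name from the selector name or the `__name__`
    /// matcher into the planner context, record the special `__field__` and
    /// `__schema__`/`__database__` matchers, and return the remaining label
    /// matchers in normalized (unescaped) form.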
    #[allow(clippy::mutable_key_type)]
    fn preprocess_label_matchers(
        &mut self,
        label_matchers: &Matchers,
        name: &Option<String>,
    ) -> Result<Matchers> {
        self.ctx.reset();

        let metric_name;
        if let Some(name) = name.clone() {
            metric_name = Some(name);
            ensure!(
                label_matchers.find_matchers(METRIC_NAME).is_empty(),
                MultipleMetricMatchersSnafu
            );
        } else {
            let mut matches = label_matchers.find_matchers(METRIC_NAME);
            ensure!(!matches.is_empty(), NoMetricMatcherSnafu);
            ensure!(matches.len() == 1, MultipleMetricMatchersSnafu);
            ensure!(
                matches[0].op == MatchOp::Equal,
                UnsupportedMatcherOpSnafu {
                    matcher_op: matches[0].op.to_string(),
                    matcher: METRIC_NAME
                }
            );
            metric_name = matches.pop().map(|m| m.value);
        }

        self.ctx.table_name = metric_name;

        let mut matchers = HashSet::new();
        for matcher in &label_matchers.matchers {
            if matcher.name == FIELD_COLUMN_MATCHER {
                self.ctx
                    .field_column_matcher
                    .get_or_insert_default()
                    .push(matcher.clone());
            } else if matcher.name == SCHEMA_COLUMN_MATCHER || matcher.name == DB_COLUMN_MATCHER {
                ensure!(
                    matcher.op == MatchOp::Equal,
                    UnsupportedMatcherOpSnafu {
                        matcher: matcher.name.to_string(),
                        matcher_op: matcher.op.to_string(),
                    }
                );
                self.ctx.schema_name = Some(matcher.value.clone());
            } else if matcher.name != METRIC_NAME {
                self.ctx.selector_matcher.push(matcher.clone());
                let _ = matchers.insert(matcher.clone());
            }
        }

        Ok(Matchers::new(
            matchers.into_iter().map(normalize_matcher).collect(),
        ))
    }

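    /// Build the filtered and sorted scan for a selector, then wrap it in a
    /// `SeriesDivide` plan, plus a `SeriesNormalize` plan when a range
    /// selector or a non-zero offset requires it.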
    async fn selector_to_series_normalize_plan(
        &mut self,
        offset: &Option<Offset>,
        label_matchers: Matchers,
        is_range_selector: bool,
    ) -> Result<LogicalPlan> {
        // make table scan plan
        let table_ref = self.table_ref()?;
        let mut table_scan = self.create_table_scan_plan(table_ref.clone()).await?;
        let table_schema = table_scan.schema();

        // make filter exprs
        let offset_duration = match offset {
            Some(Offset::Pos(duration)) => duration.as_millis() as Millisecond,
            Some(Offset::Neg(duration)) => -(duration.as_millis() as Millisecond),
            None => 0,
        };
        let mut scan_filters = Self::matchers_to_expr(label_matchers.clone(), table_schema)?;
        if let Some(time_index_filter) = self.build_time_index_filter(offset_duration)? {
            scan_filters.push(time_index_filter);
        }
        table_scan = LogicalPlanBuilder::from(table_scan)
            .filter(conjunction(scan_filters).unwrap())
            .context(DataFusionPlanningSnafu)?
            .build()
            .context(DataFusionPlanningSnafu)?;

        // make a projection plan if there is any `__field__` matcher
        if let Some(field_matchers) = &self.ctx.field_column_matcher {
            let col_set = self.ctx.field_columns.iter().collect::<HashSet<_>>();
            // opt-in set
            let mut result_set = HashSet::new();
            // opt-out set
            let mut reverse_set = HashSet::new();
            for matcher in field_matchers {
                match &matcher.op {
                    MatchOp::Equal => {
                        if col_set.contains(&matcher.value) {
                            let _ = result_set.insert(matcher.value.clone());
                        } else {
                            return Err(ColumnNotFoundSnafu {
                                col: matcher.value.clone(),
                            }
                            .build());
                        }
                    }
                    MatchOp::NotEqual => {
                        if col_set.contains(&matcher.value) {
                            let _ = reverse_set.insert(matcher.value.clone());
                        } else {
                            return Err(ColumnNotFoundSnafu {
                                col: matcher.value.clone(),
                            }
                            .build());
                        }
                    }
                    MatchOp::Re(regex) => {
                        for col in &self.ctx.field_columns {
                            if regex.is_match(col) {
                                let _ = result_set.insert(col.clone());
                            }
                        }
                    }
                    MatchOp::NotRe(regex) => {
                        for col in &self.ctx.field_columns {
                            if regex.is_match(col) {
                                let _ = reverse_set.insert(col.clone());
                            }
                        }
                    }
                }
            }
            // merge the two sets
            if result_set.is_empty() {
                result_set = col_set.into_iter().cloned().collect();
            }
            for col in reverse_set {
                let _ = result_set.remove(&col);
            }

            // mask the field columns in the context with the result set
            self.ctx.field_columns = self
                .ctx
                .field_columns
                .drain(..)
                .filter(|col| result_set.contains(col))
                .collect();

            let exprs = result_set
                .into_iter()
                .map(|col| DfExpr::Column(Column::new_unqualified(col)))
                .chain(self.create_tag_column_exprs()?)
                .chain(Some(self.create_time_index_column_expr()?))
                .collect::<Vec<_>>();

            // reuse this variable for simplicity
            table_scan = LogicalPlanBuilder::from(table_scan)
                .project(exprs)
                .context(DataFusionPlanningSnafu)?
                .build()
                .context(DataFusionPlanningSnafu)?;
        }

        // make sort plan
        let sort_plan = LogicalPlanBuilder::from(table_scan)
            .sort(self.create_tag_and_time_index_column_sort_exprs()?)
            .context(DataFusionPlanningSnafu)?
            .build()
            .context(DataFusionPlanningSnafu)?;

        // make divide plan
        let time_index_column =
            self.ctx
                .time_index_column
                .clone()
                .with_context(|| TimeIndexNotFoundSnafu {
                    table: table_ref.to_string(),
                })?;
        let divide_plan = LogicalPlan::Extension(Extension {
            node: Arc::new(SeriesDivide::new(
                self.ctx.tag_columns.clone(),
                time_index_column,
                sort_plan,
            )),
        });

        // make series_normalize plan
        if !is_range_selector && offset_duration == 0 {
            return Ok(divide_plan);
        }
        let series_normalize = SeriesNormalize::new(
            offset_duration,
            self.ctx
                .time_index_column
                .clone()
                .with_context(|| TimeIndexNotFoundSnafu {
                    table: table_ref.to_quoted_string(),
                })?,
            is_range_selector,
            self.ctx.tag_columns.clone(),
            divide_plan,
        );
        let logical_plan = LogicalPlan::Extension(Extension {
            node: Arc::new(series_normalize),
        });

        Ok(logical_plan)
    }

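    /// Convert a [`LabelModifier`] (`by` / `without`) to group-by column
    /// exprs. The time index column is always included, and when `update_ctx`
    /// is true the context's tag columns are updated as a side effect.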
    fn agg_modifier_to_col(
        &mut self,
        input_schema: &DFSchemaRef,
        modifier: &Option<LabelModifier>,
        update_ctx: bool,
    ) -> Result<Vec<DfExpr>> {
        match modifier {
            None => {
                if update_ctx {
                    self.ctx.tag_columns.clear();
                }
                Ok(vec![self.create_time_index_column_expr()?])
            }
            Some(LabelModifier::Include(labels)) => {
                if update_ctx {
                    self.ctx.tag_columns.clear();
                }
                let mut exprs = Vec::with_capacity(labels.labels.len());
                for label in &labels.labels {
                    // nonexistent labels are ignored
                    if let Ok(field) = input_schema.field_with_unqualified_name(label) {
                        exprs.push(DfExpr::Column(Column::from(field.name())));

                        if update_ctx {
                            // update the tag columns in the context
                            self.ctx.tag_columns.push(label.clone());
                        }
                    }
                }
                // add the timestamp column
                exprs.push(self.create_time_index_column_expr()?);

                Ok(exprs)
            }
            Some(LabelModifier::Exclude(labels)) => {
                let mut all_fields = input_schema
                    .fields()
                    .iter()
                    .map(|f| f.name())
                    .collect::<BTreeSet<_>>();

                // remove the "without"-ed fields; nonexistent labels are ignored
                for label in &labels.labels {
                    let _ = all_fields.remove(label);
                }

                // remove the time index and value fields
                if let Some(time_index) = &self.ctx.time_index_column {
                    let _ = all_fields.remove(time_index);
                }
                for value in &self.ctx.field_columns {
                    let _ = all_fields.remove(value);
                }

                if update_ctx {
                    // change the tag columns in the context
                    self.ctx.tag_columns = all_fields.iter().map(|col| (*col).clone()).collect();
                }

                // collect the remaining fields as column exprs
                let mut exprs = all_fields
                    .into_iter()
                    .map(|c| DfExpr::Column(Column::from(c)))
                    .collect::<Vec<_>>();

                // add the timestamp column
                exprs.push(self.create_time_index_column_expr()?);

                Ok(exprs)
            }
        }
    }

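    /// Convert label [`Matchers`] to DataFusion filter expressions. Matchers
    /// on labels that are absent from the table schema are evaluated against
    /// an empty string.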
    pub fn matchers_to_expr(
        label_matchers: Matchers,
        table_schema: &DFSchemaRef,
    ) -> Result<Vec<DfExpr>> {
        let mut exprs = Vec::with_capacity(label_matchers.matchers.len());
        for matcher in label_matchers.matchers {
            if matcher.name == SCHEMA_COLUMN_MATCHER
                || matcher.name == DB_COLUMN_MATCHER
                || matcher.name == FIELD_COLUMN_MATCHER
            {
                continue;
            }

            let col = if table_schema
                .field_with_unqualified_name(&matcher.name)
                .is_err()
            {
                DfExpr::Literal(ScalarValue::Utf8(Some(String::new())), None).alias(matcher.name)
            } else {
                DfExpr::Column(Column::from_name(matcher.name))
            };
            let lit = DfExpr::Literal(ScalarValue::Utf8(Some(matcher.value)), None);
            let expr = match matcher.op {
                MatchOp::Equal => col.eq(lit),
                MatchOp::NotEqual => col.not_eq(lit),
                MatchOp::Re(re) => {
                    if re.as_str() == "^(?:.*)$" {
                        continue;
                    }
                    if re.as_str() == "^(?:.+)$" {
                        col.not_eq(DfExpr::Literal(
                            ScalarValue::Utf8(Some(String::new())),
                            None,
                        ))
                    } else {
                        DfExpr::BinaryExpr(BinaryExpr {
                            left: Box::new(col),
                            op: Operator::RegexMatch,
                            right: Box::new(DfExpr::Literal(
                                ScalarValue::Utf8(Some(re.as_str().to_string())),
                                None,
                            )),
                        })
                    }
                }
                MatchOp::NotRe(re) => {
                    if re.as_str() == "^(?:.*)$" {
                        DfExpr::Literal(ScalarValue::Boolean(Some(false)), None)
                    } else if re.as_str() == "^(?:.+)$" {
                        col.eq(DfExpr::Literal(
                            ScalarValue::Utf8(Some(String::new())),
                            None,
                        ))
                    } else {
                        DfExpr::BinaryExpr(BinaryExpr {
                            left: Box::new(col),
                            op: Operator::RegexNotMatch,
                            right: Box::new(DfExpr::Literal(
                                ScalarValue::Utf8(Some(re.as_str().to_string())),
                                None,
                            )),
                        })
                    }
                }
            };
            exprs.push(expr);
        }

        Ok(exprs)
    }

    fn table_ref(&self) -> Result<TableReference> {
        let table_name = self
            .ctx
            .table_name
            .clone()
            .context(TableNameNotFoundSnafu)?;

        let table_ref = if let Some(schema_name) = &self.ctx.schema_name {
            TableReference::partial(schema_name.as_str(), table_name.as_str())
        } else {
            TableReference::bare(table_name.as_str())
        };

        Ok(table_ref)
    }

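    /// Build the filter on the time index column. A single continuous range is
    /// used when the query is dense (many points, or the interval is within
    /// one hour); otherwise one small window per evaluation step is OR-ed
    /// together.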
    fn build_time_index_filter(&self, offset_duration: i64) -> Result<Option<DfExpr>> {
        let start = self.ctx.start;
        let end = self.ctx.end;
        if end < start {
            return InvalidTimeRangeSnafu { start, end }.fail();
        }
        let lookback_delta = self.ctx.lookback_delta;
        let range = self.ctx.range.unwrap_or_default();
        let interval = self.ctx.interval;
        let time_index_expr = self.create_time_index_column_expr()?;
        let num_points = (end - start) / interval;

        // scan a single continuous time range
        if (end - start) / interval > MAX_SCATTER_POINTS || interval <= INTERVAL_1H {
            let single_time_range = time_index_expr
                .clone()
                .gt_eq(DfExpr::Literal(
                    ScalarValue::TimestampMillisecond(
                        Some(self.ctx.start + offset_duration - self.ctx.lookback_delta - range),
                        None,
                    ),
                    None,
                ))
                .and(time_index_expr.lt_eq(DfExpr::Literal(
                    ScalarValue::TimestampMillisecond(
                        Some(self.ctx.end + offset_duration + self.ctx.lookback_delta),
                        None,
                    ),
                    None,
                )));
            return Ok(Some(single_time_range));
        }

        // otherwise, scan a small window around each evaluation step
        let mut filters = Vec::with_capacity(num_points as usize);
        for timestamp in (start..end).step_by(interval as usize) {
            filters.push(
                time_index_expr
                    .clone()
                    .gt_eq(DfExpr::Literal(
                        ScalarValue::TimestampMillisecond(
                            Some(timestamp + offset_duration - lookback_delta - range),
                            None,
                        ),
                        None,
                    ))
                    .and(time_index_expr.clone().lt_eq(DfExpr::Literal(
                        ScalarValue::TimestampMillisecond(
                            Some(timestamp + offset_duration + lookback_delta),
                            None,
                        ),
                        None,
                    ))),
            )
        }

        Ok(filters.into_iter().reduce(DfExpr::or))
    }

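    /// Create a table scan plan. When the table's time index is not in
    /// millisecond precision, a projection casting it to millisecond
    /// timestamps is added on top.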
    async fn create_table_scan_plan(&mut self, table_ref: TableReference) -> Result<LogicalPlan> {
        let provider = self
            .table_provider
            .resolve_table(table_ref.clone())
            .await
            .context(CatalogSnafu)?;

        let is_time_index_ms = provider
            .as_any()
            .downcast_ref::<DefaultTableSource>()
            .context(UnknownTableSnafu)?
            .table_provider
            .as_any()
            .downcast_ref::<DfTableProviderAdapter>()
            .context(UnknownTableSnafu)?
            .table()
            .schema()
            .timestamp_column()
            .with_context(|| TimeIndexNotFoundSnafu {
                table: table_ref.to_quoted_string(),
            })?
            .data_type
            == ConcreteDataType::timestamp_millisecond_datatype();

        let mut scan_plan = LogicalPlanBuilder::scan(table_ref.clone(), provider, None)
            .context(DataFusionPlanningSnafu)?
            .build()
            .context(DataFusionPlanningSnafu)?;

        if !is_time_index_ms {
            // cast the time index column to millisecond precision
            let expr: Vec<_> = self
                .ctx
                .field_columns
                .iter()
                .map(|col| DfExpr::Column(Column::new(Some(table_ref.clone()), col.clone())))
                .chain(self.create_tag_column_exprs()?)
                .chain(Some(DfExpr::Alias(Alias {
                    expr: Box::new(DfExpr::Cast(Cast {
                        expr: Box::new(self.create_time_index_column_expr()?),
                        data_type: ArrowDataType::Timestamp(ArrowTimeUnit::Millisecond, None),
                    })),
                    relation: Some(table_ref.clone()),
                    name: self
                        .ctx
                        .time_index_column
                        .as_ref()
                        .with_context(|| TimeIndexNotFoundSnafu {
                            table: table_ref.to_quoted_string(),
                        })?
                        .clone(),
                    metadata: None,
                })))
                .collect::<Vec<_>>();
            scan_plan = LogicalPlanBuilder::from(scan_plan)
                .project(expr)
                .context(DataFusionPlanningSnafu)?
                .build()
                .context(DataFusionPlanningSnafu)?;
        }

        let result = LogicalPlanBuilder::from(scan_plan)
            .build()
            .context(DataFusionPlanningSnafu)?;
        Ok(result)
    }

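    /// Set up the planner context from the table referenced by the selector.
    /// Returns an empty-metric plan instead when the table doesn't exist.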
    async fn setup_context(&mut self) -> Result<Option<LogicalPlan>> {
        let table_ref = self.table_ref()?;
        let table = match self.table_provider.resolve_table(table_ref.clone()).await {
            Err(e) if e.status_code() == StatusCode::TableNotFound => {
                let plan = self.setup_context_for_empty_metric()?;
                return Ok(Some(plan));
            }
            res => res.context(CatalogSnafu)?,
        };
        let table = table
            .as_any()
            .downcast_ref::<DefaultTableSource>()
            .context(UnknownTableSnafu)?
            .table_provider
            .as_any()
            .downcast_ref::<DfTableProviderAdapter>()
            .context(UnknownTableSnafu)?
            .table();

        // set the time index column name
        let time_index = table
            .schema()
            .timestamp_column()
            .with_context(|| TimeIndexNotFoundSnafu {
                table: table_ref.to_quoted_string(),
            })?
            .name
            .clone();
        self.ctx.time_index_column = Some(time_index);

        // set the value (field) columns
        let values = table
            .table_info()
            .meta
            .field_column_names()
            .cloned()
            .collect();
        self.ctx.field_columns = values;

        // set the tag columns, excluding the metric engine's internal columns
        let tags = table
            .table_info()
            .meta
            .row_key_column_names()
            .filter(|col| {
                col != &DATA_SCHEMA_TABLE_ID_COLUMN_NAME && col != &DATA_SCHEMA_TSID_COLUMN_NAME
            })
            .cloned()
            .collect();
        self.ctx.tag_columns = tags;

        Ok(None)
    }

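    /// Set up a placeholder context and return an [`EmptyMetric`] plan whose
    /// inverted time range (start 0, end -1) yields no rows.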
    fn setup_context_for_empty_metric(&mut self) -> Result<LogicalPlan> {
        self.ctx.time_index_column = Some(SPECIAL_TIME_FUNCTION.to_string());
        self.ctx.reset_table_name_and_schema();
        self.ctx.tag_columns = vec![];
        self.ctx.field_columns = vec![DEFAULT_FIELD_COLUMN.to_string()];

        // the table doesn't exist, so the plan uses start 0 and end -1 to produce no data
        let plan = LogicalPlan::Extension(Extension {
            node: Arc::new(
                EmptyMetric::new(
                    0,
                    -1,
                    self.ctx.interval,
                    SPECIAL_TIME_FUNCTION.to_string(),
                    DEFAULT_FIELD_COLUMN.to_string(),
                    Some(lit(0.0f64)),
                )
                .context(DataFusionPlanningSnafu)?,
            ),
        });
        Ok(plan)
    }

    fn create_function_args(&self, args: &[Box<PromExpr>]) -> Result<FunctionArgs> {
        let mut result = FunctionArgs::default();

        for arg in args {
            // first try to extract the argument as a literal expression
            if let Some(expr) = Self::try_build_literal_expr(arg) {
                result.literals.push(expr);
            } else {
                // then treat it as a vector-producing expression
                match arg.as_ref() {
                    PromExpr::Subquery(_)
                    | PromExpr::VectorSelector(_)
                    | PromExpr::MatrixSelector(_)
                    | PromExpr::Extension(_)
                    | PromExpr::Aggregate(_)
                    | PromExpr::Paren(_)
                    | PromExpr::Call(_)
                    | PromExpr::Binary(_)
                    | PromExpr::Unary(_) => {
                        if result.input.replace(*arg.clone()).is_some() {
                            MultipleVectorSnafu { expr: *arg.clone() }.fail()?;
                        }
                    }

                    _ => {
                        let expr = Self::get_param_as_literal_expr(&Some(arg.clone()), None, None)?;
                        result.literals.push(expr);
                    }
                }
            }
        }

        Ok(result)
    }

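    /// Create the projection expressions for a PromQL function call, returning
    /// the expressions together with any newly created tag columns.
    ///
    /// # Side effects
    ///
    /// Updates the context's field columns.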
    fn create_function_expr(
        &mut self,
        func: &Function,
        other_input_exprs: Vec<DfExpr>,
        query_engine_state: &QueryEngineState,
    ) -> Result<(Vec<DfExpr>, Vec<String>)> {
        let mut other_input_exprs: VecDeque<DfExpr> = other_input_exprs.into();

        // position at which the field column is inserted into the arg list
        let field_column_pos = 0;
        let mut exprs = Vec::with_capacity(self.ctx.field_columns.len());
        // new labels introduced by the function, e.g. `label_replace`
        let mut new_tags = vec![];
        let scalar_func = match func.name {
            "increase" => ScalarFunc::ExtrapolateUdf(
                Arc::new(Increase::scalar_udf()),
                self.ctx.range.context(ExpectRangeSelectorSnafu)?,
            ),
            "rate" => ScalarFunc::ExtrapolateUdf(
                Arc::new(Rate::scalar_udf()),
                self.ctx.range.context(ExpectRangeSelectorSnafu)?,
            ),
            "delta" => ScalarFunc::ExtrapolateUdf(
                Arc::new(Delta::scalar_udf()),
                self.ctx.range.context(ExpectRangeSelectorSnafu)?,
            ),
            "idelta" => ScalarFunc::Udf(Arc::new(IDelta::<false>::scalar_udf())),
            "irate" => ScalarFunc::Udf(Arc::new(IDelta::<true>::scalar_udf())),
            "resets" => ScalarFunc::Udf(Arc::new(Resets::scalar_udf())),
            "changes" => ScalarFunc::Udf(Arc::new(Changes::scalar_udf())),
            "deriv" => ScalarFunc::Udf(Arc::new(Deriv::scalar_udf())),
            "avg_over_time" => ScalarFunc::Udf(Arc::new(AvgOverTime::scalar_udf())),
            "min_over_time" => ScalarFunc::Udf(Arc::new(MinOverTime::scalar_udf())),
            "max_over_time" => ScalarFunc::Udf(Arc::new(MaxOverTime::scalar_udf())),
            "sum_over_time" => ScalarFunc::Udf(Arc::new(SumOverTime::scalar_udf())),
            "count_over_time" => ScalarFunc::Udf(Arc::new(CountOverTime::scalar_udf())),
            "last_over_time" => ScalarFunc::Udf(Arc::new(LastOverTime::scalar_udf())),
            "absent_over_time" => ScalarFunc::Udf(Arc::new(AbsentOverTime::scalar_udf())),
            "present_over_time" => ScalarFunc::Udf(Arc::new(PresentOverTime::scalar_udf())),
            "stddev_over_time" => ScalarFunc::Udf(Arc::new(StddevOverTime::scalar_udf())),
            "stdvar_over_time" => ScalarFunc::Udf(Arc::new(StdvarOverTime::scalar_udf())),
            "quantile_over_time" => ScalarFunc::Udf(Arc::new(QuantileOverTime::scalar_udf())),
            "predict_linear" => {
                other_input_exprs[0] = DfExpr::Cast(Cast {
                    expr: Box::new(other_input_exprs[0].clone()),
                    data_type: ArrowDataType::Int64,
                });
                ScalarFunc::Udf(Arc::new(PredictLinear::scalar_udf()))
            }
            "holt_winters" => ScalarFunc::Udf(Arc::new(HoltWinters::scalar_udf())),
            "time" => {
                exprs.push(build_special_time_expr(
                    self.ctx.time_index_column.as_ref().unwrap(),
                ));
                ScalarFunc::GeneratedExpr
            }
            "minute" => {
                // date_part('minute', time_index)
                let expr = self.date_part_on_time_index("minute")?;
                exprs.push(expr);
                ScalarFunc::GeneratedExpr
            }
            "hour" => {
                // date_part('hour', time_index)
                let expr = self.date_part_on_time_index("hour")?;
                exprs.push(expr);
                ScalarFunc::GeneratedExpr
            }
            "month" => {
                // date_part('month', time_index)
                let expr = self.date_part_on_time_index("month")?;
                exprs.push(expr);
                ScalarFunc::GeneratedExpr
            }
            "year" => {
                // date_part('year', time_index)
                let expr = self.date_part_on_time_index("year")?;
                exprs.push(expr);
                ScalarFunc::GeneratedExpr
            }
            "day_of_month" => {
                // date_part('day', time_index)
                let expr = self.date_part_on_time_index("day")?;
                exprs.push(expr);
                ScalarFunc::GeneratedExpr
            }
            "day_of_week" => {
                // date_part('dow', time_index)
                let expr = self.date_part_on_time_index("dow")?;
                exprs.push(expr);
                ScalarFunc::GeneratedExpr
            }
            "day_of_year" => {
                // date_part('doy', time_index)
                let expr = self.date_part_on_time_index("doy")?;
                exprs.push(expr);
                ScalarFunc::GeneratedExpr
            }
            "days_in_month" => {
                // date_part(
                //     'day',
                //     date_trunc('month', time_index) + interval '1 month - 1 day'
                // )
                let day_lit_expr = "day".lit();
                let month_lit_expr = "month".lit();
                let interval_1month_lit_expr =
                    DfExpr::Literal(ScalarValue::IntervalYearMonth(Some(1)), None);
                let interval_1day_lit_expr = DfExpr::Literal(
                    ScalarValue::IntervalDayTime(Some(IntervalDayTime::new(1, 0))),
                    None,
                );
                let the_1month_minus_1day_expr = DfExpr::BinaryExpr(BinaryExpr {
                    left: Box::new(interval_1month_lit_expr),
                    op: Operator::Minus,
                    right: Box::new(interval_1day_lit_expr),
                });
                let date_trunc_expr = DfExpr::ScalarFunction(ScalarFunction {
                    func: datafusion_functions::datetime::date_trunc(),
                    args: vec![month_lit_expr, self.create_time_index_column_expr()?],
                });
                let date_trunc_plus_interval_expr = DfExpr::BinaryExpr(BinaryExpr {
                    left: Box::new(date_trunc_expr),
                    op: Operator::Plus,
                    right: Box::new(the_1month_minus_1day_expr),
                });
                let date_part_expr = DfExpr::ScalarFunction(ScalarFunction {
                    func: datafusion_functions::datetime::date_part(),
                    args: vec![day_lit_expr, date_trunc_plus_interval_expr],
                });

                exprs.push(date_part_expr);
                ScalarFunc::GeneratedExpr
            }

            "label_join" => {
                let (concat_expr, dst_label) = Self::build_concat_labels_expr(
                    &mut other_input_exprs,
                    &self.ctx,
                    query_engine_state,
                )?;

                // keep the current field columns except the `dst_label`
                for value in &self.ctx.field_columns {
                    if *value != dst_label {
                        let expr = DfExpr::Column(Column::from_name(value));
                        exprs.push(expr);
                    }
                }

                // remove `dst_label` from tag columns to avoid duplicated column names
                self.ctx.tag_columns.retain(|tag| *tag != dst_label);
                new_tags.push(dst_label);
                // add the new label expr to evaluate
                exprs.push(concat_expr);

                ScalarFunc::GeneratedExpr
            }
            "label_replace" => {
                if let Some((replace_expr, dst_label)) = self
                    .build_regexp_replace_label_expr(&mut other_input_exprs, query_engine_state)?
                {
                    // keep the current field columns except the `dst_label`
                    for value in &self.ctx.field_columns {
                        if *value != dst_label {
                            let expr = DfExpr::Column(Column::from_name(value));
                            exprs.push(expr);
                        }
                    }

                    ensure!(
                        !self.ctx.tag_columns.contains(&dst_label),
                        SameLabelSetSnafu
                    );
                    new_tags.push(dst_label);
                    // add the new label expr to evaluate
                    exprs.push(replace_expr);
                } else {
                    // the call is a no-op: keep the current field columns
                    for value in &self.ctx.field_columns {
                        let expr = DfExpr::Column(Column::from_name(value));
                        exprs.push(expr);
                    }
                }

                ScalarFunc::GeneratedExpr
            }
            "sort" | "sort_desc" | "sort_by_label" | "sort_by_label_desc" | "timestamp" => {
                // these functions are not expressions but part of the plan;
                // they are handled in `prom_call_expr_to_plan`
                for value in &self.ctx.field_columns {
                    let expr = DfExpr::Column(Column::from_name(value));
                    exprs.push(expr);
                }

                ScalarFunc::GeneratedExpr
            }
            "round" => {
                if other_input_exprs.is_empty() {
                    other_input_exprs.push_front(0.0f64.lit());
                }
                ScalarFunc::DataFusionUdf(Arc::new(Round::scalar_udf()))
            }
            "rad" => ScalarFunc::DataFusionBuiltin(datafusion::functions::math::radians()),
            "deg" => ScalarFunc::DataFusionBuiltin(datafusion::functions::math::degrees()),
            "sgn" => ScalarFunc::DataFusionBuiltin(datafusion::functions::math::signum()),
            "pi" => {
                // `pi()` doesn't take an input, so push the expression directly
                let fn_expr = DfExpr::ScalarFunction(ScalarFunction {
                    func: datafusion::functions::math::pi(),
                    args: vec![],
                });
                exprs.push(fn_expr);

                ScalarFunc::GeneratedExpr
            }
            _ => {
                if let Some(f) = query_engine_state
                    .session_state()
                    .scalar_functions()
                    .get(func.name)
                {
                    ScalarFunc::DataFusionBuiltin(f.clone())
                } else if let Some(factory) = query_engine_state.scalar_function(func.name) {
                    let func_state = query_engine_state.function_state();
                    let query_ctx = self.table_provider.query_ctx();

                    ScalarFunc::DataFusionUdf(Arc::new(factory.provide(FunctionContext {
                        state: func_state,
                        query_ctx: query_ctx.clone(),
                    })))
                } else if let Some(f) = datafusion_functions::math::functions()
                    .iter()
                    .find(|f| f.name() == func.name)
                {
                    ScalarFunc::DataFusionUdf(f.clone())
                } else {
                    return UnsupportedExprSnafu {
                        name: func.name.to_string(),
                    }
                    .fail();
                }
            }
        };

        // build the function expression for each field column
        for value in &self.ctx.field_columns {
            let col_expr = DfExpr::Column(Column::from_name(value));

            match scalar_func.clone() {
                ScalarFunc::DataFusionBuiltin(func) => {
                    other_input_exprs.insert(field_column_pos, col_expr);
                    let fn_expr = DfExpr::ScalarFunction(ScalarFunction {
                        func,
                        args: other_input_exprs.clone().into(),
                    });
                    exprs.push(fn_expr);
                    let _ = other_input_exprs.remove(field_column_pos);
                }
                ScalarFunc::DataFusionUdf(func) => {
                    let args = itertools::chain!(
                        other_input_exprs.iter().take(field_column_pos).cloned(),
                        std::iter::once(col_expr),
                        other_input_exprs.iter().skip(field_column_pos).cloned()
                    )
                    .collect_vec();
                    exprs.push(DfExpr::ScalarFunction(ScalarFunction { func, args }))
                }
                ScalarFunc::Udf(func) => {
                    let ts_range_expr = DfExpr::Column(Column::from_name(
                        RangeManipulate::build_timestamp_range_name(
                            self.ctx.time_index_column.as_ref().unwrap(),
                        ),
                    ));
                    other_input_exprs.insert(field_column_pos, ts_range_expr);
                    other_input_exprs.insert(field_column_pos + 1, col_expr);
                    let fn_expr = DfExpr::ScalarFunction(ScalarFunction {
                        func,
                        args: other_input_exprs.clone().into(),
                    });
                    exprs.push(fn_expr);
                    let _ = other_input_exprs.remove(field_column_pos + 1);
                    let _ = other_input_exprs.remove(field_column_pos);
                }
                ScalarFunc::ExtrapolateUdf(func, range_length) => {
                    let ts_range_expr = DfExpr::Column(Column::from_name(
                        RangeManipulate::build_timestamp_range_name(
                            self.ctx.time_index_column.as_ref().unwrap(),
                        ),
                    ));
                    other_input_exprs.insert(field_column_pos, ts_range_expr);
                    other_input_exprs.insert(field_column_pos + 1, col_expr);
                    other_input_exprs
                        .insert(field_column_pos + 2, self.create_time_index_column_expr()?);
                    other_input_exprs.push_back(lit(range_length));
                    let fn_expr = DfExpr::ScalarFunction(ScalarFunction {
                        func,
                        args: other_input_exprs.clone().into(),
                    });
                    exprs.push(fn_expr);
                    let _ = other_input_exprs.pop_back();
                    let _ = other_input_exprs.remove(field_column_pos + 2);
                    let _ = other_input_exprs.remove(field_column_pos + 1);
                    let _ = other_input_exprs.remove(field_column_pos);
                }
                ScalarFunc::GeneratedExpr => {}
            }
        }

        // update the value columns' names and alias them; `label_join` and
        // `label_replace` maintain the field columns themselves
        if !matches!(func.name, "label_join" | "label_replace") {
            let mut new_field_columns = Vec::with_capacity(exprs.len());

            exprs = exprs
                .into_iter()
                .map(|expr| {
                    let display_name = expr.schema_name().to_string();
                    new_field_columns.push(display_name.clone());
                    Ok(expr.alias(display_name))
                })
                .collect::<std::result::Result<Vec<_>, _>>()
                .context(DataFusionPlanningSnafu)?;

            self.ctx.field_columns = new_field_columns;
        }

        Ok((exprs, new_tags))
    }

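    /// Validate a destination label name: it must match the Prometheus label
    /// name pattern and must not use the reserved `__` prefix.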
    fn validate_label_name(label_name: &str) -> Result<()> {
        // reject reserved label names starting with double underscores
        if label_name.starts_with("__") {
            return InvalidDestinationLabelNameSnafu { label_name }.fail();
        }
        if !LABEL_NAME_REGEX.is_match(label_name) {
            return InvalidDestinationLabelNameSnafu { label_name }.fail();
        }

        Ok(())
    }

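    /// Build the `regexp_replace` expression for the `label_replace` function.
    /// Returns `None` when the call is a no-op for the current input.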
1974 fn build_regexp_replace_label_expr(
1976 &self,
1977 other_input_exprs: &mut VecDeque<DfExpr>,
1978 query_engine_state: &QueryEngineState,
1979 ) -> Result<Option<(DfExpr, String)>> {
1980 let dst_label = match other_input_exprs.pop_front() {
1982 Some(DfExpr::Literal(ScalarValue::Utf8(Some(d)), _)) => d,
1983 other => UnexpectedPlanExprSnafu {
1984 desc: format!("expected dst_label string literal, but found {:?}", other),
1985 }
1986 .fail()?,
1987 };
1988
1989 Self::validate_label_name(&dst_label)?;
1991 let replacement = match other_input_exprs.pop_front() {
1992 Some(DfExpr::Literal(ScalarValue::Utf8(Some(r)), _)) => r,
1993 other => UnexpectedPlanExprSnafu {
1994 desc: format!("expected replacement string literal, but found {:?}", other),
1995 }
1996 .fail()?,
1997 };
1998 let src_label = match other_input_exprs.pop_front() {
1999 Some(DfExpr::Literal(ScalarValue::Utf8(Some(s)), None)) => s,
2000 other => UnexpectedPlanExprSnafu {
2001 desc: format!("expected src_label string literal, but found {:?}", other),
2002 }
2003 .fail()?,
2004 };
2005
2006 let regex = match other_input_exprs.pop_front() {
2007 Some(DfExpr::Literal(ScalarValue::Utf8(Some(r)), None)) => r,
2008 other => UnexpectedPlanExprSnafu {
2009 desc: format!("expected regex string literal, but found {:?}", other),
2010 }
2011 .fail()?,
2012 };
2013
2014 regex::Regex::new(®ex).map_err(|_| {
2017 InvalidRegularExpressionSnafu {
2018 regex: regex.clone(),
2019 }
2020 .build()
2021 })?;
2022
2023 if self.ctx.tag_columns.contains(&src_label) && regex.is_empty() {
2025 return Ok(None);
2026 }
2027
2028 if !self.ctx.tag_columns.contains(&src_label) {
2030 if replacement.is_empty() {
2031 return Ok(None);
2033 } else {
2034 return Ok(Some((
2036 lit(replacement).alias(&dst_label),
2038 dst_label,
2039 )));
2040 }
2041 }
2042
2043 let regex = format!("^(?s:{regex})$");
2046
2047 let session_state = query_engine_state.session_state();
2048 let func = session_state
2049 .scalar_functions()
2050 .get("regexp_replace")
2051 .context(UnsupportedExprSnafu {
2052 name: "regexp_replace",
2053 })?;
2054
        // Arguments for `regexp_replace(source, regex, replacement)`.
        let args = vec![
            if src_label.is_empty() {
                // An empty source label matches against the empty string.
                DfExpr::Literal(ScalarValue::Utf8(Some(String::new())), None)
            } else {
                DfExpr::Column(Column::from_name(src_label))
            },
            DfExpr::Literal(ScalarValue::Utf8(Some(regex)), None),
            DfExpr::Literal(ScalarValue::Utf8(Some(replacement)), None),
        ];
2065
2066 Ok(Some((
2067 DfExpr::ScalarFunction(ScalarFunction {
2068 func: func.clone(),
2069 args,
2070 })
2071 .alias(&dst_label),
2072 dst_label,
2073 )))
2074 }
2075
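    /// Plans PromQL's `label_join(v, dst_label, separator, src_label, ...)`
    /// as a `concat_ws` call aliased to `dst_label`. Source labels that don't
    /// exist in the input are planned as NULL literals so they contribute
    /// nothing; e.g. `label_join(up, "host", "-", "instance", "job")` joins
    /// the two label values with `-`.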
2076 fn build_concat_labels_expr(
2078 other_input_exprs: &mut VecDeque<DfExpr>,
2079 ctx: &PromPlannerContext,
2080 query_engine_state: &QueryEngineState,
2081 ) -> Result<(DfExpr, String)> {
2082 let dst_label = match other_input_exprs.pop_front() {
2085 Some(DfExpr::Literal(ScalarValue::Utf8(Some(d)), _)) => d,
2086 other => UnexpectedPlanExprSnafu {
2087 desc: format!("expected dst_label string literal, but found {:?}", other),
2088 }
2089 .fail()?,
2090 };
2091 let separator = match other_input_exprs.pop_front() {
2092 Some(DfExpr::Literal(ScalarValue::Utf8(Some(d)), _)) => d,
2093 other => UnexpectedPlanExprSnafu {
2094 desc: format!("expected separator string literal, but found {:?}", other),
2095 }
2096 .fail()?,
2097 };
2098
2099 let available_columns: HashSet<&str> = ctx
2101 .tag_columns
2102 .iter()
2103 .chain(ctx.field_columns.iter())
2104 .chain(ctx.time_index_column.as_ref())
2105 .map(|s| s.as_str())
2106 .collect();
2107
2108 let src_labels = other_input_exprs
2109 .iter()
2110 .map(|expr| {
2111 match expr {
2113 DfExpr::Literal(ScalarValue::Utf8(Some(label)), None) => {
2114 if label.is_empty() {
2115 Ok(DfExpr::Literal(ScalarValue::Null, None))
2116 } else if available_columns.contains(label.as_str()) {
2117 Ok(DfExpr::Column(Column::from_name(label)))
2119 } else {
2120 Ok(DfExpr::Literal(ScalarValue::Null, None))
2122 }
2123 }
2124 other => UnexpectedPlanExprSnafu {
2125 desc: format!(
2126 "expected source label string literal, but found {:?}",
2127 other
2128 ),
2129 }
2130 .fail(),
2131 }
2132 })
2133 .collect::<Result<Vec<_>>>()?;
2134 ensure!(
2135 !src_labels.is_empty(),
2136 FunctionInvalidArgumentSnafu {
2137 fn_name: "label_join"
2138 }
2139 );
2140
2141 let session_state = query_engine_state.session_state();
2142 let func = session_state
2143 .scalar_functions()
2144 .get("concat_ws")
2145 .context(UnsupportedExprSnafu { name: "concat_ws" })?;
2146
2147 let mut args = Vec::with_capacity(1 + src_labels.len());
2149 args.push(DfExpr::Literal(ScalarValue::Utf8(Some(separator)), None));
2150 args.extend(src_labels);
2151
2152 Ok((
2153 DfExpr::ScalarFunction(ScalarFunction {
2154 func: func.clone(),
2155 args,
2156 })
2157 .alias(&dst_label),
2158 dst_label,
2159 ))
2160 }
2161
2162 fn create_time_index_column_expr(&self) -> Result<DfExpr> {
2163 Ok(DfExpr::Column(Column::from_name(
2164 self.ctx
2165 .time_index_column
2166 .clone()
2167 .with_context(|| TimeIndexNotFoundSnafu { table: "unknown" })?,
2168 )))
2169 }
2170
2171 fn create_tag_column_exprs(&self) -> Result<Vec<DfExpr>> {
2172 let mut result = Vec::with_capacity(self.ctx.tag_columns.len());
2173 for tag in &self.ctx.tag_columns {
2174 let expr = DfExpr::Column(Column::from_name(tag));
2175 result.push(expr);
2176 }
2177 Ok(result)
2178 }
2179
2180 fn create_field_column_exprs(&self) -> Result<Vec<DfExpr>> {
2181 let mut result = Vec::with_capacity(self.ctx.field_columns.len());
2182 for field in &self.ctx.field_columns {
2183 let expr = DfExpr::Column(Column::from_name(field));
2184 result.push(expr);
2185 }
2186 Ok(result)
2187 }
2188
2189 fn create_tag_and_time_index_column_sort_exprs(&self) -> Result<Vec<SortExpr>> {
2190 let mut result = self
2191 .ctx
2192 .tag_columns
2193 .iter()
2194 .map(|col| DfExpr::Column(Column::from_name(col)).sort(true, true))
2195 .collect::<Vec<_>>();
2196 result.push(self.create_time_index_column_expr()?.sort(true, true));
2197 Ok(result)
2198 }
2199
2200 fn create_field_columns_sort_exprs(&self, asc: bool) -> Vec<SortExpr> {
2201 self.ctx
2202 .field_columns
2203 .iter()
2204 .map(|col| DfExpr::Column(Column::from_name(col)).sort(asc, true))
2205 .collect::<Vec<_>>()
2206 }
2207
2208 fn create_sort_exprs_by_tags(
2209 func: &str,
2210 tags: Vec<DfExpr>,
2211 asc: bool,
2212 ) -> Result<Vec<SortExpr>> {
2213 ensure!(
2214 !tags.is_empty(),
2215 FunctionInvalidArgumentSnafu { fn_name: func }
2216 );
2217
2218 tags.iter()
2219 .map(|col| match col {
2220 DfExpr::Literal(ScalarValue::Utf8(Some(label)), _) => {
2221 Ok(DfExpr::Column(Column::from_name(label)).sort(asc, false))
2222 }
2223 other => UnexpectedPlanExprSnafu {
2224 desc: format!("expected label string literal, but found {:?}", other),
2225 }
2226 .fail(),
2227 })
2228 .collect::<Result<Vec<_>>>()
2229 }
2230
2231 fn create_empty_values_filter_expr(&self) -> Result<DfExpr> {
2232 let mut exprs = Vec::with_capacity(self.ctx.field_columns.len());
2233 for value in &self.ctx.field_columns {
2234 let expr = DfExpr::Column(Column::from_name(value)).is_not_null();
2235 exprs.push(expr);
2236 }
2237
2238 conjunction(exprs).context(ValueNotFoundSnafu {
2239 table: self.table_ref()?.to_quoted_string(),
2240 })
2241 }
2242
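    /// Creates the aggregate expressions for a PromQL aggregation (`sum`,
    /// `avg`, `min`, `max`, `count`, `group`, `stddev`, `stdvar`, `quantile`,
    /// `count_values`): one DataFusion UDAF call per field column. `quantile`
    /// threads its scalar parameter through as a leading argument, and
    /// `count_values` (single-field inputs only) additionally returns the
    /// original field expressions so the caller can group by the value.
    /// `ctx.field_columns` is updated to the new output names; `topk` and
    /// `bottomk` are handled by `create_window_exprs` instead.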
2243 fn create_aggregate_exprs(
2259 &mut self,
2260 op: TokenType,
2261 param: &Option<Box<PromExpr>>,
2262 input_plan: &LogicalPlan,
2263 ) -> Result<(Vec<DfExpr>, Vec<DfExpr>)> {
2264 let mut non_col_args = Vec::new();
2265 let aggr = match op.id() {
2266 token::T_SUM => sum_udaf(),
2267 token::T_QUANTILE => {
2268 let q =
2269 Self::get_param_as_literal_expr(param, Some(op), Some(ArrowDataType::Float64))?;
2270 non_col_args.push(q);
2271 quantile_udaf()
2272 }
2273 token::T_AVG => avg_udaf(),
2274 token::T_COUNT_VALUES | token::T_COUNT => count_udaf(),
2275 token::T_MIN => min_udaf(),
2276 token::T_MAX => max_udaf(),
2277 token::T_GROUP => grouping_udaf(),
2278 token::T_STDDEV => stddev_pop_udaf(),
2279 token::T_STDVAR => var_pop_udaf(),
2280 token::T_TOPK | token::T_BOTTOMK => UnsupportedExprSnafu {
2281 name: format!("{op:?}"),
2282 }
2283 .fail()?,
2284 _ => UnexpectedTokenSnafu { token: op }.fail()?,
2285 };
2286
2287 let exprs: Vec<DfExpr> = self
2289 .ctx
2290 .field_columns
2291 .iter()
2292 .map(|col| {
2293 non_col_args.push(DfExpr::Column(Column::from_name(col)));
2294 let expr = aggr.call(non_col_args.clone());
2295 non_col_args.pop();
2296 expr
2297 })
2298 .collect::<Vec<_>>();
2299
        // `count_values` groups by the original sample value, so keep the
        // input field expressions around for the caller to use as a group-by
        // key.
        let prev_field_exprs = if op.id() == token::T_COUNT_VALUES {
2302 let prev_field_exprs: Vec<_> = self
2303 .ctx
2304 .field_columns
2305 .iter()
2306 .map(|col| DfExpr::Column(Column::from_name(col)))
2307 .collect();
2308
2309 ensure!(
2310 self.ctx.field_columns.len() == 1,
2311 UnsupportedExprSnafu {
2312 name: "count_values on multi-value input"
2313 }
2314 );
2315
2316 prev_field_exprs
2317 } else {
2318 vec![]
2319 };
2320
2321 let mut new_field_columns = Vec::with_capacity(self.ctx.field_columns.len());
2323
2324 let normalized_exprs =
2325 normalize_cols(exprs.iter().cloned(), input_plan).context(DataFusionPlanningSnafu)?;
2326 for expr in normalized_exprs {
2327 new_field_columns.push(expr.schema_name().to_string());
2328 }
2329 self.ctx.field_columns = new_field_columns;
2330
2331 Ok((exprs, prev_field_exprs))
2332 }
2333
2334 fn get_param_value_as_str(op: TokenType, param: &Option<Box<PromExpr>>) -> Result<&str> {
2335 let param = param
2336 .as_deref()
2337 .with_context(|| FunctionInvalidArgumentSnafu {
2338 fn_name: op.to_string(),
2339 })?;
2340 let PromExpr::StringLiteral(StringLiteral { val }) = param else {
2341 return FunctionInvalidArgumentSnafu {
2342 fn_name: op.to_string(),
2343 }
2344 .fail();
2345 };
2346
2347 Ok(val)
2348 }
2349
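    /// Extracts a PromQL aggregation/function parameter as a DataFusion
    /// literal expression, optionally checking its Arrow type (e.g.
    /// `Float64` for `quantile`'s first argument).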
2350 fn get_param_as_literal_expr(
2351 param: &Option<Box<PromExpr>>,
2352 op: Option<TokenType>,
2353 expected_type: Option<ArrowDataType>,
2354 ) -> Result<DfExpr> {
2355 let prom_param = param.as_deref().with_context(|| {
2356 if let Some(op) = op {
2357 FunctionInvalidArgumentSnafu {
2358 fn_name: op.to_string(),
2359 }
2360 } else {
2361 FunctionInvalidArgumentSnafu {
2362 fn_name: "unknown".to_string(),
2363 }
2364 }
2365 })?;
2366
2367 let expr = Self::try_build_literal_expr(prom_param).with_context(|| {
2368 if let Some(op) = op {
2369 FunctionInvalidArgumentSnafu {
2370 fn_name: op.to_string(),
2371 }
2372 } else {
2373 FunctionInvalidArgumentSnafu {
2374 fn_name: "unknown".to_string(),
2375 }
2376 }
2377 })?;
2378
2379 if let Some(expected_type) = expected_type {
2381 let expr_type = expr
2383 .get_type(&DFSchema::empty())
2384 .context(DataFusionPlanningSnafu)?;
2385 if expected_type != expr_type {
2386 return FunctionInvalidArgumentSnafu {
2387 fn_name: format!("expected {expected_type:?}, but found {expr_type:?}"),
2388 }
2389 .fail();
2390 }
2391 }
2392
2393 Ok(expr)
2394 }
2395
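    /// Plans `topk`/`bottomk` as a `row_number()` window function partitioned
    /// by the grouping expressions and ordered by the field value (ascending
    /// for `bottomk`), with tags as tie-breakers. In SQL terms, roughly:
    /// `ROW_NUMBER() OVER (PARTITION BY <group_exprs> ORDER BY field, tags...)`;
    /// the caller is expected to filter on the resulting row number.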
2396 fn create_window_exprs(
2399 &mut self,
2400 op: TokenType,
2401 group_exprs: Vec<DfExpr>,
2402 input_plan: &LogicalPlan,
2403 ) -> Result<Vec<DfExpr>> {
2404 ensure!(
2405 self.ctx.field_columns.len() == 1,
2406 UnsupportedExprSnafu {
2407 name: "topk or bottomk on multi-value input"
2408 }
2409 );
2410
2411 assert!(matches!(op.id(), token::T_TOPK | token::T_BOTTOMK));
2412
2413 let asc = matches!(op.id(), token::T_BOTTOMK);
2414
2415 let tag_sort_exprs = self
2416 .create_tag_column_exprs()?
2417 .into_iter()
2418 .map(|expr| expr.sort(asc, true));
2419
2420 let exprs: Vec<DfExpr> = self
2422 .ctx
2423 .field_columns
2424 .iter()
2425 .map(|col| {
2426 let mut sort_exprs = Vec::with_capacity(self.ctx.tag_columns.len() + 1);
                // Order by the field value first; tags break ties.
                sort_exprs.push(DfExpr::Column(Column::from_name(col)).sort(asc, true));
2429 sort_exprs.extend(tag_sort_exprs.clone());
2432
2433 DfExpr::WindowFunction(Box::new(WindowFunction {
2434 fun: WindowFunctionDefinition::WindowUDF(Arc::new(RowNumber::new().into())),
2435 params: WindowFunctionParams {
2436 args: vec![],
2437 partition_by: group_exprs.clone(),
2438 order_by: sort_exprs,
2439 window_frame: WindowFrame::new(Some(true)),
2440 null_treatment: None,
2441 distinct: false,
2442 },
2443 }))
2444 })
2445 .collect();
2446
2447 let normalized_exprs =
2448 normalize_cols(exprs.iter().cloned(), input_plan).context(DataFusionPlanningSnafu)?;
2449 Ok(normalized_exprs)
2450 }
2451
2452 #[deprecated(
2454 note = "use `Self::get_param_as_literal_expr` instead. This is only for `create_histogram_plan`"
2455 )]
2456 fn try_build_float_literal(expr: &PromExpr) -> Option<f64> {
2457 match expr {
2458 PromExpr::NumberLiteral(NumberLiteral { val }) => Some(*val),
2459 PromExpr::Paren(ParenExpr { expr }) => Self::try_build_float_literal(expr),
2460 PromExpr::Unary(UnaryExpr { expr, .. }) => {
2461 Self::try_build_float_literal(expr).map(|f| -f)
2462 }
2463 PromExpr::StringLiteral(_)
2464 | PromExpr::Binary(_)
2465 | PromExpr::VectorSelector(_)
2466 | PromExpr::MatrixSelector(_)
2467 | PromExpr::Call(_)
2468 | PromExpr::Extension(_)
2469 | PromExpr::Aggregate(_)
2470 | PromExpr::Subquery(_) => None,
2471 }
2472 }
2473
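    /// Plans `histogram_quantile(phi, buckets)` using the `HistogramFold`
    /// extension node, folding bucket series on the `le` label. If the input
    /// has no `le` tag there is nothing to fold and an empty relation is
    /// returned.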
2474 async fn create_histogram_plan(
2476 &mut self,
2477 args: &PromFunctionArgs,
2478 query_engine_state: &QueryEngineState,
2479 ) -> Result<LogicalPlan> {
2480 if args.args.len() != 2 {
2481 return FunctionInvalidArgumentSnafu {
2482 fn_name: SPECIAL_HISTOGRAM_QUANTILE.to_string(),
2483 }
2484 .fail();
2485 }
2486 #[allow(deprecated)]
2487 let phi = Self::try_build_float_literal(&args.args[0]).with_context(|| {
2488 FunctionInvalidArgumentSnafu {
2489 fn_name: SPECIAL_HISTOGRAM_QUANTILE.to_string(),
2490 }
2491 })?;
2492
2493 let input = args.args[1].as_ref().clone();
2494 let input_plan = self.prom_expr_to_plan(&input, query_engine_state).await?;
2495
2496 if !self.ctx.has_le_tag() {
2497 return Ok(LogicalPlan::EmptyRelation(
2500 datafusion::logical_expr::EmptyRelation {
2501 produce_one_row: false,
2502 schema: Arc::new(DFSchema::empty()),
2503 },
2504 ));
2505 }
2506 let time_index_column =
2507 self.ctx
2508 .time_index_column
2509 .clone()
2510 .with_context(|| TimeIndexNotFoundSnafu {
2511 table: self.ctx.table_name.clone().unwrap_or_default(),
2512 })?;
2513 let field_column = self
2515 .ctx
2516 .field_columns
2517 .first()
2518 .with_context(|| FunctionInvalidArgumentSnafu {
2519 fn_name: SPECIAL_HISTOGRAM_QUANTILE.to_string(),
2520 })?
2521 .clone();
        // `le` is consumed by the fold and is no longer a tag afterwards.
        self.ctx.tag_columns.retain(|col| col != LE_COLUMN_NAME);
2524
2525 Ok(LogicalPlan::Extension(Extension {
2526 node: Arc::new(
2527 HistogramFold::new(
2528 LE_COLUMN_NAME.to_string(),
2529 field_column,
2530 time_index_column,
2531 phi,
2532 input_plan,
2533 )
2534 .context(DataFusionPlanningSnafu)?,
2535 ),
2536 }))
2537 }
2538
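    /// Plans `vector(s scalar)` as an `EmptyMetric` node that emits the
    /// literal value at every evaluation step between `start` and `end`.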
2539 async fn create_vector_plan(&mut self, args: &PromFunctionArgs) -> Result<LogicalPlan> {
2541 if args.args.len() != 1 {
2542 return FunctionInvalidArgumentSnafu {
2543 fn_name: SPECIAL_VECTOR_FUNCTION.to_string(),
2544 }
2545 .fail();
2546 }
2547 let lit = Self::get_param_as_literal_expr(&Some(args.args[0].clone()), None, None)?;
2548
2549 self.ctx.time_index_column = Some(SPECIAL_TIME_FUNCTION.to_string());
2551 self.ctx.reset_table_name_and_schema();
2552 self.ctx.tag_columns = vec![];
2553 self.ctx.field_columns = vec![GREPTIME_VALUE.to_string()];
2554 Ok(LogicalPlan::Extension(Extension {
2555 node: Arc::new(
2556 EmptyMetric::new(
2557 self.ctx.start,
2558 self.ctx.end,
2559 self.ctx.interval,
2560 SPECIAL_TIME_FUNCTION.to_string(),
2561 GREPTIME_VALUE.to_string(),
2562 Some(lit),
2563 )
2564 .context(DataFusionPlanningSnafu)?,
2565 ),
2566 }))
2567 }
2568
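    /// Plans `scalar(v instant-vector)` with the `ScalarCalculate` extension
    /// node; the input must produce exactly one field column.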
2569 async fn create_scalar_plan(
2571 &mut self,
2572 args: &PromFunctionArgs,
2573 query_engine_state: &QueryEngineState,
2574 ) -> Result<LogicalPlan> {
2575 ensure!(
2576 args.len() == 1,
2577 FunctionInvalidArgumentSnafu {
2578 fn_name: SCALAR_FUNCTION
2579 }
2580 );
2581 let input = self
2582 .prom_expr_to_plan(&args.args[0], query_engine_state)
2583 .await?;
2584 ensure!(
2585 self.ctx.field_columns.len() == 1,
2586 MultiFieldsNotSupportedSnafu {
2587 operator: SCALAR_FUNCTION
2588 },
2589 );
2590 let scalar_plan = LogicalPlan::Extension(Extension {
2591 node: Arc::new(
2592 ScalarCalculate::new(
2593 self.ctx.start,
2594 self.ctx.end,
2595 self.ctx.interval,
2596 input,
2597 self.ctx.time_index_column.as_ref().unwrap(),
2598 &self.ctx.tag_columns,
2599 &self.ctx.field_columns[0],
2600 self.ctx.table_name.as_deref(),
2601 )
2602 .context(PromqlPlanNodeSnafu)?,
2603 ),
2604 });
2605 self.ctx.tag_columns.clear();
2607 self.ctx.field_columns.clear();
2608 self.ctx
2609 .field_columns
2610 .push(scalar_plan.schema().field(1).name().clone());
2611 Ok(scalar_plan)
2612 }
2613
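    /// Plans `absent(v instant-vector)`: the input is reduced to at most one
    /// row per timestamp via `first_value`, then the `Absent` extension node
    /// emits a sample wherever the input is missing. Labels from the
    /// selector's equality matchers are attached to the synthetic series,
    /// e.g. `absent(up{job="api"})` yields `{job="api"}`.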
2614 async fn create_absent_plan(
2616 &mut self,
2617 args: &PromFunctionArgs,
2618 query_engine_state: &QueryEngineState,
2619 ) -> Result<LogicalPlan> {
2620 if args.args.len() != 1 {
2621 return FunctionInvalidArgumentSnafu {
2622 fn_name: SPECIAL_ABSENT_FUNCTION.to_string(),
2623 }
2624 .fail();
2625 }
2626 let input = self
2627 .prom_expr_to_plan(&args.args[0], query_engine_state)
2628 .await?;
2629
2630 let time_index_expr = self.create_time_index_column_expr()?;
2631 let first_field_expr =
2632 self.create_field_column_exprs()?
2633 .pop()
2634 .with_context(|| ValueNotFoundSnafu {
2635 table: self.ctx.table_name.clone().unwrap_or_default(),
2636 })?;
2637 let first_value_expr = first_value(first_field_expr, vec![]);
2638
2639 let ordered_aggregated_input = LogicalPlanBuilder::from(input)
2640 .aggregate(
2641 vec![time_index_expr.clone()],
2642 vec![first_value_expr.clone()],
2643 )
2644 .context(DataFusionPlanningSnafu)?
2645 .sort(vec![time_index_expr.sort(true, false)])
2646 .context(DataFusionPlanningSnafu)?
2647 .build()
2648 .context(DataFusionPlanningSnafu)?;
2649
2650 let fake_labels = self
2651 .ctx
2652 .selector_matcher
2653 .iter()
2654 .filter_map(|matcher| match matcher.op {
2655 MatchOp::Equal => Some((matcher.name.clone(), matcher.value.clone())),
2656 _ => None,
2657 })
2658 .collect::<Vec<_>>();
2659
2660 let absent_plan = LogicalPlan::Extension(Extension {
2662 node: Arc::new(
2663 Absent::try_new(
2664 self.ctx.start,
2665 self.ctx.end,
2666 self.ctx.interval,
2667 self.ctx.time_index_column.as_ref().unwrap().clone(),
2668 self.ctx.field_columns[0].clone(),
2669 fake_labels,
2670 ordered_aggregated_input,
2671 )
2672 .context(DataFusionPlanningSnafu)?,
2673 ),
2674 });
2675
2676 Ok(absent_plan)
2677 }
2678
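    /// Tries to fold a PromQL expression containing no selectors into a
    /// single DataFusion literal, e.g. `1 + 2` or `-(3)`. Comparisons with
    /// the `bool` modifier are cast to `Float64`; anything that touches
    /// table data yields `None`.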
2679 fn try_build_literal_expr(expr: &PromExpr) -> Option<DfExpr> {
2682 match expr {
2683 PromExpr::NumberLiteral(NumberLiteral { val }) => Some(val.lit()),
2684 PromExpr::StringLiteral(StringLiteral { val }) => Some(val.lit()),
2685 PromExpr::VectorSelector(_)
2686 | PromExpr::MatrixSelector(_)
2687 | PromExpr::Extension(_)
2688 | PromExpr::Aggregate(_)
2689 | PromExpr::Subquery(_) => None,
            PromExpr::Call(Call { func, .. }) => {
                if func.name == SPECIAL_TIME_FUNCTION {
                    // `time()` can't be folded into a plain literal: it needs
                    // the per-table time index column, which is resolved by
                    // `try_build_special_time_expr_with_context` instead.
                    None
                } else {
                    None
                }
            }
2699 PromExpr::Paren(ParenExpr { expr }) => Self::try_build_literal_expr(expr),
2700 PromExpr::Unary(UnaryExpr { expr, .. }) => Self::try_build_literal_expr(expr),
2702 PromExpr::Binary(PromBinaryExpr {
2703 lhs,
2704 rhs,
2705 op,
2706 modifier,
2707 }) => {
2708 let lhs = Self::try_build_literal_expr(lhs)?;
2709 let rhs = Self::try_build_literal_expr(rhs)?;
2710 let is_comparison_op = Self::is_token_a_comparison_op(*op);
2711 let expr_builder = Self::prom_token_to_binary_expr_builder(*op).ok()?;
2712 let expr = expr_builder(lhs, rhs).ok()?;
2713
2714 let should_return_bool = if let Some(m) = modifier {
2715 m.return_bool
2716 } else {
2717 false
2718 };
2719 if is_comparison_op && should_return_bool {
2720 Some(DfExpr::Cast(Cast {
2721 expr: Box::new(expr),
2722 data_type: ArrowDataType::Float64,
2723 }))
2724 } else {
2725 Some(expr)
2726 }
2727 }
2728 }
2729 }
2730
2731 fn try_build_special_time_expr_with_context(&self, expr: &PromExpr) -> Option<DfExpr> {
2732 match expr {
2733 PromExpr::Call(Call { func, .. }) => {
2734 if func.name == SPECIAL_TIME_FUNCTION
2735 && let Some(time_index_col) = self.ctx.time_index_column.as_ref()
2736 {
2737 Some(build_special_time_expr(time_index_col))
2738 } else {
2739 None
2740 }
2741 }
2742 _ => None,
2743 }
2744 }
2745
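    /// Maps a PromQL binary operator token to a closure building the
    /// corresponding DataFusion expression; `^` and `atan2` are lowered to
    /// the `power` and `atan2` math functions.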
2746 #[allow(clippy::type_complexity)]
2749 fn prom_token_to_binary_expr_builder(
2750 token: TokenType,
2751 ) -> Result<Box<dyn Fn(DfExpr, DfExpr) -> Result<DfExpr>>> {
2752 match token.id() {
2753 token::T_ADD => Ok(Box::new(|lhs, rhs| Ok(lhs + rhs))),
2754 token::T_SUB => Ok(Box::new(|lhs, rhs| Ok(lhs - rhs))),
2755 token::T_MUL => Ok(Box::new(|lhs, rhs| Ok(lhs * rhs))),
2756 token::T_DIV => Ok(Box::new(|lhs, rhs| Ok(lhs / rhs))),
2757 token::T_MOD => Ok(Box::new(|lhs: DfExpr, rhs| Ok(lhs % rhs))),
2758 token::T_EQLC => Ok(Box::new(|lhs, rhs| Ok(lhs.eq(rhs)))),
2759 token::T_NEQ => Ok(Box::new(|lhs, rhs| Ok(lhs.not_eq(rhs)))),
2760 token::T_GTR => Ok(Box::new(|lhs, rhs| Ok(lhs.gt(rhs)))),
2761 token::T_LSS => Ok(Box::new(|lhs, rhs| Ok(lhs.lt(rhs)))),
2762 token::T_GTE => Ok(Box::new(|lhs, rhs| Ok(lhs.gt_eq(rhs)))),
2763 token::T_LTE => Ok(Box::new(|lhs, rhs| Ok(lhs.lt_eq(rhs)))),
2764 token::T_POW => Ok(Box::new(|lhs, rhs| {
2765 Ok(DfExpr::ScalarFunction(ScalarFunction {
2766 func: datafusion_functions::math::power(),
2767 args: vec![lhs, rhs],
2768 }))
2769 })),
2770 token::T_ATAN2 => Ok(Box::new(|lhs, rhs| {
2771 Ok(DfExpr::ScalarFunction(ScalarFunction {
2772 func: datafusion_functions::math::atan2(),
2773 args: vec![lhs, rhs],
2774 }))
2775 })),
2776 _ => UnexpectedTokenSnafu { token }.fail(),
2777 }
2778 }
2779
2780 fn is_token_a_comparison_op(token: TokenType) -> bool {
2782 matches!(
2783 token.id(),
2784 token::T_EQLC
2785 | token::T_NEQ
2786 | token::T_GTR
2787 | token::T_LSS
2788 | token::T_GTE
2789 | token::T_LTE
2790 )
2791 }
2792
    fn is_token_a_set_op(token: TokenType) -> bool {
        matches!(
            token.id(),
            token::T_LAND // and
                | token::T_LOR // or
                | token::T_LUNLESS // unless
        )
    }
2802
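    /// Builds the inner join used for vector-to-vector binary arithmetic:
    /// the join keys are the shared tag columns (narrowed by `on`/`ignoring`
    /// modifiers) plus the time index, with NULLs comparing equal so rows
    /// with missing tags still match.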
2803 #[allow(clippy::too_many_arguments)]
2806 fn join_on_non_field_columns(
2807 &self,
2808 left: LogicalPlan,
2809 right: LogicalPlan,
2810 left_table_ref: TableReference,
2811 right_table_ref: TableReference,
2812 left_time_index_column: Option<String>,
2813 right_time_index_column: Option<String>,
2814 only_join_time_index: bool,
2815 modifier: &Option<BinModifier>,
2816 ) -> Result<LogicalPlan> {
2817 let mut left_tag_columns = if only_join_time_index {
2818 BTreeSet::new()
2819 } else {
2820 self.ctx
2821 .tag_columns
2822 .iter()
2823 .cloned()
2824 .collect::<BTreeSet<_>>()
2825 };
2826 let mut right_tag_columns = left_tag_columns.clone();
2827
2828 if let Some(modifier) = modifier {
2830 if let Some(matching) = &modifier.matching {
2832 match matching {
2833 LabelModifier::Include(on) => {
2835 let mask = on.labels.iter().cloned().collect::<BTreeSet<_>>();
2836 left_tag_columns = left_tag_columns.intersection(&mask).cloned().collect();
2837 right_tag_columns =
2838 right_tag_columns.intersection(&mask).cloned().collect();
2839 }
2840 LabelModifier::Exclude(ignoring) => {
2842 for label in &ignoring.labels {
2844 let _ = left_tag_columns.remove(label);
2845 let _ = right_tag_columns.remove(label);
2846 }
2847 }
2848 }
2849 }
2850 }
2851
2852 if let (Some(left_time_index_column), Some(right_time_index_column)) =
2854 (left_time_index_column, right_time_index_column)
2855 {
2856 left_tag_columns.insert(left_time_index_column);
2857 right_tag_columns.insert(right_time_index_column);
2858 }
2859
2860 let right = LogicalPlanBuilder::from(right)
2861 .alias(right_table_ref)
2862 .context(DataFusionPlanningSnafu)?
2863 .build()
2864 .context(DataFusionPlanningSnafu)?;
2865
2866 LogicalPlanBuilder::from(left)
2868 .alias(left_table_ref)
2869 .context(DataFusionPlanningSnafu)?
2870 .join_detailed(
2871 right,
2872 JoinType::Inner,
2873 (
2874 left_tag_columns
2875 .into_iter()
2876 .map(Column::from_name)
2877 .collect::<Vec<_>>(),
2878 right_tag_columns
2879 .into_iter()
2880 .map(Column::from_name)
2881 .collect::<Vec<_>>(),
2882 ),
2883 None,
2884 NullEquality::NullEqualsNull,
2885 )
2886 .context(DataFusionPlanningSnafu)?
2887 .build()
2888 .context(DataFusionPlanningSnafu)
2889 }
2890
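    /// Plans the PromQL set operators: `and` becomes a left-semi join and
    /// `unless` a left-anti join on the matching tag columns plus the time
    /// index, while `or` is delegated to [`Self::or_operator`].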
2891 fn set_op_on_non_field_columns(
2893 &mut self,
2894 left: LogicalPlan,
2895 mut right: LogicalPlan,
2896 left_context: PromPlannerContext,
2897 right_context: PromPlannerContext,
2898 op: TokenType,
2899 modifier: &Option<BinModifier>,
2900 ) -> Result<LogicalPlan> {
2901 let mut left_tag_col_set = left_context
2902 .tag_columns
2903 .iter()
2904 .cloned()
2905 .collect::<HashSet<_>>();
2906 let mut right_tag_col_set = right_context
2907 .tag_columns
2908 .iter()
2909 .cloned()
2910 .collect::<HashSet<_>>();
2911
2912 if matches!(op.id(), token::T_LOR) {
2913 return self.or_operator(
2914 left,
2915 right,
2916 left_tag_col_set,
2917 right_tag_col_set,
2918 left_context,
2919 right_context,
2920 modifier,
2921 );
2922 }
2923
2924 if let Some(modifier) = modifier {
2926 ensure!(
2928 matches!(
2929 modifier.card,
2930 VectorMatchCardinality::OneToOne | VectorMatchCardinality::ManyToMany
2931 ),
2932 UnsupportedVectorMatchSnafu {
2933 name: modifier.card.clone(),
2934 },
2935 );
2936 if let Some(matching) = &modifier.matching {
2938 match matching {
2939 LabelModifier::Include(on) => {
2941 let mask = on.labels.iter().cloned().collect::<HashSet<_>>();
2942 left_tag_col_set = left_tag_col_set.intersection(&mask).cloned().collect();
2943 right_tag_col_set =
2944 right_tag_col_set.intersection(&mask).cloned().collect();
2945 }
2946 LabelModifier::Exclude(ignoring) => {
2948 for label in &ignoring.labels {
2950 let _ = left_tag_col_set.remove(label);
2951 let _ = right_tag_col_set.remove(label);
2952 }
2953 }
2954 }
2955 }
2956 }
        // `or` returned early above, so the remaining set operators (`and`,
        // `unless`) require both sides to expose the same tag columns.
        if !matches!(op.id(), token::T_LOR) {
            ensure!(
                left_tag_col_set == right_tag_col_set,
                CombineTableColumnMismatchSnafu {
                    left: left_tag_col_set.into_iter().collect::<Vec<_>>(),
                    right: right_tag_col_set.into_iter().collect::<Vec<_>>(),
                }
            )
        };
2967 let left_time_index = left_context.time_index_column.clone().unwrap();
2968 let right_time_index = right_context.time_index_column.clone().unwrap();
2969 let join_keys = left_tag_col_set
2970 .iter()
2971 .cloned()
2972 .chain([left_time_index.clone()])
2973 .collect::<Vec<_>>();
2974 self.ctx.time_index_column = Some(left_time_index.clone());
2975
2976 if left_context.time_index_column != right_context.time_index_column {
2978 let right_project_exprs = right
2979 .schema()
2980 .fields()
2981 .iter()
2982 .map(|field| {
2983 if field.name() == &right_time_index {
2984 DfExpr::Column(Column::from_name(&right_time_index)).alias(&left_time_index)
2985 } else {
2986 DfExpr::Column(Column::from_name(field.name()))
2987 }
2988 })
2989 .collect::<Vec<_>>();
2990
2991 right = LogicalPlanBuilder::from(right)
2992 .project(right_project_exprs)
2993 .context(DataFusionPlanningSnafu)?
2994 .build()
2995 .context(DataFusionPlanningSnafu)?;
2996 }
2997
2998 ensure!(
2999 left_context.field_columns.len() == 1,
3000 MultiFieldsNotSupportedSnafu {
3001 operator: "AND operator"
3002 }
3003 );
3004 let left_field_col = left_context.field_columns.first().unwrap();
3007 self.ctx.field_columns = vec![left_field_col.clone()];
3008
3009 match op.id() {
3012 token::T_LAND => LogicalPlanBuilder::from(left)
3013 .distinct()
3014 .context(DataFusionPlanningSnafu)?
3015 .join_detailed(
3016 right,
3017 JoinType::LeftSemi,
3018 (join_keys.clone(), join_keys),
3019 None,
3020 NullEquality::NullEqualsNull,
3021 )
3022 .context(DataFusionPlanningSnafu)?
3023 .build()
3024 .context(DataFusionPlanningSnafu),
3025 token::T_LUNLESS => LogicalPlanBuilder::from(left)
3026 .distinct()
3027 .context(DataFusionPlanningSnafu)?
3028 .join_detailed(
3029 right,
3030 JoinType::LeftAnti,
3031 (join_keys.clone(), join_keys),
3032 None,
3033 NullEquality::NullEqualsNull,
3034 )
3035 .context(DataFusionPlanningSnafu)?
3036 .build()
3037 .context(DataFusionPlanningSnafu),
            token::T_LOR => {
                // Already dispatched to `or_operator` at the top of this
                // function.
                unreachable!()
            }
3043 _ => UnexpectedTokenSnafu { token: op }.fail(),
3044 }
3045 }
3046
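    /// Plans `or` with the `UnionDistinctOn` extension node: both sides are
    /// projected to a common schema (tags present on only one side are padded
    /// with NULL string literals), then rows are deduplicated on the match
    /// columns plus the time index so left-hand rows win, matching PromQL
    /// `or` semantics.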
3047 #[allow(clippy::too_many_arguments)]
3049 fn or_operator(
3050 &mut self,
3051 left: LogicalPlan,
3052 right: LogicalPlan,
3053 left_tag_cols_set: HashSet<String>,
3054 right_tag_cols_set: HashSet<String>,
3055 left_context: PromPlannerContext,
3056 right_context: PromPlannerContext,
3057 modifier: &Option<BinModifier>,
3058 ) -> Result<LogicalPlan> {
3059 ensure!(
3061 left_context.field_columns.len() == right_context.field_columns.len(),
3062 CombineTableColumnMismatchSnafu {
3063 left: left_context.field_columns.clone(),
3064 right: right_context.field_columns.clone()
3065 }
3066 );
3067 ensure!(
3068 left_context.field_columns.len() == 1,
3069 MultiFieldsNotSupportedSnafu {
3070 operator: "OR operator"
3071 }
3072 );
3073
3074 let all_tags = left_tag_cols_set
3076 .union(&right_tag_cols_set)
3077 .cloned()
3078 .collect::<HashSet<_>>();
3079 let tags_not_in_left = all_tags
3080 .difference(&left_tag_cols_set)
3081 .cloned()
3082 .collect::<Vec<_>>();
3083 let tags_not_in_right = all_tags
3084 .difference(&right_tag_cols_set)
3085 .cloned()
3086 .collect::<Vec<_>>();
3087 let left_qualifier = left.schema().qualified_field(0).0.cloned();
3088 let right_qualifier = right.schema().qualified_field(0).0.cloned();
3089 let left_qualifier_string = left_qualifier
3090 .as_ref()
3091 .map(|l| l.to_string())
3092 .unwrap_or_default();
3093 let right_qualifier_string = right_qualifier
3094 .as_ref()
3095 .map(|r| r.to_string())
3096 .unwrap_or_default();
3097 let left_time_index_column =
3098 left_context
3099 .time_index_column
3100 .clone()
3101 .with_context(|| TimeIndexNotFoundSnafu {
3102 table: left_qualifier_string.clone(),
3103 })?;
3104 let right_time_index_column =
3105 right_context
3106 .time_index_column
3107 .clone()
3108 .with_context(|| TimeIndexNotFoundSnafu {
3109 table: right_qualifier_string.clone(),
3110 })?;
3111 let left_field_col = left_context.field_columns.first().unwrap();
3113 let right_field_col = right_context.field_columns.first().unwrap();
3114
3115 let mut all_columns_set = left
3117 .schema()
3118 .fields()
3119 .iter()
3120 .chain(right.schema().fields().iter())
3121 .map(|field| field.name().clone())
3122 .collect::<HashSet<_>>();
3123 all_columns_set.remove(&left_time_index_column);
3125 all_columns_set.remove(&right_time_index_column);
3126 if left_field_col != right_field_col {
3128 all_columns_set.remove(right_field_col);
3129 }
        let mut all_columns = all_columns_set.into_iter().collect::<Vec<_>>();
        // Sort for a deterministic projection order, then put the time index
        // first.
        all_columns.sort_unstable();
        all_columns.insert(0, left_time_index_column.clone());
3135
3136 let left_proj_exprs = all_columns.iter().map(|col| {
3138 if tags_not_in_left.contains(col) {
3139 DfExpr::Literal(ScalarValue::Utf8(None), None).alias(col.to_string())
3140 } else {
3141 DfExpr::Column(Column::new(None::<String>, col))
3142 }
3143 });
3144 let right_time_index_expr = DfExpr::Column(Column::new(
3145 right_qualifier.clone(),
3146 right_time_index_column,
3147 ))
3148 .alias(left_time_index_column.clone());
3149 let right_qualifier_for_field = right
3152 .schema()
3153 .iter()
3154 .find(|(_, f)| f.name() == right_field_col)
3155 .map(|(q, _)| q)
3156 .context(ColumnNotFoundSnafu {
3157 col: right_field_col.to_string(),
3158 })?
3159 .cloned();
3160
3161 let right_proj_exprs_without_time_index = all_columns.iter().skip(1).map(|col| {
3163 if col == left_field_col && left_field_col != right_field_col {
3165 DfExpr::Column(Column::new(
3167 right_qualifier_for_field.clone(),
3168 right_field_col,
3169 ))
3170 } else if tags_not_in_right.contains(col) {
3171 DfExpr::Literal(ScalarValue::Utf8(None), None).alias(col.to_string())
3172 } else {
3173 DfExpr::Column(Column::new(None::<String>, col))
3174 }
3175 });
3176 let right_proj_exprs = [right_time_index_expr]
3177 .into_iter()
3178 .chain(right_proj_exprs_without_time_index);
3179
3180 let left_projected = LogicalPlanBuilder::from(left)
3181 .project(left_proj_exprs)
3182 .context(DataFusionPlanningSnafu)?
3183 .alias(left_qualifier_string.clone())
3184 .context(DataFusionPlanningSnafu)?
3185 .build()
3186 .context(DataFusionPlanningSnafu)?;
3187 let right_projected = LogicalPlanBuilder::from(right)
3188 .project(right_proj_exprs)
3189 .context(DataFusionPlanningSnafu)?
3190 .alias(right_qualifier_string.clone())
3191 .context(DataFusionPlanningSnafu)?
3192 .build()
3193 .context(DataFusionPlanningSnafu)?;
3194
3195 let mut match_columns = if let Some(modifier) = modifier
3197 && let Some(matching) = &modifier.matching
3198 {
3199 match matching {
3200 LabelModifier::Include(on) => on.labels.clone(),
3202 LabelModifier::Exclude(ignoring) => {
3204 let ignoring = ignoring.labels.iter().cloned().collect::<HashSet<_>>();
3205 all_tags.difference(&ignoring).cloned().collect()
3206 }
3207 }
3208 } else {
3209 all_tags.iter().cloned().collect()
3210 };
3211 match_columns.sort_unstable();
3213 let schema = left_projected.schema().clone();
3215 let union_distinct_on = UnionDistinctOn::new(
3216 left_projected,
3217 right_projected,
3218 match_columns,
3219 left_time_index_column.clone(),
3220 schema,
3221 );
3222 let result = LogicalPlan::Extension(Extension {
3223 node: Arc::new(union_distinct_on),
3224 });
3225
3226 self.ctx.time_index_column = Some(left_time_index_column);
3228 self.ctx.tag_columns = all_tags.into_iter().collect();
3229 self.ctx.field_columns = vec![left_field_col.to_string()];
3230
3231 Ok(result)
3232 }
3233
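    /// Applies `name_to_expr` to every field column and projects the results
    /// (each aliased to its display name) alongside the untouched tag and
    /// time index columns, updating `ctx.field_columns` to the new names.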
3234 fn projection_for_each_field_column<F>(
3242 &mut self,
3243 input: LogicalPlan,
3244 name_to_expr: F,
3245 ) -> Result<LogicalPlan>
3246 where
3247 F: FnMut(&String) -> Result<DfExpr>,
3248 {
3249 let non_field_columns_iter = self
3250 .ctx
3251 .tag_columns
3252 .iter()
3253 .chain(self.ctx.time_index_column.iter())
3254 .map(|col| {
3255 Ok(DfExpr::Column(Column::new(
3256 self.ctx.table_name.clone().map(TableReference::bare),
3257 col,
3258 )))
3259 });
3260
3261 let result_field_columns = self
3263 .ctx
3264 .field_columns
3265 .iter()
3266 .map(name_to_expr)
3267 .collect::<Result<Vec<_>>>()?;
3268
3269 self.ctx.field_columns = result_field_columns
3271 .iter()
3272 .map(|expr| expr.schema_name().to_string())
3273 .collect();
3274 let field_columns_iter = result_field_columns
3275 .into_iter()
3276 .zip(self.ctx.field_columns.iter())
3277 .map(|(expr, name)| Ok(DfExpr::Alias(Alias::new(expr, None::<String>, name))));
3278
3279 let project_fields = non_field_columns_iter
3281 .chain(field_columns_iter)
3282 .collect::<Result<Vec<_>>>()?;
3283
3284 LogicalPlanBuilder::from(input)
3285 .project(project_fields)
3286 .context(DataFusionPlanningSnafu)?
3287 .build()
3288 .context(DataFusionPlanningSnafu)
3289 }
3290
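    /// Wraps the input plan in a filter built from its single field column;
    /// multi-field inputs are rejected.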
3291 fn filter_on_field_column<F>(
3294 &self,
3295 input: LogicalPlan,
3296 mut name_to_expr: F,
3297 ) -> Result<LogicalPlan>
3298 where
3299 F: FnMut(&String) -> Result<DfExpr>,
3300 {
3301 ensure!(
3302 self.ctx.field_columns.len() == 1,
3303 UnsupportedExprSnafu {
3304 name: "filter on multi-value input"
3305 }
3306 );
3307
3308 let field_column_filter = name_to_expr(&self.ctx.field_columns[0])?;
3309
3310 LogicalPlanBuilder::from(input)
3311 .filter(field_column_filter)
3312 .context(DataFusionPlanningSnafu)?
3313 .build()
3314 .context(DataFusionPlanningSnafu)
3315 }
3316
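    /// Builds `date_part('<part>', <time index>)`, used for time-related
    /// functions such as `hour()` or `day_of_week()`.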
3317 fn date_part_on_time_index(&self, date_part: &str) -> Result<DfExpr> {
3320 let input_expr = datafusion::logical_expr::col(
3321 self.ctx
3322 .time_index_column
3323 .as_ref()
3324 .with_context(|| TimeIndexNotFoundSnafu {
3326 table: "<doesn't matter>",
3327 })?,
3328 );
3329 let fn_expr = DfExpr::ScalarFunction(ScalarFunction {
3330 func: datafusion_functions::datetime::date_part(),
3331 args: vec![date_part.lit(), input_expr],
3332 });
3333 Ok(fn_expr)
3334 }
3335}
3336
/// Arguments collected from a PromQL function call: the (single) vector input
/// plus the remaining literal arguments.
#[derive(Default, Debug)]
struct FunctionArgs {
    input: Option<PromExpr>,
    literals: Vec<DfExpr>,
}
3342
#[derive(Debug, Clone)]
enum ScalarFunc {
    /// A scalar function provided by DataFusion itself.
    DataFusionBuiltin(Arc<ScalarUdfDef>),
    /// A UDF looked up from the DataFusion session state.
    DataFusionUdf(Arc<ScalarUdfDef>),
    /// A PromQL-specific UDF implemented in this crate.
    Udf(Arc<ScalarUdfDef>),
    /// A UDF that needs extra information to evaluate; the `i64` is the range
    /// length in milliseconds (used by `rate`, `increase` and friends).
    ExtrapolateUdf(Arc<ScalarUdfDef>, i64),
    /// The expression was already generated elsewhere during planning, so
    /// there is no function left to apply.
    GeneratedExpr,
}
3372
3373#[cfg(test)]
3374mod test {
3375 use std::time::{Duration, UNIX_EPOCH};
3376
3377 use catalog::RegisterTableRequest;
3378 use catalog::memory::{MemoryCatalogManager, new_memory_catalog_manager};
3379 use common_base::Plugins;
3380 use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
3381 use common_query::test_util::DummyDecoder;
3382 use datatypes::prelude::ConcreteDataType;
3383 use datatypes::schema::{ColumnSchema, Schema};
3384 use promql_parser::label::Labels;
3385 use promql_parser::parser;
3386 use session::context::QueryContext;
3387 use table::metadata::{TableInfoBuilder, TableMetaBuilder};
3388 use table::test_util::EmptyTable;
3389
3390 use super::*;
3391 use crate::options::QueryOptions;
3392
3393 fn build_query_engine_state() -> QueryEngineState {
3394 QueryEngineState::new(
3395 new_memory_catalog_manager().unwrap(),
3396 None,
3397 None,
3398 None,
3399 None,
3400 None,
3401 false,
3402 Plugins::default(),
3403 QueryOptions::default(),
3404 )
3405 }
3406
3407 async fn build_test_table_provider(
3408 table_name_tuples: &[(String, String)],
3409 num_tag: usize,
3410 num_field: usize,
3411 ) -> DfTableSourceProvider {
3412 let catalog_list = MemoryCatalogManager::with_default_setup();
3413 for (schema_name, table_name) in table_name_tuples {
3414 let mut columns = vec![];
3415 for i in 0..num_tag {
3416 columns.push(ColumnSchema::new(
3417 format!("tag_{i}"),
3418 ConcreteDataType::string_datatype(),
3419 false,
3420 ));
3421 }
3422 columns.push(
3423 ColumnSchema::new(
3424 "timestamp".to_string(),
3425 ConcreteDataType::timestamp_millisecond_datatype(),
3426 false,
3427 )
3428 .with_time_index(true),
3429 );
3430 for i in 0..num_field {
3431 columns.push(ColumnSchema::new(
3432 format!("field_{i}"),
3433 ConcreteDataType::float64_datatype(),
3434 true,
3435 ));
3436 }
3437 let schema = Arc::new(Schema::new(columns));
3438 let table_meta = TableMetaBuilder::empty()
3439 .schema(schema)
3440 .primary_key_indices((0..num_tag).collect())
3441 .value_indices((num_tag + 1..num_tag + 1 + num_field).collect())
3442 .next_column_id(1024)
3443 .build()
3444 .unwrap();
3445 let table_info = TableInfoBuilder::default()
3446 .name(table_name.to_string())
3447 .meta(table_meta)
3448 .build()
3449 .unwrap();
3450 let table = EmptyTable::from_table_info(&table_info);
3451
3452 assert!(
3453 catalog_list
3454 .register_table_sync(RegisterTableRequest {
3455 catalog: DEFAULT_CATALOG_NAME.to_string(),
3456 schema: schema_name.to_string(),
3457 table_name: table_name.to_string(),
3458 table_id: 1024,
3459 table,
3460 })
3461 .is_ok()
3462 );
3463 }
3464
3465 DfTableSourceProvider::new(
3466 catalog_list,
3467 false,
3468 QueryContext::arc(),
3469 DummyDecoder::arc(),
3470 false,
3471 )
3472 }
3473
3474 async fn build_test_table_provider_with_fields(
3475 table_name_tuples: &[(String, String)],
3476 tags: &[&str],
3477 ) -> DfTableSourceProvider {
3478 let catalog_list = MemoryCatalogManager::with_default_setup();
3479 for (schema_name, table_name) in table_name_tuples {
3480 let mut columns = vec![];
3481 let num_tag = tags.len();
3482 for tag in tags {
3483 columns.push(ColumnSchema::new(
3484 tag.to_string(),
3485 ConcreteDataType::string_datatype(),
3486 false,
3487 ));
3488 }
3489 columns.push(
3490 ColumnSchema::new(
3491 "greptime_timestamp".to_string(),
3492 ConcreteDataType::timestamp_millisecond_datatype(),
3493 false,
3494 )
3495 .with_time_index(true),
3496 );
3497 columns.push(ColumnSchema::new(
3498 "greptime_value".to_string(),
3499 ConcreteDataType::float64_datatype(),
3500 true,
3501 ));
3502 let schema = Arc::new(Schema::new(columns));
3503 let table_meta = TableMetaBuilder::empty()
3504 .schema(schema)
3505 .primary_key_indices((0..num_tag).collect())
3506 .next_column_id(1024)
3507 .build()
3508 .unwrap();
3509 let table_info = TableInfoBuilder::default()
3510 .name(table_name.to_string())
3511 .meta(table_meta)
3512 .build()
3513 .unwrap();
3514 let table = EmptyTable::from_table_info(&table_info);
3515
3516 assert!(
3517 catalog_list
3518 .register_table_sync(RegisterTableRequest {
3519 catalog: DEFAULT_CATALOG_NAME.to_string(),
3520 schema: schema_name.to_string(),
3521 table_name: table_name.to_string(),
3522 table_id: 1024,
3523 table,
3524 })
3525 .is_ok()
3526 );
3527 }
3528
3529 DfTableSourceProvider::new(
3530 catalog_list,
3531 false,
3532 QueryContext::arc(),
3533 DummyDecoder::arc(),
3534 false,
3535 )
3536 }
3537
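    // Shared driver for single-argument instant function tests: plans
    // `{fn_name}(some_metric{tag_0!="bar"})` against a one-tag, one-field
    // table and compares the indented plan, with TEMPLATE replaced by
    // `plan_name`.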
3538 async fn do_single_instant_function_call(fn_name: &'static str, plan_name: &str) {
3554 let prom_expr =
3555 parser::parse(&format!("{fn_name}(some_metric{{tag_0!=\"bar\"}})")).unwrap();
3556 let eval_stmt = EvalStmt {
3557 expr: prom_expr,
3558 start: UNIX_EPOCH,
3559 end: UNIX_EPOCH
3560 .checked_add(Duration::from_secs(100_000))
3561 .unwrap(),
3562 interval: Duration::from_secs(5),
3563 lookback_delta: Duration::from_secs(1),
3564 };
3565
3566 let table_provider = build_test_table_provider(
3567 &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
3568 1,
3569 1,
3570 )
3571 .await;
3572 let plan =
3573 PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
3574 .await
3575 .unwrap();
3576
3577 let expected = String::from(
3578 "Filter: TEMPLATE(field_0) IS NOT NULL [timestamp:Timestamp(Millisecond, None), TEMPLATE(field_0):Float64;N, tag_0:Utf8]\
3579 \n Projection: some_metric.timestamp, TEMPLATE(some_metric.field_0) AS TEMPLATE(field_0), some_metric.tag_0 [timestamp:Timestamp(Millisecond, None), TEMPLATE(field_0):Float64;N, tag_0:Utf8]\
3580 \n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3581 \n PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3582 \n Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3583 \n Filter: some_metric.tag_0 != Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3584 \n TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
3585 ).replace("TEMPLATE", plan_name);
3586
3587 assert_eq!(plan.display_indent_schema().to_string(), expected);
3588 }
3589
3590 #[tokio::test]
3591 async fn single_abs() {
3592 do_single_instant_function_call("abs", "abs").await;
3593 }
3594
3595 #[tokio::test]
3596 #[should_panic]
3597 async fn single_absent() {
3598 do_single_instant_function_call("absent", "").await;
3599 }
3600
3601 #[tokio::test]
3602 async fn single_ceil() {
3603 do_single_instant_function_call("ceil", "ceil").await;
3604 }
3605
3606 #[tokio::test]
3607 async fn single_exp() {
3608 do_single_instant_function_call("exp", "exp").await;
3609 }
3610
3611 #[tokio::test]
3612 async fn single_ln() {
3613 do_single_instant_function_call("ln", "ln").await;
3614 }
3615
3616 #[tokio::test]
3617 async fn single_log2() {
3618 do_single_instant_function_call("log2", "log2").await;
3619 }
3620
3621 #[tokio::test]
3622 async fn single_log10() {
3623 do_single_instant_function_call("log10", "log10").await;
3624 }
3625
3626 #[tokio::test]
3627 #[should_panic]
3628 async fn single_scalar() {
3629 do_single_instant_function_call("scalar", "").await;
3630 }
3631
3632 #[tokio::test]
3633 #[should_panic]
3634 async fn single_sgn() {
3635 do_single_instant_function_call("sgn", "").await;
3636 }
3637
3638 #[tokio::test]
3639 #[should_panic]
3640 async fn single_sort() {
3641 do_single_instant_function_call("sort", "").await;
3642 }
3643
3644 #[tokio::test]
3645 #[should_panic]
3646 async fn single_sort_desc() {
3647 do_single_instant_function_call("sort_desc", "").await;
3648 }
3649
3650 #[tokio::test]
3651 async fn single_sqrt() {
3652 do_single_instant_function_call("sqrt", "sqrt").await;
3653 }
3654
3655 #[tokio::test]
3656 #[should_panic]
3657 async fn single_timestamp() {
3658 do_single_instant_function_call("timestamp", "").await;
3659 }
3660
3661 #[tokio::test]
3662 async fn single_acos() {
3663 do_single_instant_function_call("acos", "acos").await;
3664 }
3665
3666 #[tokio::test]
3667 #[should_panic]
3668 async fn single_acosh() {
3669 do_single_instant_function_call("acosh", "").await;
3670 }
3671
3672 #[tokio::test]
3673 async fn single_asin() {
3674 do_single_instant_function_call("asin", "asin").await;
3675 }
3676
3677 #[tokio::test]
3678 #[should_panic]
3679 async fn single_asinh() {
3680 do_single_instant_function_call("asinh", "").await;
3681 }
3682
3683 #[tokio::test]
3684 async fn single_atan() {
3685 do_single_instant_function_call("atan", "atan").await;
3686 }
3687
3688 #[tokio::test]
3689 #[should_panic]
3690 async fn single_atanh() {
3691 do_single_instant_function_call("atanh", "").await;
3692 }
3693
3694 #[tokio::test]
3695 async fn single_cos() {
3696 do_single_instant_function_call("cos", "cos").await;
3697 }
3698
3699 #[tokio::test]
3700 #[should_panic]
3701 async fn single_cosh() {
3702 do_single_instant_function_call("cosh", "").await;
3703 }
3704
3705 #[tokio::test]
3706 async fn single_sin() {
3707 do_single_instant_function_call("sin", "sin").await;
3708 }
3709
3710 #[tokio::test]
3711 #[should_panic]
3712 async fn single_sinh() {
3713 do_single_instant_function_call("sinh", "").await;
3714 }
3715
3716 #[tokio::test]
3717 async fn single_tan() {
3718 do_single_instant_function_call("tan", "tan").await;
3719 }
3720
3721 #[tokio::test]
3722 #[should_panic]
3723 async fn single_tanh() {
3724 do_single_instant_function_call("tanh", "").await;
3725 }
3726
3727 #[tokio::test]
3728 #[should_panic]
3729 async fn single_deg() {
3730 do_single_instant_function_call("deg", "").await;
3731 }
3732
3733 #[tokio::test]
3734 #[should_panic]
3735 async fn single_rad() {
3736 do_single_instant_function_call("rad", "").await;
3737 }
3738
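    // Shared driver for aggregate tests: checks both the `by (tag_1)` and
    // the `without (tag_1)` forms of `{fn_name}` against a two-tag,
    // two-field table.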
3739 async fn do_aggregate_expr_plan(fn_name: &str, plan_name: &str) {
3761 let prom_expr = parser::parse(&format!(
3762 "{fn_name} by (tag_1)(some_metric{{tag_0!=\"bar\"}})",
3763 ))
3764 .unwrap();
3765 let mut eval_stmt = EvalStmt {
3766 expr: prom_expr,
3767 start: UNIX_EPOCH,
3768 end: UNIX_EPOCH
3769 .checked_add(Duration::from_secs(100_000))
3770 .unwrap(),
3771 interval: Duration::from_secs(5),
3772 lookback_delta: Duration::from_secs(1),
3773 };
3774
3775 let table_provider = build_test_table_provider(
3777 &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
3778 2,
3779 2,
3780 )
3781 .await;
3782 let plan =
3783 PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
3784 .await
3785 .unwrap();
3786 let expected_no_without = String::from(
3787 "Sort: some_metric.tag_1 ASC NULLS LAST, some_metric.timestamp ASC NULLS LAST [tag_1:Utf8, timestamp:Timestamp(Millisecond, None), TEMPLATE(some_metric.field_0):Float64;N, TEMPLATE(some_metric.field_1):Float64;N]\
3788 \n Aggregate: groupBy=[[some_metric.tag_1, some_metric.timestamp]], aggr=[[TEMPLATE(some_metric.field_0), TEMPLATE(some_metric.field_1)]] [tag_1:Utf8, timestamp:Timestamp(Millisecond, None), TEMPLATE(some_metric.field_0):Float64;N, TEMPLATE(some_metric.field_1):Float64;N]\
3789 \n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
3790 \n PromSeriesDivide: tags=[\"tag_0\", \"tag_1\"] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
3791 \n Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.tag_1 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
3792 \n Filter: some_metric.tag_0 != Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
3793 \n TableScan: some_metric [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]"
3794 ).replace("TEMPLATE", plan_name);
3795 assert_eq!(
3796 plan.display_indent_schema().to_string(),
3797 expected_no_without
3798 );
3799
3800 if let PromExpr::Aggregate(AggregateExpr { modifier, .. }) = &mut eval_stmt.expr {
3802 *modifier = Some(LabelModifier::Exclude(Labels {
3803 labels: vec![String::from("tag_1")].into_iter().collect(),
3804 }));
3805 }
3806 let table_provider = build_test_table_provider(
3807 &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
3808 2,
3809 2,
3810 )
3811 .await;
3812 let plan =
3813 PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
3814 .await
3815 .unwrap();
3816 let expected_without = String::from(
3817 "Sort: some_metric.tag_0 ASC NULLS LAST, some_metric.timestamp ASC NULLS LAST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), TEMPLATE(some_metric.field_0):Float64;N, TEMPLATE(some_metric.field_1):Float64;N]\
3818 \n Aggregate: groupBy=[[some_metric.tag_0, some_metric.timestamp]], aggr=[[TEMPLATE(some_metric.field_0), TEMPLATE(some_metric.field_1)]] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), TEMPLATE(some_metric.field_0):Float64;N, TEMPLATE(some_metric.field_1):Float64;N]\
3819 \n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
3820 \n PromSeriesDivide: tags=[\"tag_0\", \"tag_1\"] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
3821 \n Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.tag_1 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
3822 \n Filter: some_metric.tag_0 != Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
3823 \n TableScan: some_metric [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]"
3824 ).replace("TEMPLATE", plan_name);
3825 assert_eq!(plan.display_indent_schema().to_string(), expected_without);
3826 }
3827
3828 #[tokio::test]
3829 async fn aggregate_sum() {
3830 do_aggregate_expr_plan("sum", "sum").await;
3831 }
3832
3833 #[tokio::test]
3834 async fn aggregate_avg() {
3835 do_aggregate_expr_plan("avg", "avg").await;
3836 }
3837
3838 #[tokio::test]
    #[should_panic]
    async fn aggregate_count() {
3841 do_aggregate_expr_plan("count", "count").await;
3842 }
3843
3844 #[tokio::test]
3845 async fn aggregate_min() {
3846 do_aggregate_expr_plan("min", "min").await;
3847 }
3848
3849 #[tokio::test]
3850 async fn aggregate_max() {
3851 do_aggregate_expr_plan("max", "max").await;
3852 }
3853
3854 #[tokio::test]
    #[should_panic]
    async fn aggregate_group() {
3857 do_aggregate_expr_plan("grouping", "GROUPING").await;
3858 }
3859
3860 #[tokio::test]
3861 async fn aggregate_stddev() {
3862 do_aggregate_expr_plan("stddev", "stddev_pop").await;
3863 }
3864
3865 #[tokio::test]
3866 async fn aggregate_stdvar() {
3867 do_aggregate_expr_plan("stdvar", "var_pop").await;
3868 }
3869
3870 #[tokio::test]
3894 async fn binary_op_column_column() {
3895 let prom_expr =
3896 parser::parse(r#"some_metric{tag_0="foo"} + some_metric{tag_0="bar"}"#).unwrap();
3897 let eval_stmt = EvalStmt {
3898 expr: prom_expr,
3899 start: UNIX_EPOCH,
3900 end: UNIX_EPOCH
3901 .checked_add(Duration::from_secs(100_000))
3902 .unwrap(),
3903 interval: Duration::from_secs(5),
3904 lookback_delta: Duration::from_secs(1),
3905 };
3906
3907 let table_provider = build_test_table_provider(
3908 &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
3909 1,
3910 1,
3911 )
3912 .await;
3913 let plan =
3914 PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
3915 .await
3916 .unwrap();
3917
3918 let expected = String::from(
3919 "Projection: rhs.tag_0, rhs.timestamp, lhs.field_0 + rhs.field_0 AS lhs.field_0 + rhs.field_0 [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), lhs.field_0 + rhs.field_0:Float64;N]\
3920 \n Inner Join: lhs.tag_0 = rhs.tag_0, lhs.timestamp = rhs.timestamp [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3921 \n SubqueryAlias: lhs [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3922 \n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3923 \n PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3924 \n Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3925 \n Filter: some_metric.tag_0 = Utf8(\"foo\") AND some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3926 \n TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3927 \n SubqueryAlias: rhs [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3928 \n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3929 \n PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3930 \n Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3931 \n Filter: some_metric.tag_0 = Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3932 \n TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]",
3933 );
3934
3935 assert_eq!(plan.display_indent_schema().to_string(), expected);
3936 }
3937
3938 async fn indie_query_plan_compare<T: AsRef<str>>(query: &str, expected: T) {
3939 let prom_expr = parser::parse(query).unwrap();
3940 let eval_stmt = EvalStmt {
3941 expr: prom_expr,
3942 start: UNIX_EPOCH,
3943 end: UNIX_EPOCH
3944 .checked_add(Duration::from_secs(100_000))
3945 .unwrap(),
3946 interval: Duration::from_secs(5),
3947 lookback_delta: Duration::from_secs(1),
3948 };
3949
3950 let table_provider = build_test_table_provider(
3951 &[
3952 (DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string()),
3953 (
3954 "greptime_private".to_string(),
3955 "some_alt_metric".to_string(),
3956 ),
3957 ],
3958 1,
3959 1,
3960 )
3961 .await;
3962 let plan =
3963 PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
3964 .await
3965 .unwrap();
3966
3967 assert_eq!(plan.display_indent_schema().to_string(), expected.as_ref());
3968 }
3969
3970 #[tokio::test]
3971 async fn binary_op_literal_column() {
3972 let query = r#"1 + some_metric{tag_0="bar"}"#;
3973 let expected = String::from(
3974 "Projection: some_metric.tag_0, some_metric.timestamp, Float64(1) + some_metric.field_0 AS Float64(1) + field_0 [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), Float64(1) + field_0:Float64;N]\
3975 \n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3976 \n PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3977 \n Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3978 \n Filter: some_metric.tag_0 = Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3979 \n TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]",
3980 );
3981
3982 indie_query_plan_compare(query, expected).await;
3983 }
3984
3985 #[tokio::test]
3986 async fn binary_op_literal_literal() {
3987 let query = r#"1 + 1"#;
3988 let expected = r#"EmptyMetric: range=[0..100000000], interval=[5000] [time:Timestamp(Millisecond, None), value:Float64;N]
3989 TableScan: dummy [time:Timestamp(Millisecond, None), value:Float64;N]"#;
3990 indie_query_plan_compare(query, expected).await;
3991 }
3992
3993 #[tokio::test]
3994 async fn simple_bool_grammar() {
3995 let query = "some_metric != bool 1.2345";
3996 let expected = String::from(
3997 "Projection: some_metric.tag_0, some_metric.timestamp, CAST(some_metric.field_0 != Float64(1.2345) AS Float64) AS field_0 != Float64(1.2345) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0 != Float64(1.2345):Float64;N]\
3998 \n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3999 \n PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4000 \n Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4001 \n Filter: some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4002 \n TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]",
4003 );
4004
4005 indie_query_plan_compare(query, expected).await;
4006 }
4007
    #[tokio::test]
    async fn bool_with_additional_arithmetic() {
        let query = "some_metric + (1 == bool 2)";
        let expected = String::from(
            "Projection: some_metric.tag_0, some_metric.timestamp, some_metric.field_0 + CAST(Float64(1) = Float64(2) AS Float64) AS field_0 + Float64(1) = Float64(2) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0 + Float64(1) = Float64(2):Float64;N]\
            \n  PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n    PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n      Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n        Filter: some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n          TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]",
        );

        indie_query_plan_compare(query, expected).await;
    }

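    // Unary minus negates the field column in the projection.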
    #[tokio::test]
    async fn simple_unary() {
        let query = "-some_metric";
        let expected = String::from(
            "Projection: some_metric.tag_0, some_metric.timestamp, (- some_metric.field_0) AS (- field_0) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), (- field_0):Float64;N]\
            \n  PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n    PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n      Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n        Filter: some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n          TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]",
        );

        indie_query_plan_compare(query, expected).await;
    }

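    // Range functions such as `increase` go through PromRangeManipulate;
    // rows where the UDF returns NULL are filtered out.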
    #[tokio::test]
    async fn increase_aggr() {
        let query = "increase(some_metric[5m])";
        let expected = String::from(
            "Filter: prom_increase(timestamp_range,field_0,timestamp,Int64(300000)) IS NOT NULL [timestamp:Timestamp(Millisecond, None), prom_increase(timestamp_range,field_0,timestamp,Int64(300000)):Float64;N, tag_0:Utf8]\
            \n  Projection: some_metric.timestamp, prom_increase(timestamp_range, field_0, some_metric.timestamp, Int64(300000)) AS prom_increase(timestamp_range,field_0,timestamp,Int64(300000)), some_metric.tag_0 [timestamp:Timestamp(Millisecond, None), prom_increase(timestamp_range,field_0,timestamp,Int64(300000)):Float64;N, tag_0:Utf8]\
            \n    PromRangeManipulate: req range=[0..100000000], interval=[5000], eval range=[300000], time index=[timestamp], values=[\"field_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Dictionary(Int64, Float64);N, timestamp_range:Dictionary(Int64, Timestamp(Millisecond, None))]\
            \n      PromSeriesNormalize: offset=[0], time index=[timestamp], filter NaN: [true] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n        PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n          Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n            Filter: some_metric.timestamp >= TimestampMillisecond(-301000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n              TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]",
        );

        indie_query_plan_compare(query, expected).await;
    }

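    // A comparison without `bool` is planned as a filter on the value column
    // rather than a projection.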
    #[tokio::test]
    async fn less_filter_on_value() {
        let query = "some_metric < 1.2345";
        let expected = String::from(
            "Filter: some_metric.field_0 < Float64(1.2345) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n  PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n    PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n      Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n        Filter: some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n          TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]",
        );

        indie_query_plan_compare(query, expected).await;
    }

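    // `count_over_time` follows the same range-function plan shape as
    // `increase` above.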
    #[tokio::test]
    async fn count_over_time() {
        let query = "count_over_time(some_metric[5m])";
        let expected = String::from(
            "Filter: prom_count_over_time(timestamp_range,field_0) IS NOT NULL [timestamp:Timestamp(Millisecond, None), prom_count_over_time(timestamp_range,field_0):Float64;N, tag_0:Utf8]\
            \n  Projection: some_metric.timestamp, prom_count_over_time(timestamp_range, field_0) AS prom_count_over_time(timestamp_range,field_0), some_metric.tag_0 [timestamp:Timestamp(Millisecond, None), prom_count_over_time(timestamp_range,field_0):Float64;N, tag_0:Utf8]\
            \n    PromRangeManipulate: req range=[0..100000000], interval=[5000], eval range=[300000], time index=[timestamp], values=[\"field_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Dictionary(Int64, Float64);N, timestamp_range:Dictionary(Int64, Timestamp(Millisecond, None))]\
            \n      PromSeriesNormalize: offset=[0], time index=[timestamp], filter NaN: [true] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n        PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n          Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n            Filter: some_metric.timestamp >= TimestampMillisecond(-301000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n              TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]",
        );

        indie_query_plan_compare(query, expected).await;
    }

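    // Division between two metrics with `ignoring(...)` joins the two sides
    // on the remaining tag (`uri`) plus the time index.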
    #[tokio::test]
    async fn test_hash_join() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let case = r#"http_server_requests_seconds_sum{uri="/accounts/login"} / ignoring(kubernetes_pod_name,kubernetes_namespace) http_server_requests_seconds_count{uri="/accounts/login"}"#;

        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;
        let table_provider = build_test_table_provider_with_fields(
            &[
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "http_server_requests_seconds_sum".to_string(),
                ),
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "http_server_requests_seconds_count".to_string(),
                ),
            ],
            &["uri", "kubernetes_namespace", "kubernetes_pod_name"],
        )
        .await;
        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();
        let expected = "Projection: http_server_requests_seconds_count.uri, http_server_requests_seconds_count.kubernetes_namespace, http_server_requests_seconds_count.kubernetes_pod_name, http_server_requests_seconds_count.greptime_timestamp, http_server_requests_seconds_sum.greptime_value / http_server_requests_seconds_count.greptime_value AS http_server_requests_seconds_sum.greptime_value / http_server_requests_seconds_count.greptime_value\
        \n  Inner Join: http_server_requests_seconds_sum.greptime_timestamp = http_server_requests_seconds_count.greptime_timestamp, http_server_requests_seconds_sum.uri = http_server_requests_seconds_count.uri\
        \n    SubqueryAlias: http_server_requests_seconds_sum\
        \n      PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp]\
        \n        PromSeriesDivide: tags=[\"uri\", \"kubernetes_namespace\", \"kubernetes_pod_name\"]\
        \n          Sort: http_server_requests_seconds_sum.uri ASC NULLS FIRST, http_server_requests_seconds_sum.kubernetes_namespace ASC NULLS FIRST, http_server_requests_seconds_sum.kubernetes_pod_name ASC NULLS FIRST, http_server_requests_seconds_sum.greptime_timestamp ASC NULLS FIRST\
        \n            Filter: http_server_requests_seconds_sum.uri = Utf8(\"/accounts/login\") AND http_server_requests_seconds_sum.greptime_timestamp >= TimestampMillisecond(-1000, None) AND http_server_requests_seconds_sum.greptime_timestamp <= TimestampMillisecond(100001000, None)\
        \n              TableScan: http_server_requests_seconds_sum\
        \n    SubqueryAlias: http_server_requests_seconds_count\
        \n      PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp]\
        \n        PromSeriesDivide: tags=[\"uri\", \"kubernetes_namespace\", \"kubernetes_pod_name\"]\
        \n          Sort: http_server_requests_seconds_count.uri ASC NULLS FIRST, http_server_requests_seconds_count.kubernetes_namespace ASC NULLS FIRST, http_server_requests_seconds_count.kubernetes_pod_name ASC NULLS FIRST, http_server_requests_seconds_count.greptime_timestamp ASC NULLS FIRST\
        \n            Filter: http_server_requests_seconds_count.uri = Utf8(\"/accounts/login\") AND http_server_requests_seconds_count.greptime_timestamp >= TimestampMillisecond(-1000, None) AND http_server_requests_seconds_count.greptime_timestamp <= TimestampMillisecond(100001000, None)\
        \n              TableScan: http_server_requests_seconds_count";
        assert_eq!(plan.to_string(), expected);
    }

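    // Only asserts that the nested label_replace/histogram_quantile/rate
    // expression plans successfully.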
    #[tokio::test]
    async fn test_nested_histogram_quantile() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let case = r#"label_replace(histogram_quantile(0.99, sum by(pod, le, path, code) (rate(greptime_servers_grpc_requests_elapsed_bucket{container="frontend"}[1m0s]))), "pod_new", "$1", "pod", "greptimedb-frontend-[0-9a-z]*-(.*)")"#;

        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;
        let table_provider = build_test_table_provider_with_fields(
            &[(
                DEFAULT_SCHEMA_NAME.to_string(),
                "greptime_servers_grpc_requests_elapsed_bucket".to_string(),
            )],
            &["pod", "le", "path", "code", "container"],
        )
        .await;
        let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
            .await
            .unwrap();
    }

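    // `and`/`unless` set operations nested inside aggregations should plan
    // without error.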
    #[tokio::test]
    async fn test_parse_and_operator() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let cases = [
            r#"count (max by (persistentvolumeclaim,namespace) (kubelet_volume_stats_used_bytes{namespace=~".+"} ) and (max by (persistentvolumeclaim,namespace) (kubelet_volume_stats_used_bytes{namespace=~".+"} )) / (max by (persistentvolumeclaim,namespace) (kubelet_volume_stats_capacity_bytes{namespace=~".+"} )) >= (80 / 100)) or vector (0)"#,
            r#"count (max by (persistentvolumeclaim,namespace) (kubelet_volume_stats_used_bytes{namespace=~".+"} ) unless (max by (persistentvolumeclaim,namespace) (kubelet_volume_stats_used_bytes{namespace=~".+"} )) / (max by (persistentvolumeclaim,namespace) (kubelet_volume_stats_capacity_bytes{namespace=~".+"} )) >= (80 / 100)) or vector (0)"#,
        ];

        for case in cases {
            let prom_expr = parser::parse(case).unwrap();
            eval_stmt.expr = prom_expr;
            let table_provider = build_test_table_provider_with_fields(
                &[
                    (
                        DEFAULT_SCHEMA_NAME.to_string(),
                        "kubelet_volume_stats_used_bytes".to_string(),
                    ),
                    (
                        DEFAULT_SCHEMA_NAME.to_string(),
                        "kubelet_volume_stats_capacity_bytes".to_string(),
                    ),
                ],
                &["namespace", "persistentvolumeclaim"],
            )
            .await;
            let _ =
                PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                    .await
                    .unwrap();
        }
    }

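    // A subtraction whose right operand is a parenthesized `or vector(0)`
    // fallback should plan.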
    #[tokio::test]
    async fn test_nested_binary_op() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let case = r#"sum(rate(nginx_ingress_controller_requests{job=~".*"}[2m])) -
        (
            sum(rate(nginx_ingress_controller_requests{namespace=~".*"}[2m]))
            or
            vector(0)
        )"#;

        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;
        let table_provider = build_test_table_provider_with_fields(
            &[(
                DEFAULT_SCHEMA_NAME.to_string(),
                "nginx_ingress_controller_requests".to_string(),
            )],
            &["namespace", "job"],
        )
        .await;
        let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
            .await
            .unwrap();
    }

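    // A collection of real-world `or` expressions over aggregated selectors;
    // all of them are expected to plan successfully.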
    #[tokio::test]
    async fn test_parse_or_operator() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let case = r#"
        sum(rate(sysstat{tenant_name=~"tenant1",cluster_name=~"cluster1"}[120s])) by (cluster_name,tenant_name) /
        (sum(sysstat{tenant_name=~"tenant1",cluster_name=~"cluster1"}) by (cluster_name,tenant_name) * 100)
        or
        200 * sum(sysstat{tenant_name=~"tenant1",cluster_name=~"cluster1"}) by (cluster_name,tenant_name) /
        sum(sysstat{tenant_name=~"tenant1",cluster_name=~"cluster1"}) by (cluster_name,tenant_name)"#;

        let table_provider = build_test_table_provider_with_fields(
            &[(DEFAULT_SCHEMA_NAME.to_string(), "sysstat".to_string())],
            &["tenant_name", "cluster_name"],
        )
        .await;
        eval_stmt.expr = parser::parse(case).unwrap();
        let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
            .await
            .unwrap();

        let case = r#"sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) /
        (sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) *1000) +
        sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) /
        (sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) *1000) >= 0
        or
        sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) /
        (sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) *1000) >= 0
        or
        sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) /
        (sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) *1000) >= 0"#;
        let table_provider = build_test_table_provider_with_fields(
            &[(DEFAULT_SCHEMA_NAME.to_string(), "sysstat".to_string())],
            &["tenant_name", "cluster_name"],
        )
        .await;
        eval_stmt.expr = parser::parse(case).unwrap();
        let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
            .await
            .unwrap();

        let case = r#"(sum(background_waitevent_cnt{tenant_name=~"sys",cluster_name=~"cluster1"}) by (cluster_name,tenant_name) +
        sum(foreground_waitevent_cnt{tenant_name=~"sys",cluster_name=~"cluster1"}) by (cluster_name,tenant_name)) or
        (sum(background_waitevent_cnt{tenant_name=~"sys",cluster_name=~"cluster1"}) by (cluster_name,tenant_name)) or
        (sum(foreground_waitevent_cnt{tenant_name=~"sys",cluster_name=~"cluster1"}) by (cluster_name,tenant_name))"#;
        let table_provider = build_test_table_provider_with_fields(
            &[
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "background_waitevent_cnt".to_string(),
                ),
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "foreground_waitevent_cnt".to_string(),
                ),
            ],
            &["tenant_name", "cluster_name"],
        )
        .await;
        eval_stmt.expr = parser::parse(case).unwrap();
        let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
            .await
            .unwrap();

        let case = r#"avg(node_load1{cluster_name=~"cluster1"}) by (cluster_name,host_name) or max(container_cpu_load_average_10s{cluster_name=~"cluster1"}) by (cluster_name,host_name) * 100 / max(container_spec_cpu_quota{cluster_name=~"cluster1"}) by (cluster_name,host_name)"#;
        let table_provider = build_test_table_provider_with_fields(
            &[
                (DEFAULT_SCHEMA_NAME.to_string(), "node_load1".to_string()),
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "container_cpu_load_average_10s".to_string(),
                ),
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "container_spec_cpu_quota".to_string(),
                ),
            ],
            &["cluster_name", "host_name"],
        )
        .await;
        eval_stmt.expr = parser::parse(case).unwrap();
        let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
            .await
            .unwrap();
    }

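    // `__field__` matchers select field columns; each case lists the full set
    // of columns expected in the output schema.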
    #[tokio::test]
    async fn value_matcher() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let cases = [
            (
                r#"some_metric{__field__="field_1"}"#,
                vec![
                    "some_metric.field_1",
                    "some_metric.tag_0",
                    "some_metric.tag_1",
                    "some_metric.tag_2",
                    "some_metric.timestamp",
                ],
            ),
            (
                r#"some_metric{__field__="field_1", __field__="field_0"}"#,
                vec![
                    "some_metric.field_0",
                    "some_metric.field_1",
                    "some_metric.tag_0",
                    "some_metric.tag_1",
                    "some_metric.tag_2",
                    "some_metric.timestamp",
                ],
            ),
            (
                r#"some_metric{__field__!="field_1"}"#,
                vec![
                    "some_metric.field_0",
                    "some_metric.field_2",
                    "some_metric.tag_0",
                    "some_metric.tag_1",
                    "some_metric.tag_2",
                    "some_metric.timestamp",
                ],
            ),
            (
                r#"some_metric{__field__!="field_1", __field__!="field_2"}"#,
                vec![
                    "some_metric.field_0",
                    "some_metric.tag_0",
                    "some_metric.tag_1",
                    "some_metric.tag_2",
                    "some_metric.timestamp",
                ],
            ),
            (
                r#"some_metric{__field__="field_1", __field__!="field_0"}"#,
                vec![
                    "some_metric.field_1",
                    "some_metric.tag_0",
                    "some_metric.tag_1",
                    "some_metric.tag_2",
                    "some_metric.timestamp",
                ],
            ),
            (
                r#"some_metric{__field__="field_2", __field__!="field_2"}"#,
                vec![
                    "some_metric.tag_0",
                    "some_metric.tag_1",
                    "some_metric.tag_2",
                    "some_metric.timestamp",
                ],
            ),
            (
                r#"some_metric{__field__=~"field_1|field_2"}"#,
                vec![
                    "some_metric.field_1",
                    "some_metric.field_2",
                    "some_metric.tag_0",
                    "some_metric.tag_1",
                    "some_metric.tag_2",
                    "some_metric.timestamp",
                ],
            ),
            (
                r#"some_metric{__field__!~"field_1|field_2"}"#,
                vec![
                    "some_metric.field_0",
                    "some_metric.tag_0",
                    "some_metric.tag_1",
                    "some_metric.tag_2",
                    "some_metric.timestamp",
                ],
            ),
        ];

        for case in cases {
            let prom_expr = parser::parse(case.0).unwrap();
            eval_stmt.expr = prom_expr;
            let table_provider = build_test_table_provider(
                &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
                3,
                3,
            )
            .await;
            let plan =
                PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                    .await
                    .unwrap();
            let mut fields = plan.schema().field_names();
            let mut expected = case.1.into_iter().map(String::from).collect::<Vec<_>>();
            fields.sort();
            expected.sort();
            assert_eq!(fields, expected, "case: {:?}", case.0);
        }

        let bad_cases = [
            r#"some_metric{__field__="nonexistent"}"#,
            r#"some_metric{__field__!="nonexistent"}"#,
        ];

        for case in bad_cases {
            let prom_expr = parser::parse(case).unwrap();
            eval_stmt.expr = prom_expr;
            let table_provider = build_test_table_provider(
                &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
                3,
                3,
            )
            .await;
            let plan =
                PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                    .await;
            assert!(plan.is_err(), "case: {:?}", case);
        }
    }

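    // `__schema__` (and its alias `__database__`) routes the selector to a
    // different schema; both forms produce identical plans.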
    #[tokio::test]
    async fn custom_schema() {
        let query = "some_alt_metric{__schema__=\"greptime_private\"}";
        let expected = String::from(
            "PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n  PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n    Sort: greptime_private.some_alt_metric.tag_0 ASC NULLS FIRST, greptime_private.some_alt_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n      Filter: greptime_private.some_alt_metric.timestamp >= TimestampMillisecond(-1000, None) AND greptime_private.some_alt_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n        TableScan: greptime_private.some_alt_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]",
        );

        indie_query_plan_compare(query, expected).await;

        let query = "some_alt_metric{__database__=\"greptime_private\"}";
        let expected = String::from(
            "PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n  PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n    Sort: greptime_private.some_alt_metric.tag_0 ASC NULLS FIRST, greptime_private.some_alt_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n      Filter: greptime_private.some_alt_metric.timestamp >= TimestampMillisecond(-1000, None) AND greptime_private.some_alt_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n        TableScan: greptime_private.some_alt_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]",
        );

        indie_query_plan_compare(query, expected).await;

        let query = "some_alt_metric{__schema__=\"greptime_private\"} / some_metric";
        let expected = String::from(
            "Projection: some_metric.tag_0, some_metric.timestamp, greptime_private.some_alt_metric.field_0 / some_metric.field_0 AS greptime_private.some_alt_metric.field_0 / some_metric.field_0 [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), greptime_private.some_alt_metric.field_0 / some_metric.field_0:Float64;N]\
            \n  Inner Join: greptime_private.some_alt_metric.tag_0 = some_metric.tag_0, greptime_private.some_alt_metric.timestamp = some_metric.timestamp [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n    SubqueryAlias: greptime_private.some_alt_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n      PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n        PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n          Sort: greptime_private.some_alt_metric.tag_0 ASC NULLS FIRST, greptime_private.some_alt_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n            Filter: greptime_private.some_alt_metric.timestamp >= TimestampMillisecond(-1000, None) AND greptime_private.some_alt_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n              TableScan: greptime_private.some_alt_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n    SubqueryAlias: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n      PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n        PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n          Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n            Filter: some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n              TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]",
        );

        indie_query_plan_compare(query, expected).await;
    }

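    // The special `__schema__`/`__database__` matchers only accept `=`;
    // negative and regex matchers must be rejected.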
    #[tokio::test]
    async fn only_equals_is_supported_for_special_matcher() {
        let queries = &[
            "some_alt_metric{__schema__!=\"greptime_private\"}",
            "some_alt_metric{__schema__=~\"lalala\"}",
            "some_alt_metric{__database__!=\"greptime_private\"}",
            "some_alt_metric{__database__=~\"lalala\"}",
        ];

        for query in queries {
            let prom_expr = parser::parse(query).unwrap();
            let eval_stmt = EvalStmt {
                expr: prom_expr,
                start: UNIX_EPOCH,
                end: UNIX_EPOCH
                    .checked_add(Duration::from_secs(100_000))
                    .unwrap(),
                interval: Duration::from_secs(5),
                lookback_delta: Duration::from_secs(1),
            };

            let table_provider = build_test_table_provider(
                &[
                    (DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string()),
                    (
                        "greptime_private".to_string(),
                        "some_alt_metric".to_string(),
                    ),
                ],
                1,
                1,
            )
            .await;

            let plan =
                PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                    .await;
            assert!(plan.is_err(), "query: {:?}", query);
        }
    }

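    // A nanosecond time index is cast to millisecond precision before the
    // Prom-specific plan nodes apply.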
    #[tokio::test]
    async fn test_non_ms_precision() {
        let catalog_list = MemoryCatalogManager::with_default_setup();
        let columns = vec![
            ColumnSchema::new(
                "tag".to_string(),
                ConcreteDataType::string_datatype(),
                false,
            ),
            ColumnSchema::new(
                "timestamp".to_string(),
                ConcreteDataType::timestamp_nanosecond_datatype(),
                false,
            )
            .with_time_index(true),
            ColumnSchema::new(
                "field".to_string(),
                ConcreteDataType::float64_datatype(),
                true,
            ),
        ];
        let schema = Arc::new(Schema::new(columns));
        let table_meta = TableMetaBuilder::empty()
            .schema(schema)
            .primary_key_indices(vec![0])
            .value_indices(vec![2])
            .next_column_id(1024)
            .build()
            .unwrap();
        let table_info = TableInfoBuilder::default()
            .name("metrics".to_string())
            .meta(table_meta)
            .build()
            .unwrap();
        let table = EmptyTable::from_table_info(&table_info);
        assert!(
            catalog_list
                .register_table_sync(RegisterTableRequest {
                    catalog: DEFAULT_CATALOG_NAME.to_string(),
                    schema: DEFAULT_SCHEMA_NAME.to_string(),
                    table_name: "metrics".to_string(),
                    table_id: 1024,
                    table,
                })
                .is_ok()
        );

        let plan = PromPlanner::stmt_to_plan(
            DfTableSourceProvider::new(
                catalog_list.clone(),
                false,
                QueryContext::arc(),
                DummyDecoder::arc(),
                true,
            ),
            &EvalStmt {
                expr: parser::parse("metrics{tag = \"1\"}").unwrap(),
                start: UNIX_EPOCH,
                end: UNIX_EPOCH
                    .checked_add(Duration::from_secs(100_000))
                    .unwrap(),
                interval: Duration::from_secs(5),
                lookback_delta: Duration::from_secs(1),
            },
            &build_query_engine_state(),
        )
        .await
        .unwrap();
        assert_eq!(
            plan.display_indent_schema().to_string(),
            "PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
            \n  PromSeriesDivide: tags=[\"tag\"] [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
            \n    Sort: metrics.tag ASC NULLS FIRST, metrics.timestamp ASC NULLS FIRST [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
            \n      Filter: metrics.tag = Utf8(\"1\") AND metrics.timestamp >= TimestampMillisecond(-1000, None) AND metrics.timestamp <= TimestampMillisecond(100001000, None) [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
            \n        Projection: metrics.field, metrics.tag, CAST(metrics.timestamp AS Timestamp(Millisecond, None)) AS timestamp [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
            \n          TableScan: metrics [tag:Utf8, timestamp:Timestamp(Nanosecond, None), field:Float64;N]"
        );
        let plan = PromPlanner::stmt_to_plan(
            DfTableSourceProvider::new(
                catalog_list.clone(),
                false,
                QueryContext::arc(),
                DummyDecoder::arc(),
                true,
            ),
            &EvalStmt {
                expr: parser::parse("avg_over_time(metrics{tag = \"1\"}[5s])").unwrap(),
                start: UNIX_EPOCH,
                end: UNIX_EPOCH
                    .checked_add(Duration::from_secs(100_000))
                    .unwrap(),
                interval: Duration::from_secs(5),
                lookback_delta: Duration::from_secs(1),
            },
            &build_query_engine_state(),
        )
        .await
        .unwrap();
        assert_eq!(
            plan.display_indent_schema().to_string(),
            "Filter: prom_avg_over_time(timestamp_range,field) IS NOT NULL [timestamp:Timestamp(Millisecond, None), prom_avg_over_time(timestamp_range,field):Float64;N, tag:Utf8]\
            \n  Projection: metrics.timestamp, prom_avg_over_time(timestamp_range, field) AS prom_avg_over_time(timestamp_range,field), metrics.tag [timestamp:Timestamp(Millisecond, None), prom_avg_over_time(timestamp_range,field):Float64;N, tag:Utf8]\
            \n    PromRangeManipulate: req range=[0..100000000], interval=[5000], eval range=[5000], time index=[timestamp], values=[\"field\"] [field:Dictionary(Int64, Float64);N, tag:Utf8, timestamp:Timestamp(Millisecond, None), timestamp_range:Dictionary(Int64, Timestamp(Millisecond, None))]\
            \n      PromSeriesNormalize: offset=[0], time index=[timestamp], filter NaN: [true] [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
            \n        PromSeriesDivide: tags=[\"tag\"] [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
            \n          Sort: metrics.tag ASC NULLS FIRST, metrics.timestamp ASC NULLS FIRST [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
            \n            Filter: metrics.tag = Utf8(\"1\") AND metrics.timestamp >= TimestampMillisecond(-6000, None) AND metrics.timestamp <= TimestampMillisecond(100001000, None) [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
            \n              Projection: metrics.field, metrics.tag, CAST(metrics.timestamp AS Timestamp(Millisecond, None)) AS timestamp [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
            \n                TableScan: metrics [tag:Utf8, timestamp:Timestamp(Nanosecond, None), field:Float64;N]"
        );
    }

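    // A matcher on a label the table doesn't have should still produce a plan.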
    #[tokio::test]
    async fn test_nonexistent_label() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let case = r#"some_metric{nonexistent="hi"}"#;
        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;
        let table_provider = build_test_table_provider(
            &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
            3,
            3,
        )
        .await;
        let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
            .await
            .unwrap();
    }

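    // label_join is lowered to a concat_ws projection over the source labels.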
    #[tokio::test]
    async fn test_label_join() {
        let prom_expr = parser::parse(
            "label_join(up{tag_0='api-server'}, 'foo', ',', 'tag_1', 'tag_2', 'tag_3')",
        )
        .unwrap();
        let eval_stmt = EvalStmt {
            expr: prom_expr,
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let table_provider =
            build_test_table_provider(&[(DEFAULT_SCHEMA_NAME.to_string(), "up".to_string())], 4, 1)
                .await;
        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();

        let expected = r#"
Filter: up.field_0 IS NOT NULL [timestamp:Timestamp(Millisecond, None), field_0:Float64;N, foo:Utf8;N, tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8]
  Projection: up.timestamp, up.field_0, concat_ws(Utf8(","), up.tag_1, up.tag_2, up.tag_3) AS foo, up.tag_0, up.tag_1, up.tag_2, up.tag_3 [timestamp:Timestamp(Millisecond, None), field_0:Float64;N, foo:Utf8;N, tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8]
    PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
      PromSeriesDivide: tags=["tag_0", "tag_1", "tag_2", "tag_3"] [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
        Sort: up.tag_0 ASC NULLS FIRST, up.tag_1 ASC NULLS FIRST, up.tag_2 ASC NULLS FIRST, up.tag_3 ASC NULLS FIRST, up.timestamp ASC NULLS FIRST [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
          Filter: up.tag_0 = Utf8("api-server") AND up.timestamp >= TimestampMillisecond(-1000, None) AND up.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
            TableScan: up [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"#;

        let ret = plan.display_indent_schema().to_string();
        assert_eq!(format!("\n{ret}"), expected, "\n{}", ret);
    }

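    // label_replace is lowered to regexp_replace with the pattern anchored as
    // ^(?s:...)$.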
    #[tokio::test]
    async fn test_label_replace() {
        let prom_expr = parser::parse(
            "label_replace(up{tag_0=\"a:c\"}, \"foo\", \"$1\", \"tag_0\", \"(.*):.*\")",
        )
        .unwrap();
        let eval_stmt = EvalStmt {
            expr: prom_expr,
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let table_provider =
            build_test_table_provider(&[(DEFAULT_SCHEMA_NAME.to_string(), "up".to_string())], 1, 1)
                .await;
        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();

        let expected = r#"
Filter: up.field_0 IS NOT NULL [timestamp:Timestamp(Millisecond, None), field_0:Float64;N, foo:Utf8;N, tag_0:Utf8]
  Projection: up.timestamp, up.field_0, regexp_replace(up.tag_0, Utf8("^(?s:(.*):.*)$"), Utf8("$1")) AS foo, up.tag_0 [timestamp:Timestamp(Millisecond, None), field_0:Float64;N, foo:Utf8;N, tag_0:Utf8]
    PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
      PromSeriesDivide: tags=["tag_0"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
        Sort: up.tag_0 ASC NULLS FIRST, up.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
          Filter: up.tag_0 = Utf8("a:c") AND up.timestamp >= TimestampMillisecond(-1000, None) AND up.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
            TableScan: up [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"#;

        let ret = plan.display_indent_schema().to_string();
        assert_eq!(format!("\n{ret}"), expected, "\n{}", ret);
    }

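    // Regex matchers are anchored (^(?:...)$) when lowered to a filter
    // expression.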
    #[tokio::test]
    async fn test_matchers_to_expr() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };
        let case =
            r#"sum(prometheus_tsdb_head_series{tag_1=~"(10.0.160.237:8080|10.0.160.237:9090)"})"#;

        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;
        let table_provider = build_test_table_provider(
            &[(
                DEFAULT_SCHEMA_NAME.to_string(),
                "prometheus_tsdb_head_series".to_string(),
            )],
            3,
            3,
        )
        .await;
        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();
        let expected = "Sort: prometheus_tsdb_head_series.timestamp ASC NULLS LAST [timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.field_0):Float64;N, sum(prometheus_tsdb_head_series.field_1):Float64;N, sum(prometheus_tsdb_head_series.field_2):Float64;N]\
        \n  Aggregate: groupBy=[[prometheus_tsdb_head_series.timestamp]], aggr=[[sum(prometheus_tsdb_head_series.field_0), sum(prometheus_tsdb_head_series.field_1), sum(prometheus_tsdb_head_series.field_2)]] [timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.field_0):Float64;N, sum(prometheus_tsdb_head_series.field_1):Float64;N, sum(prometheus_tsdb_head_series.field_2):Float64;N]\
        \n    PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N]\
        \n      PromSeriesDivide: tags=[\"tag_0\", \"tag_1\", \"tag_2\"] [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N]\
        \n        Sort: prometheus_tsdb_head_series.tag_0 ASC NULLS FIRST, prometheus_tsdb_head_series.tag_1 ASC NULLS FIRST, prometheus_tsdb_head_series.tag_2 ASC NULLS FIRST, prometheus_tsdb_head_series.timestamp ASC NULLS FIRST [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N]\
        \n          Filter: prometheus_tsdb_head_series.tag_1 ~ Utf8(\"^(?:(10.0.160.237:8080|10.0.160.237:9090))$\") AND prometheus_tsdb_head_series.timestamp >= TimestampMillisecond(-1000, None) AND prometheus_tsdb_head_series.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N]\
        \n            TableScan: prometheus_tsdb_head_series [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N]";
        assert_eq!(plan.display_indent_schema().to_string(), expected);
    }

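    // topk is planned as a row_number window over the aggregate, partitioned
    // by timestamp and filtered by k.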
    #[tokio::test]
    async fn test_topk_expr() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };
        let case = r#"topk(10, sum(prometheus_tsdb_head_series{ip=~"(10.0.160.237:8080|10.0.160.237:9090)"}) by (ip))"#;

        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;
        let table_provider = build_test_table_provider_with_fields(
            &[
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "prometheus_tsdb_head_series".to_string(),
                ),
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "http_server_requests_seconds_count".to_string(),
                ),
            ],
            &["ip"],
        )
        .await;

        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();
        let expected = "Projection: sum(prometheus_tsdb_head_series.greptime_value), prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp [sum(prometheus_tsdb_head_series.greptime_value):Float64;N, ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None)]\
        \n  Sort: prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST, row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS FIRST, prometheus_tsdb_head_series.ip DESC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW ASC NULLS LAST [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N, row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS FIRST, prometheus_tsdb_head_series.ip DESC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:UInt64]\
        \n    Filter: row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS FIRST, prometheus_tsdb_head_series.ip DESC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW <= Float64(10) [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N, row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS FIRST, prometheus_tsdb_head_series.ip DESC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:UInt64]\
        \n      WindowAggr: windowExpr=[[row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS FIRST, prometheus_tsdb_head_series.ip DESC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N, row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS FIRST, prometheus_tsdb_head_series.ip DESC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:UInt64]\
        \n        Sort: prometheus_tsdb_head_series.ip ASC NULLS LAST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N]\
        \n          Aggregate: groupBy=[[prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp]], aggr=[[sum(prometheus_tsdb_head_series.greptime_value)]] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N]\
        \n            PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n              PromSeriesDivide: tags=[\"ip\"] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n                Sort: prometheus_tsdb_head_series.ip ASC NULLS FIRST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS FIRST [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n                  Filter: prometheus_tsdb_head_series.ip ~ Utf8(\"^(?:(10.0.160.237:8080|10.0.160.237:9090))$\") AND prometheus_tsdb_head_series.greptime_timestamp >= TimestampMillisecond(-1000, None) AND prometheus_tsdb_head_series.greptime_timestamp <= TimestampMillisecond(100001000, None) [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n                    TableScan: prometheus_tsdb_head_series [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]";

        assert_eq!(plan.display_indent_schema().to_string(), expected);
    }

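    // count_values groups by the value column and re-exposes it under the
    // user-supplied label.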
    #[tokio::test]
    async fn test_count_values_expr() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };
        let case = r#"count_values('series', prometheus_tsdb_head_series{ip=~"(10.0.160.237:8080|10.0.160.237:9090)"}) by (ip)"#;

        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;
        let table_provider = build_test_table_provider_with_fields(
            &[
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "prometheus_tsdb_head_series".to_string(),
                ),
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "http_server_requests_seconds_count".to_string(),
                ),
            ],
            &["ip"],
        )
        .await;

        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();
        let expected = "Projection: count(prometheus_tsdb_head_series.greptime_value), prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp, series [count(prometheus_tsdb_head_series.greptime_value):Int64, ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), series:Float64;N]\
        \n  Sort: prometheus_tsdb_head_series.ip ASC NULLS LAST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST, prometheus_tsdb_head_series.greptime_value ASC NULLS LAST [count(prometheus_tsdb_head_series.greptime_value):Int64, ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), series:Float64;N, greptime_value:Float64;N]\
        \n    Projection: count(prometheus_tsdb_head_series.greptime_value), prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp, prometheus_tsdb_head_series.greptime_value AS series, prometheus_tsdb_head_series.greptime_value [count(prometheus_tsdb_head_series.greptime_value):Int64, ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), series:Float64;N, greptime_value:Float64;N]\
        \n      Aggregate: groupBy=[[prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp, prometheus_tsdb_head_series.greptime_value]], aggr=[[count(prometheus_tsdb_head_series.greptime_value)]] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N, count(prometheus_tsdb_head_series.greptime_value):Int64]\
        \n        PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n          PromSeriesDivide: tags=[\"ip\"] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n            Sort: prometheus_tsdb_head_series.ip ASC NULLS FIRST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS FIRST [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n              Filter: prometheus_tsdb_head_series.ip ~ Utf8(\"^(?:(10.0.160.237:8080|10.0.160.237:9090))$\") AND prometheus_tsdb_head_series.greptime_timestamp >= TimestampMillisecond(-1000, None) AND prometheus_tsdb_head_series.greptime_timestamp <= TimestampMillisecond(100001000, None) [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n                TableScan: prometheus_tsdb_head_series [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]";

        assert_eq!(plan.display_indent_schema().to_string(), expected);
    }

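    // quantile(phi, ...) aggregates the inner sum with the quantile UDAF per
    // timestamp.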
    #[tokio::test]
    async fn test_quantile_expr() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };
        let case = r#"quantile(0.3, sum(prometheus_tsdb_head_series{ip=~"(10.0.160.237:8080|10.0.160.237:9090)"}) by (ip))"#;

        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;
        let table_provider = build_test_table_provider_with_fields(
            &[
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "prometheus_tsdb_head_series".to_string(),
                ),
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "http_server_requests_seconds_count".to_string(),
                ),
            ],
            &["ip"],
        )
        .await;

        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();
        let expected = "Sort: prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST [greptime_timestamp:Timestamp(Millisecond, None), quantile(Float64(0.3),sum(prometheus_tsdb_head_series.greptime_value)):Float64;N]\
        \n  Aggregate: groupBy=[[prometheus_tsdb_head_series.greptime_timestamp]], aggr=[[quantile(Float64(0.3), sum(prometheus_tsdb_head_series.greptime_value))]] [greptime_timestamp:Timestamp(Millisecond, None), quantile(Float64(0.3),sum(prometheus_tsdb_head_series.greptime_value)):Float64;N]\
        \n    Sort: prometheus_tsdb_head_series.ip ASC NULLS LAST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N]\
        \n      Aggregate: groupBy=[[prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp]], aggr=[[sum(prometheus_tsdb_head_series.greptime_value)]] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N]\
        \n        PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n          PromSeriesDivide: tags=[\"ip\"] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n            Sort: prometheus_tsdb_head_series.ip ASC NULLS FIRST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS FIRST [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n              Filter: prometheus_tsdb_head_series.ip ~ Utf8(\"^(?:(10.0.160.237:8080|10.0.160.237:9090))$\") AND prometheus_tsdb_head_series.greptime_timestamp >= TimestampMillisecond(-1000, None) AND prometheus_tsdb_head_series.greptime_timestamp <= TimestampMillisecond(100001000, None) [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n                TableScan: prometheus_tsdb_head_series [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]";

        assert_eq!(plan.display_indent_schema().to_string(), expected);
    }

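    // When one side of `or` references a missing table, that side degrades to
    // an EmptyMetric under UnionDistinctOn.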
    #[tokio::test]
    async fn test_or_not_exists_table_label() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };
        let case = r#"sum by (job, tag0, tag2) (metric_exists) or sum by (job, tag0, tag2) (metric_not_exists)"#;

        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;
        let table_provider = build_test_table_provider_with_fields(
            &[(DEFAULT_SCHEMA_NAME.to_string(), "metric_exists".to_string())],
            &["job"],
        )
        .await;

        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();
        let expected = r#"UnionDistinctOn: on col=[["job"]], ts_col=[greptime_timestamp] [greptime_timestamp:Timestamp(Millisecond, None), job:Utf8, sum(metric_exists.greptime_value):Float64;N]
  SubqueryAlias: metric_exists [greptime_timestamp:Timestamp(Millisecond, None), job:Utf8, sum(metric_exists.greptime_value):Float64;N]
    Projection: metric_exists.greptime_timestamp, metric_exists.job, sum(metric_exists.greptime_value) [greptime_timestamp:Timestamp(Millisecond, None), job:Utf8, sum(metric_exists.greptime_value):Float64;N]
      Sort: metric_exists.job ASC NULLS LAST, metric_exists.greptime_timestamp ASC NULLS LAST [job:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(metric_exists.greptime_value):Float64;N]
        Aggregate: groupBy=[[metric_exists.job, metric_exists.greptime_timestamp]], aggr=[[sum(metric_exists.greptime_value)]] [job:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(metric_exists.greptime_value):Float64;N]
          PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp] [job:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]
            PromSeriesDivide: tags=["job"] [job:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]
              Sort: metric_exists.job ASC NULLS FIRST, metric_exists.greptime_timestamp ASC NULLS FIRST [job:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]
                Filter: metric_exists.greptime_timestamp >= TimestampMillisecond(-1000, None) AND metric_exists.greptime_timestamp <= TimestampMillisecond(100001000, None) [job:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]
                  TableScan: metric_exists [job:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]
  SubqueryAlias:  [greptime_timestamp:Timestamp(Millisecond, None), job:Utf8;N, sum(.value):Float64;N]
    Projection: .time AS greptime_timestamp, Utf8(NULL) AS job, sum(.value) [greptime_timestamp:Timestamp(Millisecond, None), job:Utf8;N, sum(.value):Float64;N]
      Sort: .time ASC NULLS LAST [time:Timestamp(Millisecond, None), sum(.value):Float64;N]
        Aggregate: groupBy=[[.time]], aggr=[[sum(.value)]] [time:Timestamp(Millisecond, None), sum(.value):Float64;N]
          EmptyMetric: range=[0..-1], interval=[5000] [time:Timestamp(Millisecond, None), value:Float64;N]
            TableScan: dummy [time:Timestamp(Millisecond, None), value:Float64;N]"#;

        assert_eq!(plan.display_indent_schema().to_string(), expected);
    }

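    // histogram_quantile over a bucket table lacking the `le` tag should plan
    // to an empty relation rather than error.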
    #[tokio::test]
    async fn test_histogram_quantile_missing_le_column() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let case = r#"histogram_quantile(0.99, sum by(pod,instance,le) (rate(non_existent_histogram_bucket{instance=~"xxx"}[1m])))"#;

        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;

        let table_provider = build_test_table_provider_with_fields(
            &[(
                DEFAULT_SCHEMA_NAME.to_string(),
                "non_existent_histogram_bucket".to_string(),
            )],
            &["pod", "instance"],
        )
        .await;

        let result =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await;

        assert!(
            result.is_ok(),
            "Expected successful plan creation with empty result, but got error: {:?}",
            result.err()
        );

        let plan = result.unwrap();
        match plan {
            LogicalPlan::EmptyRelation(_) => {}
            _ => panic!("Expected EmptyRelation, but got: {:?}", plan),
        }
    }
}