use std::collections::{BTreeSet, HashSet, VecDeque};
use std::sync::Arc;
use std::time::UNIX_EPOCH;

use arrow::datatypes::IntervalDayTime;
use async_recursion::async_recursion;
use catalog::table_source::DfTableSourceProvider;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_function::function::FunctionContext;
use common_query::prelude::greptime_value;
use datafusion::common::DFSchemaRef;
use datafusion::datasource::DefaultTableSource;
use datafusion::functions_aggregate::average::avg_udaf;
use datafusion::functions_aggregate::count::count_udaf;
use datafusion::functions_aggregate::expr_fn::first_value;
use datafusion::functions_aggregate::grouping::grouping_udaf;
use datafusion::functions_aggregate::min_max::{max_udaf, min_udaf};
use datafusion::functions_aggregate::stddev::stddev_pop_udaf;
use datafusion::functions_aggregate::sum::sum_udaf;
use datafusion::functions_aggregate::variance::var_pop_udaf;
use datafusion::functions_window::row_number::RowNumber;
use datafusion::logical_expr::expr::{Alias, ScalarFunction, WindowFunction};
use datafusion::logical_expr::expr_rewriter::normalize_cols;
use datafusion::logical_expr::{
    BinaryExpr, Cast, Extension, LogicalPlan, LogicalPlanBuilder, Operator,
    ScalarUDF as ScalarUdfDef, WindowFrame, WindowFunctionDefinition,
};
use datafusion::prelude as df_prelude;
use datafusion::prelude::{Column, Expr as DfExpr, JoinType};
use datafusion::scalar::ScalarValue;
use datafusion::sql::TableReference;
use datafusion_common::{DFSchema, NullEquality};
use datafusion_expr::expr::WindowFunctionParams;
use datafusion_expr::utils::conjunction;
use datafusion_expr::{ExprSchemable, Literal, SortExpr, col, lit};
use datatypes::arrow::datatypes::{DataType as ArrowDataType, TimeUnit as ArrowTimeUnit};
use datatypes::data_type::ConcreteDataType;
use itertools::Itertools;
use once_cell::sync::Lazy;
use promql::extension_plan::{
    Absent, EmptyMetric, HistogramFold, InstantManipulate, Millisecond, RangeManipulate,
    ScalarCalculate, SeriesDivide, SeriesNormalize, UnionDistinctOn, build_special_time_expr,
};
use promql::functions::{
    AbsentOverTime, AvgOverTime, Changes, CountOverTime, Delta, Deriv, HoltWinters, IDelta,
    Increase, LastOverTime, MaxOverTime, MinOverTime, PredictLinear, PresentOverTime,
    QuantileOverTime, Rate, Resets, Round, StddevOverTime, StdvarOverTime, SumOverTime,
    quantile_udaf,
};
use promql_parser::label::{METRIC_NAME, MatchOp, Matcher, Matchers};
use promql_parser::parser::token::TokenType;
use promql_parser::parser::{
    AggregateExpr, BinModifier, BinaryExpr as PromBinaryExpr, Call, EvalStmt, Expr as PromExpr,
    Function, FunctionArgs as PromFunctionArgs, LabelModifier, MatrixSelector, NumberLiteral,
    Offset, ParenExpr, StringLiteral, SubqueryExpr, UnaryExpr, VectorMatchCardinality,
    VectorSelector, token,
};
use regex::{self, Regex};
use snafu::{OptionExt, ResultExt, ensure};
use store_api::metric_engine_consts::{
    DATA_SCHEMA_TABLE_ID_COLUMN_NAME, DATA_SCHEMA_TSID_COLUMN_NAME,
};
use table::table::adapter::DfTableProviderAdapter;

use crate::promql::error::{
    CatalogSnafu, ColumnNotFoundSnafu, CombineTableColumnMismatchSnafu, DataFusionPlanningSnafu,
    ExpectRangeSelectorSnafu, FunctionInvalidArgumentSnafu, InvalidDestinationLabelNameSnafu,
    InvalidRegularExpressionSnafu, InvalidTimeRangeSnafu, MultiFieldsNotSupportedSnafu,
    MultipleMetricMatchersSnafu, MultipleVectorSnafu, NoMetricMatcherSnafu, PromqlPlanNodeSnafu,
    Result, SameLabelSetSnafu, TableNameNotFoundSnafu, TimeIndexNotFoundSnafu,
    UnexpectedPlanExprSnafu, UnexpectedTokenSnafu, UnknownTableSnafu, UnsupportedExprSnafu,
    UnsupportedMatcherOpSnafu, UnsupportedVectorMatchSnafu, ValueNotFoundSnafu,
    ZeroRangeSelectorSnafu,
};
use crate::query_engine::QueryEngineState;

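/// The `time()` function in PromQL.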
const SPECIAL_TIME_FUNCTION: &str = "time";
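/// The `scalar()` function in PromQL.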
const SCALAR_FUNCTION: &str = "scalar";
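/// The `absent()` function in PromQL.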
const SPECIAL_ABSENT_FUNCTION: &str = "absent";
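/// The `histogram_quantile()` function in PromQL.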
const SPECIAL_HISTOGRAM_QUANTILE: &str = "histogram_quantile";
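/// The `vector()` function in PromQL.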
const SPECIAL_VECTOR_FUNCTION: &str = "vector";
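/// The `le` label of a conventional Prometheus histogram.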
const LE_COLUMN_NAME: &str = "le";

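/// A valid Prometheus label name: `[a-zA-Z_][a-zA-Z0-9_]*`.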
static LABEL_NAME_REGEX: Lazy<Regex> =
    Lazy::new(|| Regex::new(r"^[a-zA-Z_][a-zA-Z0-9_]*$").unwrap());

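/// Default time index column name for plans generated without a table (e.g. literals).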
const DEFAULT_TIME_INDEX_COLUMN: &str = "time";

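/// Default value column name for plans generated without a table.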
const DEFAULT_FIELD_COLUMN: &str = "value";

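/// Special label matcher that selects field columns, e.g. `{__field__="cpu_user"}`.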
const FIELD_COLUMN_MATCHER: &str = "__field__";

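/// Special label matcher that selects the schema, e.g. `{__schema__="public"}`.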
const SCHEMA_COLUMN_MATCHER: &str = "__schema__";
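/// Alternative to [`SCHEMA_COLUMN_MATCHER`], e.g. `{__database__="public"}`.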
const DB_COLUMN_MATCHER: &str = "__database__";

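/// Threshold used by `build_time_index_filter`: a query with more evaluation
/// points than this gets one wide time-range filter instead of per-step filters.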
const MAX_SCATTER_POINTS: i64 = 400;

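/// One hour in milliseconds.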
const INTERVAL_1H: i64 = 60 * 60 * 1000;

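/// Mutable state shared across the planning of one PromQL expression: the
/// evaluation range plus the table/column layout of the selector being planned.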
#[derive(Default, Debug, Clone)]
struct PromPlannerContext {
    start: Millisecond,
    end: Millisecond,
    interval: Millisecond,
    lookback_delta: Millisecond,

    table_name: Option<String>,
    time_index_column: Option<String>,
    field_columns: Vec<String>,
    tag_columns: Vec<String>,
    field_column_matcher: Option<Vec<Matcher>>,
    selector_matcher: Vec<Matcher>,
    schema_name: Option<String>,
    range: Option<Millisecond>,
}

impl PromPlannerContext {
    fn from_eval_stmt(stmt: &EvalStmt) -> Self {
        Self {
            start: stmt.start.duration_since(UNIX_EPOCH).unwrap().as_millis() as _,
            end: stmt.end.duration_since(UNIX_EPOCH).unwrap().as_millis() as _,
            interval: stmt.interval.as_millis() as _,
            lookback_delta: stmt.lookback_delta.as_millis() as _,
            ..Default::default()
        }
    }

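    /// Reset all per-selector states, keeping the evaluation range untouched.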
    fn reset(&mut self) {
        self.table_name = None;
        self.time_index_column = None;
        self.field_columns = vec![];
        self.tag_columns = vec![];
        self.field_column_matcher = None;
        self.selector_matcher.clear();
        self.schema_name = None;
        self.range = None;
    }

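    /// Reset table name to an empty placeholder and clear the schema name.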
    fn reset_table_name_and_schema(&mut self) {
        self.table_name = Some(String::new());
        self.schema_name = None;
    }

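    /// Check whether the `le` tag column is present (used by `histogram_quantile`).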
    fn has_le_tag(&self) -> bool {
        self.tag_columns.iter().any(|c| c.eq(&LE_COLUMN_NAME))
    }
}

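/// Translates a PromQL [`EvalStmt`] into a DataFusion [`LogicalPlan`].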
pub struct PromPlanner {
    table_provider: DfTableSourceProvider,
    ctx: PromPlannerContext,
}

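/// Normalize a matcher by unescaping its value, so that e.g. a literal `\n`
/// written in the query is compared as a real newline.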
pub fn normalize_matcher(mut matcher: Matcher) -> Matcher {
    if let Ok(unescaped_value) = unescaper::unescape(&matcher.value) {
        matcher.value = unescaped_value;
    }
    matcher
}

impl PromPlanner {
    pub async fn stmt_to_plan_with_alias(
        table_provider: DfTableSourceProvider,
        stmt: &EvalStmt,
        alias: Option<String>,
        query_engine_state: &QueryEngineState,
    ) -> Result<LogicalPlan> {
        let mut planner = Self {
            table_provider,
            ctx: PromPlannerContext::from_eval_stmt(stmt),
        };

        let plan = planner
            .prom_expr_to_plan(&stmt.expr, query_engine_state)
            .await?;

        if let Some(alias_name) = alias {
            planner.apply_alias_projection(plan, alias_name)
        } else {
            Ok(plan)
        }
    }

    #[cfg(test)]
    async fn stmt_to_plan(
        table_provider: DfTableSourceProvider,
        stmt: &EvalStmt,
        query_engine_state: &QueryEngineState,
    ) -> Result<LogicalPlan> {
        Self::stmt_to_plan_with_alias(table_provider, stmt, None, query_engine_state).await
    }

    pub async fn prom_expr_to_plan(
        &mut self,
        prom_expr: &PromExpr,
        query_engine_state: &QueryEngineState,
    ) -> Result<LogicalPlan> {
        self.prom_expr_to_plan_inner(prom_expr, false, query_engine_state)
            .await
    }

    #[async_recursion]
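    /// Inner planner entry. `timestamp_fn` is true when planning the direct
    /// argument of `timestamp()`, in which case vector selectors expose the
    /// sample timestamp as the value column.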
    async fn prom_expr_to_plan_inner(
        &mut self,
        prom_expr: &PromExpr,
        timestamp_fn: bool,
        query_engine_state: &QueryEngineState,
    ) -> Result<LogicalPlan> {
        let res = match prom_expr {
            PromExpr::Aggregate(expr) => {
                self.prom_aggr_expr_to_plan(query_engine_state, expr)
                    .await?
            }
            PromExpr::Unary(expr) => {
                self.prom_unary_expr_to_plan(query_engine_state, expr)
                    .await?
            }
            PromExpr::Binary(expr) => {
                self.prom_binary_expr_to_plan(query_engine_state, expr)
                    .await?
            }
            PromExpr::Paren(ParenExpr { expr }) => {
                self.prom_expr_to_plan_inner(expr, timestamp_fn, query_engine_state)
                    .await?
            }
            PromExpr::Subquery(expr) => {
                self.prom_subquery_expr_to_plan(query_engine_state, expr)
                    .await?
            }
            PromExpr::NumberLiteral(lit) => self.prom_number_lit_to_plan(lit)?,
            PromExpr::StringLiteral(lit) => self.prom_string_lit_to_plan(lit)?,
            PromExpr::VectorSelector(selector) => {
                self.prom_vector_selector_to_plan(selector, timestamp_fn)
                    .await?
            }
            PromExpr::MatrixSelector(selector) => {
                self.prom_matrix_selector_to_plan(selector).await?
            }
            PromExpr::Call(expr) => {
                self.prom_call_expr_to_plan(query_engine_state, expr)
                    .await?
            }
            PromExpr::Extension(expr) => {
                self.prom_ext_expr_to_plan(query_engine_state, expr).await?
            }
        };

        Ok(res)
    }

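    /// Plan a subquery such as `rate(http_requests_total[5m])[30m:1m]`: the
    /// inner expression is planned with the subquery's step as the interval and
    /// a start widened by the subquery range, then wrapped in [`RangeManipulate`].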
    async fn prom_subquery_expr_to_plan(
        &mut self,
        query_engine_state: &QueryEngineState,
        subquery_expr: &SubqueryExpr,
    ) -> Result<LogicalPlan> {
        let SubqueryExpr {
            expr, range, step, ..
        } = subquery_expr;

        let current_interval = self.ctx.interval;
        if let Some(step) = step {
            self.ctx.interval = step.as_millis() as _;
        }
        let current_start = self.ctx.start;
        self.ctx.start -= range.as_millis() as i64 - self.ctx.interval;
        let input = self.prom_expr_to_plan(expr, query_engine_state).await?;
        self.ctx.interval = current_interval;
        self.ctx.start = current_start;

        ensure!(!range.is_zero(), ZeroRangeSelectorSnafu);
        let range_ms = range.as_millis() as _;
        self.ctx.range = Some(range_ms);

        let manipulate = RangeManipulate::new(
            self.ctx.start,
            self.ctx.end,
            self.ctx.interval,
            range_ms,
            self.ctx
                .time_index_column
                .clone()
                .expect("time index should be set in `setup_context`"),
            self.ctx.field_columns.clone(),
            input,
        )
        .context(DataFusionPlanningSnafu)?;

        Ok(LogicalPlan::Extension(Extension {
            node: Arc::new(manipulate),
        }))
    }

    async fn prom_aggr_expr_to_plan(
        &mut self,
        query_engine_state: &QueryEngineState,
        aggr_expr: &AggregateExpr,
    ) -> Result<LogicalPlan> {
        let AggregateExpr {
            op,
            expr,
            modifier,
            param,
        } = aggr_expr;

        let input = self.prom_expr_to_plan(expr, query_engine_state).await?;

        match (*op).id() {
            token::T_TOPK | token::T_BOTTOMK => {
                self.prom_topk_bottomk_to_plan(aggr_expr, input).await
            }
            _ => {
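                // Build the group-by columns from the `by`/`without` modifier and
                // the aggregate expressions for the operator.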
                let mut group_exprs = self.agg_modifier_to_col(input.schema(), modifier, true)?;
                let (aggr_exprs, prev_field_exprs) =
                    self.create_aggregate_exprs(*op, param, &input)?;

                let builder = LogicalPlanBuilder::from(input);
                let builder = if op.id() == token::T_COUNT_VALUES {
                    let label = Self::get_param_value_as_str(*op, param)?;
                    group_exprs.extend(prev_field_exprs.clone());
                    let project_fields = self
                        .create_field_column_exprs()?
                        .into_iter()
                        .chain(self.create_tag_column_exprs()?)
                        .chain(Some(self.create_time_index_column_expr()?))
                        .chain(prev_field_exprs.into_iter().map(|expr| expr.alias(label)));

                    builder
                        .aggregate(group_exprs.clone(), aggr_exprs)
                        .context(DataFusionPlanningSnafu)?
                        .project(project_fields)
                        .context(DataFusionPlanningSnafu)?
                } else {
                    builder
                        .aggregate(group_exprs.clone(), aggr_exprs)
                        .context(DataFusionPlanningSnafu)?
                };

                let sort_expr = group_exprs.into_iter().map(|expr| expr.sort(true, false));

                builder
                    .sort(sort_expr)
                    .context(DataFusionPlanningSnafu)?
                    .build()
                    .context(DataFusionPlanningSnafu)
            }
        }
    }

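    /// Plan `topk()`/`bottomk()` (e.g. `topk(3, cpu_usage)`) with a window
    /// function ranking each group; rows whose rank exceeds the parameter are
    /// filtered out.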
    async fn prom_topk_bottomk_to_plan(
        &mut self,
        aggr_expr: &AggregateExpr,
        input: LogicalPlan,
    ) -> Result<LogicalPlan> {
        let AggregateExpr {
            op,
            param,
            modifier,
            ..
        } = aggr_expr;

        let group_exprs = self.agg_modifier_to_col(input.schema(), modifier, false)?;

        let val = Self::get_param_as_literal_expr(param, Some(*op), Some(ArrowDataType::Float64))?;

        let window_exprs = self.create_window_exprs(*op, group_exprs.clone(), &input)?;

        let rank_columns: Vec<_> = window_exprs
            .iter()
            .map(|expr| expr.schema_name().to_string())
            .collect();

        let filter: DfExpr = rank_columns
            .iter()
            .fold(None, |expr, rank| {
                let predicate = DfExpr::BinaryExpr(BinaryExpr {
                    left: Box::new(col(rank)),
                    op: Operator::LtEq,
                    right: Box::new(val.clone()),
                });

                match expr {
                    None => Some(predicate),
                    Some(expr) => Some(DfExpr::BinaryExpr(BinaryExpr {
                        left: Box::new(expr),
                        op: Operator::Or,
                        right: Box::new(predicate),
                    })),
                }
            })
            .unwrap();

        let rank_columns: Vec<_> = rank_columns.into_iter().map(col).collect();

        let mut new_group_exprs = group_exprs.clone();
        new_group_exprs.extend(rank_columns);

        let group_sort_expr = new_group_exprs
            .into_iter()
            .map(|expr| expr.sort(true, false));

        let project_fields = self
            .create_field_column_exprs()?
            .into_iter()
            .chain(self.create_tag_column_exprs()?)
            .chain(Some(self.create_time_index_column_expr()?));

        LogicalPlanBuilder::from(input)
            .window(window_exprs)
            .context(DataFusionPlanningSnafu)?
            .filter(filter)
            .context(DataFusionPlanningSnafu)?
            .sort(group_sort_expr)
            .context(DataFusionPlanningSnafu)?
            .project(project_fields)
            .context(DataFusionPlanningSnafu)?
            .build()
            .context(DataFusionPlanningSnafu)
    }

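    /// Plan unary negation (e.g. `-cpu_usage`) by negating every field column.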
    async fn prom_unary_expr_to_plan(
        &mut self,
        query_engine_state: &QueryEngineState,
        unary_expr: &UnaryExpr,
    ) -> Result<LogicalPlan> {
        let UnaryExpr { expr } = unary_expr;
        let input = self.prom_expr_to_plan(expr, query_engine_state).await?;
        self.projection_for_each_field_column(input, |col| {
            Ok(DfExpr::Negative(Box::new(DfExpr::Column(col.into()))))
        })
    }

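    /// Plan a binary expression. Four shapes are handled separately:
    /// literal-literal (folded into an [`EmptyMetric`]), literal-vector and
    /// vector-literal (a projection or filter over the vector side), and
    /// vector-vector (a join on the non-field columns).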
    async fn prom_binary_expr_to_plan(
        &mut self,
        query_engine_state: &QueryEngineState,
        binary_expr: &PromBinaryExpr,
    ) -> Result<LogicalPlan> {
        let PromBinaryExpr {
            lhs,
            rhs,
            op,
            modifier,
        } = binary_expr;

        let should_return_bool = if let Some(m) = modifier {
            m.return_bool
        } else {
            false
        };
        let is_comparison_op = Self::is_token_a_comparison_op(*op);

        match (
            Self::try_build_literal_expr(lhs),
            Self::try_build_literal_expr(rhs),
        ) {
            (Some(lhs), Some(rhs)) => {
                self.ctx.time_index_column = Some(DEFAULT_TIME_INDEX_COLUMN.to_string());
                self.ctx.field_columns = vec![DEFAULT_FIELD_COLUMN.to_string()];
                self.ctx.reset_table_name_and_schema();
                let field_expr_builder = Self::prom_token_to_binary_expr_builder(*op)?;
                let mut field_expr = field_expr_builder(lhs, rhs)?;

                if is_comparison_op && should_return_bool {
                    field_expr = DfExpr::Cast(Cast {
                        expr: Box::new(field_expr),
                        data_type: ArrowDataType::Float64,
                    });
                }

                Ok(LogicalPlan::Extension(Extension {
                    node: Arc::new(
                        EmptyMetric::new(
                            self.ctx.start,
                            self.ctx.end,
                            self.ctx.interval,
                            SPECIAL_TIME_FUNCTION.to_string(),
                            DEFAULT_FIELD_COLUMN.to_string(),
                            Some(field_expr),
                        )
                        .context(DataFusionPlanningSnafu)?,
                    ),
                }))
            }
            (Some(mut expr), None) => {
                let input = self.prom_expr_to_plan(rhs, query_engine_state).await?;
                if let Some(time_expr) = self.try_build_special_time_expr_with_context(lhs) {
                    expr = time_expr
                }
                let bin_expr_builder = |col: &String| {
                    let binary_expr_builder = Self::prom_token_to_binary_expr_builder(*op)?;
                    let mut binary_expr =
                        binary_expr_builder(expr.clone(), DfExpr::Column(col.into()))?;

                    if is_comparison_op && should_return_bool {
                        binary_expr = DfExpr::Cast(Cast {
                            expr: Box::new(binary_expr),
                            data_type: ArrowDataType::Float64,
                        });
                    }
                    Ok(binary_expr)
                };
                if is_comparison_op && !should_return_bool {
                    self.filter_on_field_column(input, bin_expr_builder)
                } else {
                    self.projection_for_each_field_column(input, bin_expr_builder)
                }
            }
            (None, Some(mut expr)) => {
                let input = self.prom_expr_to_plan(lhs, query_engine_state).await?;
                if let Some(time_expr) = self.try_build_special_time_expr_with_context(rhs) {
                    expr = time_expr
                }
                let bin_expr_builder = |col: &String| {
                    let binary_expr_builder = Self::prom_token_to_binary_expr_builder(*op)?;
                    let mut binary_expr =
                        binary_expr_builder(DfExpr::Column(col.into()), expr.clone())?;

                    if is_comparison_op && should_return_bool {
                        binary_expr = DfExpr::Cast(Cast {
                            expr: Box::new(binary_expr),
                            data_type: ArrowDataType::Float64,
                        });
                    }
                    Ok(binary_expr)
                };
                if is_comparison_op && !should_return_bool {
                    self.filter_on_field_column(input, bin_expr_builder)
                } else {
                    self.projection_for_each_field_column(input, bin_expr_builder)
                }
            }
            (None, None) => {
                let left_input = self.prom_expr_to_plan(lhs, query_engine_state).await?;
                let left_field_columns = self.ctx.field_columns.clone();
                let left_time_index_column = self.ctx.time_index_column.clone();
                let mut left_table_ref = self
                    .table_ref()
                    .unwrap_or_else(|_| TableReference::bare(""));
                let left_context = self.ctx.clone();

                let right_input = self.prom_expr_to_plan(rhs, query_engine_state).await?;
                let right_field_columns = self.ctx.field_columns.clone();
                let right_time_index_column = self.ctx.time_index_column.clone();
                let mut right_table_ref = self
                    .table_ref()
                    .unwrap_or_else(|_| TableReference::bare(""));
                let right_context = self.ctx.clone();

                if Self::is_token_a_set_op(*op) {
                    return self.set_op_on_non_field_columns(
                        left_input,
                        right_input,
                        left_context,
                        right_context,
                        *op,
                        modifier,
                    );
                }

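                // When both sides read the same table, alias them as `lhs`/`rhs`
                // so the join columns stay distinguishable, and keep the context
                // of the side that still carries tag columns for the projection.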
                if left_table_ref == right_table_ref {
                    left_table_ref = TableReference::bare("lhs");
                    right_table_ref = TableReference::bare("rhs");
                    if self.ctx.tag_columns.is_empty() {
                        self.ctx = left_context.clone();
                        self.ctx.table_name = Some("lhs".to_string());
                    } else {
                        self.ctx.table_name = Some("rhs".to_string());
                    }
                }
                let mut field_columns = left_field_columns.iter().zip(right_field_columns.iter());

                let join_plan = self.join_on_non_field_columns(
                    left_input,
                    right_input,
                    left_table_ref.clone(),
                    right_table_ref.clone(),
                    left_time_index_column,
                    right_time_index_column,
                    left_context.tag_columns.is_empty() || right_context.tag_columns.is_empty(),
                    modifier,
                )?;
                let join_plan_schema = join_plan.schema().clone();

                let bin_expr_builder = |_: &String| {
                    let (left_col_name, right_col_name) = field_columns.next().unwrap();
                    let left_col = join_plan_schema
                        .qualified_field_with_name(Some(&left_table_ref), left_col_name)
                        .context(DataFusionPlanningSnafu)?
                        .into();
                    let right_col = join_plan_schema
                        .qualified_field_with_name(Some(&right_table_ref), right_col_name)
                        .context(DataFusionPlanningSnafu)?
                        .into();

                    let binary_expr_builder = Self::prom_token_to_binary_expr_builder(*op)?;
                    let mut binary_expr =
                        binary_expr_builder(DfExpr::Column(left_col), DfExpr::Column(right_col))?;
                    if is_comparison_op && should_return_bool {
                        binary_expr = DfExpr::Cast(Cast {
                            expr: Box::new(binary_expr),
                            data_type: ArrowDataType::Float64,
                        });
                    }
                    Ok(binary_expr)
                };
                if is_comparison_op && !should_return_bool {
                    self.filter_on_field_column(join_plan, bin_expr_builder)
                } else {
                    self.projection_for_each_field_column(join_plan, bin_expr_builder)
                }
            }
        }
    }

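    /// Plan a bare number literal such as `42`: an [`EmptyMetric`] emits the
    /// constant at every evaluation step.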
    fn prom_number_lit_to_plan(&mut self, number_literal: &NumberLiteral) -> Result<LogicalPlan> {
        let NumberLiteral { val } = number_literal;
        self.ctx.time_index_column = Some(DEFAULT_TIME_INDEX_COLUMN.to_string());
        self.ctx.field_columns = vec![DEFAULT_FIELD_COLUMN.to_string()];
        self.ctx.reset_table_name_and_schema();
        let literal_expr = df_prelude::lit(*val);

        let plan = LogicalPlan::Extension(Extension {
            node: Arc::new(
                EmptyMetric::new(
                    self.ctx.start,
                    self.ctx.end,
                    self.ctx.interval,
                    SPECIAL_TIME_FUNCTION.to_string(),
                    DEFAULT_FIELD_COLUMN.to_string(),
                    Some(literal_expr),
                )
                .context(DataFusionPlanningSnafu)?,
            ),
        });
        Ok(plan)
    }

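    /// Plan a string literal the same way as a number literal, with the string
    /// as the constant value.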
    fn prom_string_lit_to_plan(&mut self, string_literal: &StringLiteral) -> Result<LogicalPlan> {
        let StringLiteral { val } = string_literal;
        self.ctx.time_index_column = Some(DEFAULT_TIME_INDEX_COLUMN.to_string());
        self.ctx.field_columns = vec![DEFAULT_FIELD_COLUMN.to_string()];
        self.ctx.reset_table_name_and_schema();
        let literal_expr = df_prelude::lit(val.clone());

        let plan = LogicalPlan::Extension(Extension {
            node: Arc::new(
                EmptyMetric::new(
                    self.ctx.start,
                    self.ctx.end,
                    self.ctx.interval,
                    SPECIAL_TIME_FUNCTION.to_string(),
                    DEFAULT_FIELD_COLUMN.to_string(),
                    Some(literal_expr),
                )
                .context(DataFusionPlanningSnafu)?,
            ),
        });
        Ok(plan)
    }

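    /// Plan an instant vector selector such as `http_requests_total{job="api"}`:
    /// scan, filter and normalize the series, then align samples to evaluation
    /// timestamps with [`InstantManipulate`].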
    async fn prom_vector_selector_to_plan(
        &mut self,
        vector_selector: &VectorSelector,
        timestamp_fn: bool,
    ) -> Result<LogicalPlan> {
        let VectorSelector {
            name,
            offset,
            matchers,
            at: _,
        } = vector_selector;
        let matchers = self.preprocess_label_matchers(matchers, name)?;
        if let Some(empty_plan) = self.setup_context().await? {
            return Ok(empty_plan);
        }
        let normalize = self
            .selector_to_series_normalize_plan(offset, matchers, false)
            .await?;

        let normalize = if timestamp_fn {
            self.create_timestamp_func_plan(normalize)?
        } else {
            normalize
        };

        let manipulate = InstantManipulate::new(
            self.ctx.start,
            self.ctx.end,
            self.ctx.lookback_delta,
            self.ctx.interval,
            self.ctx
                .time_index_column
                .clone()
                .expect("time index should be set in `setup_context`"),
            self.ctx.field_columns.first().cloned(),
            normalize,
        );
        Ok(LogicalPlan::Extension(Extension {
            node: Arc::new(manipulate),
        }))
    }

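    /// Build the projection for the `timestamp()` function: the time index is
    /// re-exposed as the value column (via `build_special_time_expr`) while the
    /// tag columns and the time index itself are kept.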
    fn create_timestamp_func_plan(&mut self, normalize: LogicalPlan) -> Result<LogicalPlan> {
        let time_expr = build_special_time_expr(self.ctx.time_index_column.as_ref().unwrap())
            .alias(DEFAULT_FIELD_COLUMN);
        self.ctx.field_columns = vec![time_expr.schema_name().to_string()];
        let mut project_exprs = Vec::with_capacity(self.ctx.tag_columns.len() + 2);
        project_exprs.push(self.create_time_index_column_expr()?);
        project_exprs.push(time_expr);
        project_exprs.extend(self.create_tag_column_exprs()?);

        LogicalPlanBuilder::from(normalize)
            .project(project_exprs)
            .context(DataFusionPlanningSnafu)?
            .build()
            .context(DataFusionPlanningSnafu)
    }

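    /// Plan a range selector such as `http_requests_total[5m]`: the normalized
    /// selector is wrapped in [`RangeManipulate`] carrying the range in
    /// milliseconds.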
    async fn prom_matrix_selector_to_plan(
        &mut self,
        matrix_selector: &MatrixSelector,
    ) -> Result<LogicalPlan> {
        let MatrixSelector { vs, range } = matrix_selector;
        let VectorSelector {
            name,
            offset,
            matchers,
            ..
        } = vs;
        let matchers = self.preprocess_label_matchers(matchers, name)?;
        ensure!(!range.is_zero(), ZeroRangeSelectorSnafu);
        let range_ms = range.as_millis() as _;
        self.ctx.range = Some(range_ms);

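        // `setup_context` returns an empty-metric plan when the table doesn't
        // exist; use it directly instead of scanning.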
        let normalize = match self.setup_context().await? {
            Some(empty_plan) => empty_plan,
            None => {
                self.selector_to_series_normalize_plan(offset, matchers, true)
                    .await?
            }
        };
        let manipulate = RangeManipulate::new(
            self.ctx.start,
            self.ctx.end,
            self.ctx.interval,
            range_ms,
            self.ctx
                .time_index_column
                .clone()
                .expect("time index should be set in `setup_context`"),
            self.ctx.field_columns.clone(),
            normalize,
        )
        .context(DataFusionPlanningSnafu)?;

        Ok(LogicalPlan::Extension(Extension {
            node: Arc::new(manipulate),
        }))
    }

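    /// Plan a function call. Functions with dedicated plans
    /// (`histogram_quantile`, `vector`, `scalar`, `absent`) are dispatched
    /// first; everything else becomes a projection of per-field expressions
    /// over the planned input.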
    async fn prom_call_expr_to_plan(
        &mut self,
        query_engine_state: &QueryEngineState,
        call_expr: &Call,
    ) -> Result<LogicalPlan> {
        let Call { func, args } = call_expr;
        match func.name {
            SPECIAL_HISTOGRAM_QUANTILE => {
                return self.create_histogram_plan(args, query_engine_state).await;
            }
            SPECIAL_VECTOR_FUNCTION => return self.create_vector_plan(args).await,
            SCALAR_FUNCTION => return self.create_scalar_plan(args, query_engine_state).await,
            SPECIAL_ABSENT_FUNCTION => {
                return self.create_absent_plan(args, query_engine_state).await;
            }
            _ => {}
        }

        let args = self.create_function_args(&args.args)?;
        let input = if let Some(prom_expr) = &args.input {
            self.prom_expr_to_plan_inner(prom_expr, func.name == "timestamp", query_engine_state)
                .await?
        } else {
            self.ctx.time_index_column = Some(SPECIAL_TIME_FUNCTION.to_string());
            self.ctx.reset_table_name_and_schema();
            self.ctx.tag_columns = vec![];
            self.ctx.field_columns = vec![DEFAULT_FIELD_COLUMN.to_string()];
            LogicalPlan::Extension(Extension {
                node: Arc::new(
                    EmptyMetric::new(
                        self.ctx.start,
                        self.ctx.end,
                        self.ctx.interval,
                        SPECIAL_TIME_FUNCTION.to_string(),
                        DEFAULT_FIELD_COLUMN.to_string(),
                        None,
                    )
                    .context(DataFusionPlanningSnafu)?,
                ),
            })
        };
        let (mut func_exprs, new_tags) =
            self.create_function_expr(func, args.literals.clone(), query_engine_state)?;
        func_exprs.insert(0, self.create_time_index_column_expr()?);
        func_exprs.extend_from_slice(&self.create_tag_column_exprs()?);

        let builder = LogicalPlanBuilder::from(input)
            .project(func_exprs)
            .context(DataFusionPlanningSnafu)?
            .filter(self.create_empty_values_filter_expr()?)
            .context(DataFusionPlanningSnafu)?;

        let builder = match func.name {
            "sort" => builder
                .sort(self.create_field_columns_sort_exprs(true))
                .context(DataFusionPlanningSnafu)?,
            "sort_desc" => builder
                .sort(self.create_field_columns_sort_exprs(false))
                .context(DataFusionPlanningSnafu)?,
            "sort_by_label" => builder
                .sort(Self::create_sort_exprs_by_tags(
                    func.name,
                    args.literals,
                    true,
                )?)
                .context(DataFusionPlanningSnafu)?,
            "sort_by_label_desc" => builder
                .sort(Self::create_sort_exprs_by_tags(
                    func.name,
                    args.literals,
                    false,
                )?)
                .context(DataFusionPlanningSnafu)?,

            _ => builder,
        };

        for tag in new_tags {
            self.ctx.tag_columns.push(tag);
        }

        let plan = builder.build().context(DataFusionPlanningSnafu)?;
        common_telemetry::debug!("Created PromQL function plan: {plan:?} for {call_expr:?}");

        Ok(plan)
    }

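    /// Plan a parser extension node; currently the `EXPLAIN`/`ANALYZE` wrappers
    /// (with optional `VERBOSE`) around a PromQL expression.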
    async fn prom_ext_expr_to_plan(
        &mut self,
        query_engine_state: &QueryEngineState,
        ext_expr: &promql_parser::parser::ast::Extension,
    ) -> Result<LogicalPlan> {
        let expr = &ext_expr.expr;
        let children = expr.children();
        let plan = self
            .prom_expr_to_plan(&children[0], query_engine_state)
            .await?;
        match expr.name() {
            "ANALYZE" => LogicalPlanBuilder::from(plan)
                .explain(false, true)
                .unwrap()
                .build()
                .context(DataFusionPlanningSnafu),
            "ANALYZE VERBOSE" => LogicalPlanBuilder::from(plan)
                .explain(true, true)
                .unwrap()
                .build()
                .context(DataFusionPlanningSnafu),
            "EXPLAIN" => LogicalPlanBuilder::from(plan)
                .explain(false, false)
                .unwrap()
                .build()
                .context(DataFusionPlanningSnafu),
            "EXPLAIN VERBOSE" => LogicalPlanBuilder::from(plan)
                .explain(true, false)
                .unwrap()
                .build()
                .context(DataFusionPlanningSnafu),
            _ => LogicalPlanBuilder::empty(true)
                .build()
                .context(DataFusionPlanningSnafu),
        }
    }

    #[allow(clippy::mutable_key_type)]
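    /// Extract the metric name and special matchers from the label matchers:
    /// the metric name (from the selector or `__name__`) becomes the table
    /// name, `__schema__`/`__database__` select the schema, and `__field__`
    /// matchers are stashed for later field filtering. The remaining matchers
    /// are returned with their values unescaped.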
    fn preprocess_label_matchers(
        &mut self,
        label_matchers: &Matchers,
        name: &Option<String>,
    ) -> Result<Matchers> {
        self.ctx.reset();

        let metric_name;
        if let Some(name) = name.clone() {
            metric_name = Some(name);
            ensure!(
                label_matchers.find_matchers(METRIC_NAME).is_empty(),
                MultipleMetricMatchersSnafu
            );
        } else {
            let mut matches = label_matchers.find_matchers(METRIC_NAME);
            ensure!(!matches.is_empty(), NoMetricMatcherSnafu);
            ensure!(matches.len() == 1, MultipleMetricMatchersSnafu);
            ensure!(
                matches[0].op == MatchOp::Equal,
                UnsupportedMatcherOpSnafu {
                    matcher_op: matches[0].op.to_string(),
                    matcher: METRIC_NAME
                }
            );
            metric_name = matches.pop().map(|m| m.value);
        }

        self.ctx.table_name = metric_name;

        let mut matchers = HashSet::new();
        for matcher in &label_matchers.matchers {
            if matcher.name == FIELD_COLUMN_MATCHER {
                self.ctx
                    .field_column_matcher
                    .get_or_insert_default()
                    .push(matcher.clone());
            } else if matcher.name == SCHEMA_COLUMN_MATCHER || matcher.name == DB_COLUMN_MATCHER {
                ensure!(
                    matcher.op == MatchOp::Equal,
                    UnsupportedMatcherOpSnafu {
                        matcher: matcher.name.clone(),
                        matcher_op: matcher.op.to_string(),
                    }
                );
                self.ctx.schema_name = Some(matcher.value.clone());
            } else if matcher.name != METRIC_NAME {
                self.ctx.selector_matcher.push(matcher.clone());
                let _ = matchers.insert(matcher.clone());
            }
        }

        Ok(Matchers::new(
            matchers.into_iter().map(normalize_matcher).collect(),
        ))
    }

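    /// Plan the scan-filter-sort-divide pipeline for a selector: scan the table
    /// with matcher and time-range filters, optionally project the matched
    /// field columns, sort by tags and time index, split into per-series
    /// partitions with [`SeriesDivide`] and, when needed, apply offset handling
    /// via [`SeriesNormalize`].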
    async fn selector_to_series_normalize_plan(
        &mut self,
        offset: &Option<Offset>,
        label_matchers: Matchers,
        is_range_selector: bool,
    ) -> Result<LogicalPlan> {
        let table_ref = self.table_ref()?;
        let mut table_scan = self.create_table_scan_plan(table_ref.clone()).await?;
        let table_schema = table_scan.schema();

        let offset_duration = match offset {
            Some(Offset::Pos(duration)) => duration.as_millis() as Millisecond,
            Some(Offset::Neg(duration)) => -(duration.as_millis() as Millisecond),
            None => 0,
        };
        let mut scan_filters = Self::matchers_to_expr(label_matchers.clone(), table_schema)?;
        if let Some(time_index_filter) = self.build_time_index_filter(offset_duration)? {
            scan_filters.push(time_index_filter);
        }
        table_scan = LogicalPlanBuilder::from(table_scan)
            .filter(conjunction(scan_filters).unwrap())
            .context(DataFusionPlanningSnafu)?
            .build()
            .context(DataFusionPlanningSnafu)?;

        if let Some(field_matchers) = &self.ctx.field_column_matcher {
            let col_set = self.ctx.field_columns.iter().collect::<HashSet<_>>();
            let mut result_set = HashSet::new();
            let mut reverse_set = HashSet::new();
            for matcher in field_matchers {
                match &matcher.op {
                    MatchOp::Equal => {
                        if col_set.contains(&matcher.value) {
                            let _ = result_set.insert(matcher.value.clone());
                        } else {
                            return Err(ColumnNotFoundSnafu {
                                col: matcher.value.clone(),
                            }
                            .build());
                        }
                    }
                    MatchOp::NotEqual => {
                        if col_set.contains(&matcher.value) {
                            let _ = reverse_set.insert(matcher.value.clone());
                        } else {
                            return Err(ColumnNotFoundSnafu {
                                col: matcher.value.clone(),
                            }
                            .build());
                        }
                    }
                    MatchOp::Re(regex) => {
                        for col in &self.ctx.field_columns {
                            if regex.is_match(col) {
                                let _ = result_set.insert(col.clone());
                            }
                        }
                    }
                    MatchOp::NotRe(regex) => {
                        for col in &self.ctx.field_columns {
                            if regex.is_match(col) {
                                let _ = reverse_set.insert(col.clone());
                            }
                        }
                    }
                }
            }
            if result_set.is_empty() {
                result_set = col_set.into_iter().cloned().collect();
            }
            for col in reverse_set {
                let _ = result_set.remove(&col);
            }

            self.ctx.field_columns = self
                .ctx
                .field_columns
                .drain(..)
                .filter(|col| result_set.contains(col))
                .collect();

            let exprs = result_set
                .into_iter()
                .map(|col| DfExpr::Column(Column::new_unqualified(col)))
                .chain(self.create_tag_column_exprs()?)
                .chain(Some(self.create_time_index_column_expr()?))
                .collect::<Vec<_>>();

            table_scan = LogicalPlanBuilder::from(table_scan)
                .project(exprs)
                .context(DataFusionPlanningSnafu)?
                .build()
                .context(DataFusionPlanningSnafu)?;
        }

        let sort_plan = LogicalPlanBuilder::from(table_scan)
            .sort(self.create_tag_and_time_index_column_sort_exprs()?)
            .context(DataFusionPlanningSnafu)?
            .build()
            .context(DataFusionPlanningSnafu)?;

        let time_index_column =
            self.ctx
                .time_index_column
                .clone()
                .with_context(|| TimeIndexNotFoundSnafu {
                    table: table_ref.to_string(),
                })?;
        let divide_plan = LogicalPlan::Extension(Extension {
            node: Arc::new(SeriesDivide::new(
                self.ctx.tag_columns.clone(),
                time_index_column,
                sort_plan,
            )),
        });

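        // Normalization is only needed for range selectors or a non-zero offset.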
        if !is_range_selector && offset_duration == 0 {
            return Ok(divide_plan);
        }
        let series_normalize = SeriesNormalize::new(
            offset_duration,
            self.ctx
                .time_index_column
                .clone()
                .with_context(|| TimeIndexNotFoundSnafu {
                    table: table_ref.to_quoted_string(),
                })?,
            is_range_selector,
            self.ctx.tag_columns.clone(),
            divide_plan,
        );
        let logical_plan = LogicalPlan::Extension(Extension {
            node: Arc::new(series_normalize),
        });

        Ok(logical_plan)
    }

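    /// Convert an aggregate modifier (`by` / `without`) into group-by column
    /// expressions. The time index is always appended. When `update_ctx` is
    /// set, the context's tag columns are updated to match the grouping.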
    fn agg_modifier_to_col(
        &mut self,
        input_schema: &DFSchemaRef,
        modifier: &Option<LabelModifier>,
        update_ctx: bool,
    ) -> Result<Vec<DfExpr>> {
        match modifier {
            None => {
                if update_ctx {
                    self.ctx.tag_columns.clear();
                }
                Ok(vec![self.create_time_index_column_expr()?])
            }
            Some(LabelModifier::Include(labels)) => {
                if update_ctx {
                    self.ctx.tag_columns.clear();
                }
                let mut exprs = Vec::with_capacity(labels.labels.len());
                for label in &labels.labels {
                    if let Some(column_name) = Self::find_case_sensitive_column(input_schema, label)
                    {
                        exprs.push(DfExpr::Column(Column::from_name(column_name.clone())));

                        if update_ctx {
                            self.ctx.tag_columns.push(column_name);
                        }
                    }
                }
                exprs.push(self.create_time_index_column_expr()?);

                Ok(exprs)
            }
            Some(LabelModifier::Exclude(labels)) => {
                let mut all_fields = input_schema
                    .fields()
                    .iter()
                    .map(|f| f.name())
                    .collect::<BTreeSet<_>>();

                for label in &labels.labels {
                    let _ = all_fields.remove(label);
                }

                if let Some(time_index) = &self.ctx.time_index_column {
                    let _ = all_fields.remove(time_index);
                }
                for value in &self.ctx.field_columns {
                    let _ = all_fields.remove(value);
                }

                if update_ctx {
                    self.ctx.tag_columns = all_fields.iter().map(|col| (*col).clone()).collect();
                }

                let mut exprs = all_fields
                    .into_iter()
                    .map(|c| DfExpr::Column(Column::from(c)))
                    .collect::<Vec<_>>();

                exprs.push(self.create_time_index_column_expr()?);

                Ok(exprs)
            }
        }
    }

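    /// Convert label matchers into DataFusion filter expressions. Special
    /// matchers are skipped, missing columns are treated as empty strings, and
    /// the trivial regexes `^(?:.*)$` / `^(?:.+)$` are simplified instead of
    /// being evaluated as regular expressions.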
    pub fn matchers_to_expr(
        label_matchers: Matchers,
        table_schema: &DFSchemaRef,
    ) -> Result<Vec<DfExpr>> {
        let mut exprs = Vec::with_capacity(label_matchers.matchers.len());
        for matcher in label_matchers.matchers {
            if matcher.name == SCHEMA_COLUMN_MATCHER
                || matcher.name == DB_COLUMN_MATCHER
                || matcher.name == FIELD_COLUMN_MATCHER
            {
                continue;
            }

            let column_name = Self::find_case_sensitive_column(table_schema, matcher.name.as_str());
            let col = if let Some(column_name) = column_name {
                DfExpr::Column(Column::from_name(column_name))
            } else {
                DfExpr::Literal(ScalarValue::Utf8(Some(String::new())), None)
                    .alias(matcher.name.clone())
            };
            let lit = DfExpr::Literal(ScalarValue::Utf8(Some(matcher.value)), None);
            let expr = match matcher.op {
                MatchOp::Equal => col.eq(lit),
                MatchOp::NotEqual => col.not_eq(lit),
                MatchOp::Re(re) => {
                    if re.as_str() == "^(?:.*)$" {
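                        // `.*` matches everything: no filter needed.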
                        continue;
                    }
                    if re.as_str() == "^(?:.+)$" {
                        col.not_eq(DfExpr::Literal(
                            ScalarValue::Utf8(Some(String::new())),
                            None,
                        ))
                    } else {
                        DfExpr::BinaryExpr(BinaryExpr {
                            left: Box::new(col),
                            op: Operator::RegexMatch,
                            right: Box::new(DfExpr::Literal(
                                ScalarValue::Utf8(Some(re.as_str().to_string())),
                                None,
                            )),
                        })
                    }
                }
                MatchOp::NotRe(re) => {
                    if re.as_str() == "^(?:.*)$" {
                        DfExpr::Literal(ScalarValue::Boolean(Some(false)), None)
                    } else if re.as_str() == "^(?:.+)$" {
                        col.eq(DfExpr::Literal(
                            ScalarValue::Utf8(Some(String::new())),
                            None,
                        ))
                    } else {
                        DfExpr::BinaryExpr(BinaryExpr {
                            left: Box::new(col),
                            op: Operator::RegexNotMatch,
                            right: Box::new(DfExpr::Literal(
                                ScalarValue::Utf8(Some(re.as_str().to_string())),
                                None,
                            )),
                        })
                    }
                }
            };
            exprs.push(expr);
        }

        Ok(exprs)
    }

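    /// Case-sensitive column lookup in a schema.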
    fn find_case_sensitive_column(schema: &DFSchemaRef, column: &str) -> Option<String> {
        schema
            .fields()
            .iter()
            .find(|field| field.name() == column)
            .map(|field| field.name().clone())
    }

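    /// Build a [`TableReference`] from the context's table name and optional schema.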
    fn table_ref(&self) -> Result<TableReference> {
        let table_name = self
            .ctx
            .table_name
            .clone()
            .context(TableNameNotFoundSnafu)?;

        let table_ref = if let Some(schema_name) = &self.ctx.schema_name {
            TableReference::partial(schema_name.as_str(), table_name.as_str())
        } else {
            TableReference::bare(table_name.as_str())
        };

        Ok(table_ref)
    }

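    /// Build the time-range filter for the scan. Dense queries (more steps than
    /// [`MAX_SCATTER_POINTS`], or a step of at most one hour) get one range
    /// covering the whole query; sparse queries get one small range per
    /// evaluation step, OR-ed together, so much less data is scanned.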
    fn build_time_index_filter(&self, offset_duration: i64) -> Result<Option<DfExpr>> {
        let start = self.ctx.start;
        let end = self.ctx.end;
        if end < start {
            return InvalidTimeRangeSnafu { start, end }.fail();
        }
        let lookback_delta = self.ctx.lookback_delta;
        let range = self.ctx.range.unwrap_or_default();
        let interval = self.ctx.interval;
        let time_index_expr = self.create_time_index_column_expr()?;
        let num_points = (end - start) / interval;

        if num_points > MAX_SCATTER_POINTS || interval <= INTERVAL_1H {
            let single_time_range = time_index_expr
                .clone()
                .gt_eq(DfExpr::Literal(
                    ScalarValue::TimestampMillisecond(
                        Some(self.ctx.start + offset_duration - self.ctx.lookback_delta - range),
                        None,
                    ),
                    None,
                ))
                .and(time_index_expr.lt_eq(DfExpr::Literal(
                    ScalarValue::TimestampMillisecond(
                        Some(self.ctx.end + offset_duration + self.ctx.lookback_delta),
                        None,
                    ),
                    None,
                )));
            return Ok(Some(single_time_range));
        }

        let mut filters = Vec::with_capacity(num_points as usize);
        for timestamp in (start..end).step_by(interval as usize) {
            filters.push(
                time_index_expr
                    .clone()
                    .gt_eq(DfExpr::Literal(
                        ScalarValue::TimestampMillisecond(
                            Some(timestamp + offset_duration - lookback_delta - range),
                            None,
                        ),
                        None,
                    ))
                    .and(time_index_expr.clone().lt_eq(DfExpr::Literal(
                        ScalarValue::TimestampMillisecond(
                            Some(timestamp + offset_duration + lookback_delta),
                            None,
                        ),
                        None,
                    ))),
            )
        }

        Ok(filters.into_iter().reduce(DfExpr::or))
    }

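    /// Create a [`LogicalPlan`] for scanning the table. The time index column
    /// is cast to millisecond precision when necessary, so downstream nodes can
    /// assume millisecond timestamps.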
    async fn create_table_scan_plan(&mut self, table_ref: TableReference) -> Result<LogicalPlan> {
        let provider = self
            .table_provider
            .resolve_table(table_ref.clone())
            .await
            .context(CatalogSnafu)?;

        let is_time_index_ms = provider
            .as_any()
            .downcast_ref::<DefaultTableSource>()
            .context(UnknownTableSnafu)?
            .table_provider
            .as_any()
            .downcast_ref::<DfTableProviderAdapter>()
            .context(UnknownTableSnafu)?
            .table()
            .schema()
            .timestamp_column()
            .with_context(|| TimeIndexNotFoundSnafu {
                table: table_ref.to_quoted_string(),
            })?
            .data_type
            == ConcreteDataType::timestamp_millisecond_datatype();

        let mut scan_plan = LogicalPlanBuilder::scan(table_ref.clone(), provider, None)
            .context(DataFusionPlanningSnafu)?
            .build()
            .context(DataFusionPlanningSnafu)?;

        if !is_time_index_ms {
            let expr: Vec<_> = self
                .ctx
                .field_columns
                .iter()
                .map(|col| DfExpr::Column(Column::new(Some(table_ref.clone()), col.clone())))
                .chain(self.create_tag_column_exprs()?)
                .chain(Some(DfExpr::Alias(Alias {
                    expr: Box::new(DfExpr::Cast(Cast {
                        expr: Box::new(self.create_time_index_column_expr()?),
                        data_type: ArrowDataType::Timestamp(ArrowTimeUnit::Millisecond, None),
                    })),
                    relation: Some(table_ref.clone()),
                    name: self
                        .ctx
                        .time_index_column
                        .as_ref()
                        .with_context(|| TimeIndexNotFoundSnafu {
                            table: table_ref.to_quoted_string(),
                        })?
                        .clone(),
                    metadata: None,
                })))
                .collect::<Vec<_>>();
            scan_plan = LogicalPlanBuilder::from(scan_plan)
                .project(expr)
                .context(DataFusionPlanningSnafu)?
                .build()
                .context(DataFusionPlanningSnafu)?;
        }

        let result = LogicalPlanBuilder::from(scan_plan)
            .build()
            .context(DataFusionPlanningSnafu)?;
        Ok(result)
    }

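    /// Resolve the table and fill the context with its time index, field and
    /// tag columns. Returns an empty-metric plan when the table doesn't exist,
    /// mirroring Prometheus' empty result for unknown metrics.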
    async fn setup_context(&mut self) -> Result<Option<LogicalPlan>> {
        let table_ref = self.table_ref()?;
        let table = match self.table_provider.resolve_table(table_ref.clone()).await {
            Err(e) if e.status_code() == StatusCode::TableNotFound => {
                let plan = self.setup_context_for_empty_metric()?;
                return Ok(Some(plan));
            }
            res => res.context(CatalogSnafu)?,
        };
        let table = table
            .as_any()
            .downcast_ref::<DefaultTableSource>()
            .context(UnknownTableSnafu)?
            .table_provider
            .as_any()
            .downcast_ref::<DfTableProviderAdapter>()
            .context(UnknownTableSnafu)?
            .table();

        let time_index = table
            .schema()
            .timestamp_column()
            .with_context(|| TimeIndexNotFoundSnafu {
                table: table_ref.to_quoted_string(),
            })?
            .name
            .clone();
        self.ctx.time_index_column = Some(time_index);

        let values = table
            .table_info()
            .meta
            .field_column_names()
            .cloned()
            .collect();
        self.ctx.field_columns = values;

        let tags = table
            .table_info()
            .meta
            .row_key_column_names()
            .filter(|col| {
                col != &DATA_SCHEMA_TABLE_ID_COLUMN_NAME && col != &DATA_SCHEMA_TSID_COLUMN_NAME
            })
            .cloned()
            .collect();
        self.ctx.tag_columns = tags;

        Ok(None)
    }

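    /// Set up the context for a metric that doesn't exist: an [`EmptyMetric`]
    /// over the inverted range `0..-1` produces no rows.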
    fn setup_context_for_empty_metric(&mut self) -> Result<LogicalPlan> {
        self.ctx.time_index_column = Some(SPECIAL_TIME_FUNCTION.to_string());
        self.ctx.reset_table_name_and_schema();
        self.ctx.tag_columns = vec![];
        self.ctx.field_columns = vec![DEFAULT_FIELD_COLUMN.to_string()];

        let plan = LogicalPlan::Extension(Extension {
            node: Arc::new(
                EmptyMetric::new(
                    0,
                    -1,
                    self.ctx.interval,
                    SPECIAL_TIME_FUNCTION.to_string(),
                    DEFAULT_FIELD_COLUMN.to_string(),
                    Some(lit(0.0f64)),
                )
                .context(DataFusionPlanningSnafu)?,
            ),
        });
        Ok(plan)
    }

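    /// Split call arguments into at most one vector input plus a list of
    /// literal arguments.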
    fn create_function_args(&self, args: &[Box<PromExpr>]) -> Result<FunctionArgs> {
        let mut result = FunctionArgs::default();

        for arg in args {
            if let Some(expr) = Self::try_build_literal_expr(arg) {
                result.literals.push(expr);
            } else {
                match arg.as_ref() {
                    PromExpr::Subquery(_)
                    | PromExpr::VectorSelector(_)
                    | PromExpr::MatrixSelector(_)
                    | PromExpr::Extension(_)
                    | PromExpr::Aggregate(_)
                    | PromExpr::Paren(_)
                    | PromExpr::Call(_)
                    | PromExpr::Binary(_)
                    | PromExpr::Unary(_) => {
                        if result.input.replace(*arg.clone()).is_some() {
                            MultipleVectorSnafu { expr: *arg.clone() }.fail()?;
                        }
                    }

                    _ => {
                        let expr = Self::get_param_as_literal_expr(&Some(arg.clone()), None, None)?;
                        result.literals.push(expr);
                    }
                }
            }
        }

        Ok(result)
    }

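    /// Build expressions for a PromQL function call: one expression per field
    /// column, plus the list of newly created tag columns (for `label_join`
    /// and `label_replace`). Range functions additionally receive the
    /// timestamp-range column and, for extrapolating functions, the range
    /// length.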
    fn create_function_expr(
        &mut self,
        func: &Function,
        other_input_exprs: Vec<DfExpr>,
        query_engine_state: &QueryEngineState,
    ) -> Result<(Vec<DfExpr>, Vec<String>)> {
        let mut other_input_exprs: VecDeque<DfExpr> = other_input_exprs.into();

        let field_column_pos = 0;
        let mut exprs = Vec::with_capacity(self.ctx.field_columns.len());
        let mut new_tags = vec![];
        let scalar_func = match func.name {
            "increase" => ScalarFunc::ExtrapolateUdf(
                Arc::new(Increase::scalar_udf()),
                self.ctx.range.context(ExpectRangeSelectorSnafu)?,
            ),
            "rate" => ScalarFunc::ExtrapolateUdf(
                Arc::new(Rate::scalar_udf()),
                self.ctx.range.context(ExpectRangeSelectorSnafu)?,
            ),
            "delta" => ScalarFunc::ExtrapolateUdf(
                Arc::new(Delta::scalar_udf()),
                self.ctx.range.context(ExpectRangeSelectorSnafu)?,
            ),
            "idelta" => ScalarFunc::Udf(Arc::new(IDelta::<false>::scalar_udf())),
            "irate" => ScalarFunc::Udf(Arc::new(IDelta::<true>::scalar_udf())),
            "resets" => ScalarFunc::Udf(Arc::new(Resets::scalar_udf())),
            "changes" => ScalarFunc::Udf(Arc::new(Changes::scalar_udf())),
            "deriv" => ScalarFunc::Udf(Arc::new(Deriv::scalar_udf())),
            "avg_over_time" => ScalarFunc::Udf(Arc::new(AvgOverTime::scalar_udf())),
            "min_over_time" => ScalarFunc::Udf(Arc::new(MinOverTime::scalar_udf())),
            "max_over_time" => ScalarFunc::Udf(Arc::new(MaxOverTime::scalar_udf())),
            "sum_over_time" => ScalarFunc::Udf(Arc::new(SumOverTime::scalar_udf())),
            "count_over_time" => ScalarFunc::Udf(Arc::new(CountOverTime::scalar_udf())),
            "last_over_time" => ScalarFunc::Udf(Arc::new(LastOverTime::scalar_udf())),
            "absent_over_time" => ScalarFunc::Udf(Arc::new(AbsentOverTime::scalar_udf())),
            "present_over_time" => ScalarFunc::Udf(Arc::new(PresentOverTime::scalar_udf())),
            "stddev_over_time" => ScalarFunc::Udf(Arc::new(StddevOverTime::scalar_udf())),
            "stdvar_over_time" => ScalarFunc::Udf(Arc::new(StdvarOverTime::scalar_udf())),
            "quantile_over_time" => ScalarFunc::Udf(Arc::new(QuantileOverTime::scalar_udf())),
            "predict_linear" => {
                other_input_exprs[0] = DfExpr::Cast(Cast {
                    expr: Box::new(other_input_exprs[0].clone()),
                    data_type: ArrowDataType::Int64,
                });
                ScalarFunc::Udf(Arc::new(PredictLinear::scalar_udf()))
            }
            "holt_winters" => ScalarFunc::Udf(Arc::new(HoltWinters::scalar_udf())),
            "time" => {
                exprs.push(build_special_time_expr(
                    self.ctx.time_index_column.as_ref().unwrap(),
                ));
                ScalarFunc::GeneratedExpr
            }
            "minute" => {
                let expr = self.date_part_on_time_index("minute")?;
                exprs.push(expr);
                ScalarFunc::GeneratedExpr
            }
            "hour" => {
                let expr = self.date_part_on_time_index("hour")?;
                exprs.push(expr);
                ScalarFunc::GeneratedExpr
            }
            "month" => {
                let expr = self.date_part_on_time_index("month")?;
                exprs.push(expr);
                ScalarFunc::GeneratedExpr
            }
            "year" => {
                let expr = self.date_part_on_time_index("year")?;
                exprs.push(expr);
                ScalarFunc::GeneratedExpr
            }
            "day_of_month" => {
                let expr = self.date_part_on_time_index("day")?;
                exprs.push(expr);
                ScalarFunc::GeneratedExpr
            }
            "day_of_week" => {
                let expr = self.date_part_on_time_index("dow")?;
                exprs.push(expr);
                ScalarFunc::GeneratedExpr
            }
            "day_of_year" => {
                let expr = self.date_part_on_time_index("doy")?;
                exprs.push(expr);
                ScalarFunc::GeneratedExpr
            }
            "days_in_month" => {
                let day_lit_expr = "day".lit();
                let month_lit_expr = "month".lit();
                let interval_1month_lit_expr =
                    DfExpr::Literal(ScalarValue::IntervalYearMonth(Some(1)), None);
                let interval_1day_lit_expr = DfExpr::Literal(
                    ScalarValue::IntervalDayTime(Some(IntervalDayTime::new(1, 0))),
                    None,
                );
                let the_1month_minus_1day_expr = DfExpr::BinaryExpr(BinaryExpr {
                    left: Box::new(interval_1month_lit_expr),
                    op: Operator::Minus,
                    right: Box::new(interval_1day_lit_expr),
                });
                let date_trunc_expr = DfExpr::ScalarFunction(ScalarFunction {
                    func: datafusion_functions::datetime::date_trunc(),
                    args: vec![month_lit_expr, self.create_time_index_column_expr()?],
                });
                let date_trunc_plus_interval_expr = DfExpr::BinaryExpr(BinaryExpr {
                    left: Box::new(date_trunc_expr),
                    op: Operator::Plus,
                    right: Box::new(the_1month_minus_1day_expr),
                });
                let date_part_expr = DfExpr::ScalarFunction(ScalarFunction {
                    func: datafusion_functions::datetime::date_part(),
                    args: vec![day_lit_expr, date_trunc_plus_interval_expr],
                });

                exprs.push(date_part_expr);
                ScalarFunc::GeneratedExpr
            }

            "label_join" => {
                let (concat_expr, dst_label) = Self::build_concat_labels_expr(
                    &mut other_input_exprs,
                    &self.ctx,
                    query_engine_state,
                )?;

                for value in &self.ctx.field_columns {
                    if *value != dst_label {
                        let expr = DfExpr::Column(Column::from_name(value));
                        exprs.push(expr);
                    }
                }

                self.ctx.tag_columns.retain(|tag| *tag != dst_label);
                new_tags.push(dst_label);
                exprs.push(concat_expr);

                ScalarFunc::GeneratedExpr
            }
            "label_replace" => {
                if let Some((replace_expr, dst_label)) = self
                    .build_regexp_replace_label_expr(&mut other_input_exprs, query_engine_state)?
                {
                    for value in &self.ctx.field_columns {
                        if *value != dst_label {
                            let expr = DfExpr::Column(Column::from_name(value));
                            exprs.push(expr);
                        }
                    }

                    ensure!(
                        !self.ctx.tag_columns.contains(&dst_label),
                        SameLabelSetSnafu
                    );
                    new_tags.push(dst_label);
                    exprs.push(replace_expr);
                } else {
                    for value in &self.ctx.field_columns {
                        let expr = DfExpr::Column(Column::from_name(value));
                        exprs.push(expr);
                    }
                }

                ScalarFunc::GeneratedExpr
            }
            "sort" | "sort_desc" | "sort_by_label" | "sort_by_label_desc" | "timestamp" => {
                for value in &self.ctx.field_columns {
                    let expr = DfExpr::Column(Column::from_name(value));
                    exprs.push(expr);
                }

                ScalarFunc::GeneratedExpr
            }
            "round" => {
                if other_input_exprs.is_empty() {
                    other_input_exprs.push_front(0.0f64.lit());
                }
                ScalarFunc::DataFusionUdf(Arc::new(Round::scalar_udf()))
            }
            "rad" => ScalarFunc::DataFusionBuiltin(datafusion::functions::math::radians()),
            "deg" => ScalarFunc::DataFusionBuiltin(datafusion::functions::math::degrees()),
            "sgn" => ScalarFunc::DataFusionBuiltin(datafusion::functions::math::signum()),
            "pi" => {
                let fn_expr = DfExpr::ScalarFunction(ScalarFunction {
                    func: datafusion::functions::math::pi(),
                    args: vec![],
                });
                exprs.push(fn_expr);

                ScalarFunc::GeneratedExpr
            }
            _ => {
                if let Some(f) = query_engine_state
                    .session_state()
                    .scalar_functions()
                    .get(func.name)
                {
                    ScalarFunc::DataFusionBuiltin(f.clone())
                } else if let Some(factory) = query_engine_state.scalar_function(func.name) {
                    let func_state = query_engine_state.function_state();
                    let query_ctx = self.table_provider.query_ctx();

                    ScalarFunc::DataFusionUdf(Arc::new(factory.provide(FunctionContext {
                        state: func_state,
                        query_ctx: query_ctx.clone(),
                    })))
                } else if let Some(f) = datafusion_functions::math::functions()
                    .iter()
                    .find(|f| f.name() == func.name)
                {
                    ScalarFunc::DataFusionUdf(f.clone())
                } else {
                    return UnsupportedExprSnafu {
                        name: func.name.to_string(),
                    }
                    .fail();
                }
            }
        };

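        // Instantiate the resolved function once per field column.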
        for value in &self.ctx.field_columns {
            let col_expr = DfExpr::Column(Column::from_name(value));

            match scalar_func.clone() {
                ScalarFunc::DataFusionBuiltin(func) => {
                    other_input_exprs.insert(field_column_pos, col_expr);
                    let fn_expr = DfExpr::ScalarFunction(ScalarFunction {
                        func,
                        args: other_input_exprs.clone().into(),
                    });
                    exprs.push(fn_expr);
                    let _ = other_input_exprs.remove(field_column_pos);
                }
                ScalarFunc::DataFusionUdf(func) => {
                    let args = itertools::chain!(
                        other_input_exprs.iter().take(field_column_pos).cloned(),
                        std::iter::once(col_expr),
                        other_input_exprs.iter().skip(field_column_pos).cloned()
                    )
                    .collect_vec();
                    exprs.push(DfExpr::ScalarFunction(ScalarFunction { func, args }))
                }
                ScalarFunc::Udf(func) => {
                    let ts_range_expr = DfExpr::Column(Column::from_name(
                        RangeManipulate::build_timestamp_range_name(
                            self.ctx.time_index_column.as_ref().unwrap(),
                        ),
                    ));
                    other_input_exprs.insert(field_column_pos, ts_range_expr);
                    other_input_exprs.insert(field_column_pos + 1, col_expr);
                    let fn_expr = DfExpr::ScalarFunction(ScalarFunction {
                        func,
                        args: other_input_exprs.clone().into(),
                    });
                    exprs.push(fn_expr);
                    let _ = other_input_exprs.remove(field_column_pos + 1);
                    let _ = other_input_exprs.remove(field_column_pos);
                }
                ScalarFunc::ExtrapolateUdf(func, range_length) => {
                    let ts_range_expr = DfExpr::Column(Column::from_name(
                        RangeManipulate::build_timestamp_range_name(
                            self.ctx.time_index_column.as_ref().unwrap(),
                        ),
                    ));
                    other_input_exprs.insert(field_column_pos, ts_range_expr);
                    other_input_exprs.insert(field_column_pos + 1, col_expr);
                    other_input_exprs
                        .insert(field_column_pos + 2, self.create_time_index_column_expr()?);
                    other_input_exprs.push_back(lit(range_length));
                    let fn_expr = DfExpr::ScalarFunction(ScalarFunction {
                        func,
                        args: other_input_exprs.clone().into(),
                    });
                    exprs.push(fn_expr);
                    let _ = other_input_exprs.pop_back();
                    let _ = other_input_exprs.remove(field_column_pos + 2);
                    let _ = other_input_exprs.remove(field_column_pos + 1);
                    let _ = other_input_exprs.remove(field_column_pos);
                }
                ScalarFunc::GeneratedExpr => {}
            }
        }

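        // Alias every generated expression with its display name and track the
        // new field columns; `label_join`/`label_replace` keep the originals.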
        if !matches!(func.name, "label_join" | "label_replace") {
            let mut new_field_columns = Vec::with_capacity(exprs.len());

            exprs = exprs
                .into_iter()
                .map(|expr| {
                    let display_name = expr.schema_name().to_string();
                    new_field_columns.push(display_name.clone());
                    Ok(expr.alias(display_name))
                })
                .collect::<std::result::Result<Vec<_>, _>>()
                .context(DataFusionPlanningSnafu)?;

            self.ctx.field_columns = new_field_columns;
        }

        Ok((exprs, new_tags))
    }

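    /// Validate a destination label name: it must match [`LABEL_NAME_REGEX`]
    /// and must not start with `__`.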
    fn validate_label_name(label_name: &str) -> Result<()> {
        if label_name.starts_with("__") {
            return InvalidDestinationLabelNameSnafu { label_name }.fail();
        }
        if !LABEL_NAME_REGEX.is_match(label_name) {
            return InvalidDestinationLabelNameSnafu { label_name }.fail();
        }

        Ok(())
    }

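    /// Build the `regexp_replace` expression for `label_replace()`, e.g.
    /// `label_replace(up, "host", "$1", "instance", "(.*):.*")`. The regex is
    /// anchored as `^(?s:...)$` to match the label's full value. Returns `None`
    /// when the call is a no-op.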
    fn build_regexp_replace_label_expr(
        &self,
        other_input_exprs: &mut VecDeque<DfExpr>,
        query_engine_state: &QueryEngineState,
    ) -> Result<Option<(DfExpr, String)>> {
        let dst_label = match other_input_exprs.pop_front() {
            Some(DfExpr::Literal(ScalarValue::Utf8(Some(d)), _)) => d,
            other => UnexpectedPlanExprSnafu {
                desc: format!("expected dst_label string literal, but found {:?}", other),
            }
            .fail()?,
        };

        Self::validate_label_name(&dst_label)?;
        let replacement = match other_input_exprs.pop_front() {
            Some(DfExpr::Literal(ScalarValue::Utf8(Some(r)), _)) => r,
            other => UnexpectedPlanExprSnafu {
                desc: format!("expected replacement string literal, but found {:?}", other),
            }
            .fail()?,
        };
        let src_label = match other_input_exprs.pop_front() {
            Some(DfExpr::Literal(ScalarValue::Utf8(Some(s)), None)) => s,
            other => UnexpectedPlanExprSnafu {
                desc: format!("expected src_label string literal, but found {:?}", other),
            }
            .fail()?,
        };

        let regex = match other_input_exprs.pop_front() {
            Some(DfExpr::Literal(ScalarValue::Utf8(Some(r)), None)) => r,
            other => UnexpectedPlanExprSnafu {
                desc: format!("expected regex string literal, but found {:?}", other),
            }
            .fail()?,
        };

        regex::Regex::new(&regex).map_err(|_| {
            InvalidRegularExpressionSnafu {
                regex: regex.clone(),
            }
            .build()
        })?;

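        // An empty regex on an existing source label leaves the input unchanged.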
2048 if self.ctx.tag_columns.contains(&src_label) && regex.is_empty() {
2050 return Ok(None);
2051 }
2052
2053 if !self.ctx.tag_columns.contains(&src_label) {
2055 if replacement.is_empty() {
2056 return Ok(None);
2058 } else {
2059 return Ok(Some((
2061 lit(replacement).alias(&dst_label),
2063 dst_label,
2064 )));
2065 }
2066 }
2067
2068 let regex = format!("^(?s:{regex})$");
2071
2072 let session_state = query_engine_state.session_state();
2073 let func = session_state
2074 .scalar_functions()
2075 .get("regexp_replace")
2076 .context(UnsupportedExprSnafu {
2077 name: "regexp_replace",
2078 })?;
2079
2080 let args = vec![
2082 if src_label.is_empty() {
2083 DfExpr::Literal(ScalarValue::Utf8(Some(String::new())), None)
2084 } else {
2085 DfExpr::Column(Column::from_name(src_label))
2086 },
2087 DfExpr::Literal(ScalarValue::Utf8(Some(regex)), None),
2088 DfExpr::Literal(ScalarValue::Utf8(Some(replacement)), None),
2089 ];
2090
2091 Ok(Some((
2092 DfExpr::ScalarFunction(ScalarFunction {
2093 func: func.clone(),
2094 args,
2095 })
2096 .alias(&dst_label),
2097 dst_label,
2098 )))
2099 }
2100
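/// Build the `concat_ws` expression backing PromQL's `label_join`.
///
/// Roughly, with hypothetical label names:
/// ```text
/// label_join(up, "endpoint", ":", "host", "port")
///   => concat_ws(":", host, port) AS endpoint
/// ```
/// Source labels that are empty or not present in the current schema are
/// passed as NULL literals so they do not contribute to the joined value.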
2101 fn build_concat_labels_expr(
2103 other_input_exprs: &mut VecDeque<DfExpr>,
2104 ctx: &PromPlannerContext,
2105 query_engine_state: &QueryEngineState,
2106 ) -> Result<(DfExpr, String)> {
2107 let dst_label = match other_input_exprs.pop_front() {
2110 Some(DfExpr::Literal(ScalarValue::Utf8(Some(d)), _)) => d,
2111 other => UnexpectedPlanExprSnafu {
2112 desc: format!("expected dst_label string literal, but found {:?}", other),
2113 }
2114 .fail()?,
2115 };
2116 let separator = match other_input_exprs.pop_front() {
2117 Some(DfExpr::Literal(ScalarValue::Utf8(Some(d)), _)) => d,
2118 other => UnexpectedPlanExprSnafu {
2119 desc: format!("expected separator string literal, but found {:?}", other),
2120 }
2121 .fail()?,
2122 };
2123
2124 let available_columns: HashSet<&str> = ctx
2126 .tag_columns
2127 .iter()
2128 .chain(ctx.field_columns.iter())
2129 .chain(ctx.time_index_column.as_ref())
2130 .map(|s| s.as_str())
2131 .collect();
2132
2133 let src_labels = other_input_exprs
2134 .iter()
2135 .map(|expr| {
2136 match expr {
2138 DfExpr::Literal(ScalarValue::Utf8(Some(label)), None) => {
2139 if label.is_empty() {
2140 Ok(DfExpr::Literal(ScalarValue::Null, None))
2141 } else if available_columns.contains(label.as_str()) {
2142 Ok(DfExpr::Column(Column::from_name(label)))
2144 } else {
2145 Ok(DfExpr::Literal(ScalarValue::Null, None))
2147 }
2148 }
2149 other => UnexpectedPlanExprSnafu {
2150 desc: format!(
2151 "expected source label string literal, but found {:?}",
2152 other
2153 ),
2154 }
2155 .fail(),
2156 }
2157 })
2158 .collect::<Result<Vec<_>>>()?;
2159 ensure!(
2160 !src_labels.is_empty(),
2161 FunctionInvalidArgumentSnafu {
2162 fn_name: "label_join"
2163 }
2164 );
2165
2166 let session_state = query_engine_state.session_state();
2167 let func = session_state
2168 .scalar_functions()
2169 .get("concat_ws")
2170 .context(UnsupportedExprSnafu { name: "concat_ws" })?;
2171
2172 let mut args = Vec::with_capacity(1 + src_labels.len());
2174 args.push(DfExpr::Literal(ScalarValue::Utf8(Some(separator)), None));
2175 args.extend(src_labels);
2176
2177 Ok((
2178 DfExpr::ScalarFunction(ScalarFunction {
2179 func: func.clone(),
2180 args,
2181 })
2182 .alias(&dst_label),
2183 dst_label,
2184 ))
2185 }
2186
2187 fn create_time_index_column_expr(&self) -> Result<DfExpr> {
2188 Ok(DfExpr::Column(Column::from_name(
2189 self.ctx
2190 .time_index_column
2191 .clone()
2192 .with_context(|| TimeIndexNotFoundSnafu { table: "unknown" })?,
2193 )))
2194 }
2195
2196 fn create_tag_column_exprs(&self) -> Result<Vec<DfExpr>> {
2197 let mut result = Vec::with_capacity(self.ctx.tag_columns.len());
2198 for tag in &self.ctx.tag_columns {
2199 let expr = DfExpr::Column(Column::from_name(tag));
2200 result.push(expr);
2201 }
2202 Ok(result)
2203 }
2204
2205 fn create_field_column_exprs(&self) -> Result<Vec<DfExpr>> {
2206 let mut result = Vec::with_capacity(self.ctx.field_columns.len());
2207 for field in &self.ctx.field_columns {
2208 let expr = DfExpr::Column(Column::from_name(field));
2209 result.push(expr);
2210 }
2211 Ok(result)
2212 }
2213
2214 fn create_tag_and_time_index_column_sort_exprs(&self) -> Result<Vec<SortExpr>> {
2215 let mut result = self
2216 .ctx
2217 .tag_columns
2218 .iter()
2219 .map(|col| DfExpr::Column(Column::from_name(col)).sort(true, true))
2220 .collect::<Vec<_>>();
2221 result.push(self.create_time_index_column_expr()?.sort(true, true));
2222 Ok(result)
2223 }
2224
2225 fn create_field_columns_sort_exprs(&self, asc: bool) -> Vec<SortExpr> {
2226 self.ctx
2227 .field_columns
2228 .iter()
2229 .map(|col| DfExpr::Column(Column::from_name(col)).sort(asc, true))
2230 .collect::<Vec<_>>()
2231 }
2232
2233 fn create_sort_exprs_by_tags(
2234 func: &str,
2235 tags: Vec<DfExpr>,
2236 asc: bool,
2237 ) -> Result<Vec<SortExpr>> {
2238 ensure!(
2239 !tags.is_empty(),
2240 FunctionInvalidArgumentSnafu { fn_name: func }
2241 );
2242
2243 tags.iter()
2244 .map(|col| match col {
2245 DfExpr::Literal(ScalarValue::Utf8(Some(label)), _) => {
2246 Ok(DfExpr::Column(Column::from_name(label)).sort(asc, false))
2247 }
2248 other => UnexpectedPlanExprSnafu {
2249 desc: format!("expected label string literal, but found {:?}", other),
2250 }
2251 .fail(),
2252 })
2253 .collect::<Result<Vec<_>>>()
2254 }
2255
2256 fn create_empty_values_filter_expr(&self) -> Result<DfExpr> {
2257 let mut exprs = Vec::with_capacity(self.ctx.field_columns.len());
2258 for value in &self.ctx.field_columns {
2259 let expr = DfExpr::Column(Column::from_name(value)).is_not_null();
2260 exprs.push(expr);
2261 }
2262
2263 conjunction(exprs).context(ValueNotFoundSnafu {
2264 table: self.table_ref()?.to_quoted_string(),
2265 })
2266 }
2267
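/// Map a PromQL aggregation token to DataFusion aggregate expressions, one
/// per field column.
///
/// A sketch with hypothetical columns (field `field_0`, tag `host`):
/// ```text
/// sum by (host) (metric)   => sum(field_0)          // grouping added by caller
/// quantile(0.9, metric)    => quantile(0.9, field_0)
/// ```
/// For `count_values` the original field expressions are also returned so
/// they can be re-projected as group-by values.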
2268 fn create_aggregate_exprs(
2284 &mut self,
2285 op: TokenType,
2286 param: &Option<Box<PromExpr>>,
2287 input_plan: &LogicalPlan,
2288 ) -> Result<(Vec<DfExpr>, Vec<DfExpr>)> {
2289 let mut non_col_args = Vec::new();
2290 let aggr = match op.id() {
2291 token::T_SUM => sum_udaf(),
2292 token::T_QUANTILE => {
2293 let q =
2294 Self::get_param_as_literal_expr(param, Some(op), Some(ArrowDataType::Float64))?;
2295 non_col_args.push(q);
2296 quantile_udaf()
2297 }
2298 token::T_AVG => avg_udaf(),
2299 token::T_COUNT_VALUES | token::T_COUNT => count_udaf(),
2300 token::T_MIN => min_udaf(),
2301 token::T_MAX => max_udaf(),
2302 token::T_GROUP => grouping_udaf(),
2303 token::T_STDDEV => stddev_pop_udaf(),
2304 token::T_STDVAR => var_pop_udaf(),
2305 token::T_TOPK | token::T_BOTTOMK => UnsupportedExprSnafu {
2306 name: format!("{op:?}"),
2307 }
2308 .fail()?,
2309 _ => UnexpectedTokenSnafu { token: op }.fail()?,
2310 };
2311
2312 let exprs: Vec<DfExpr> = self
2314 .ctx
2315 .field_columns
2316 .iter()
2317 .map(|col| {
2318 non_col_args.push(DfExpr::Column(Column::from_name(col)));
2319 let expr = aggr.call(non_col_args.clone());
2320 non_col_args.pop();
2321 expr
2322 })
2323 .collect::<Vec<_>>();
2324
2325 let prev_field_exprs = if op.id() == token::T_COUNT_VALUES {
2327 let prev_field_exprs: Vec<_> = self
2328 .ctx
2329 .field_columns
2330 .iter()
2331 .map(|col| DfExpr::Column(Column::from_name(col)))
2332 .collect();
2333
2334 ensure!(
2335 self.ctx.field_columns.len() == 1,
2336 UnsupportedExprSnafu {
2337 name: "count_values on multi-value input"
2338 }
2339 );
2340
2341 prev_field_exprs
2342 } else {
2343 vec![]
2344 };
2345
2346 let mut new_field_columns = Vec::with_capacity(self.ctx.field_columns.len());
2348
2349 let normalized_exprs =
2350 normalize_cols(exprs.iter().cloned(), input_plan).context(DataFusionPlanningSnafu)?;
2351 for expr in normalized_exprs {
2352 new_field_columns.push(expr.schema_name().to_string());
2353 }
2354 self.ctx.field_columns = new_field_columns;
2355
2356 Ok((exprs, prev_field_exprs))
2357 }
2358
2359 fn get_param_value_as_str(op: TokenType, param: &Option<Box<PromExpr>>) -> Result<&str> {
2360 let param = param
2361 .as_deref()
2362 .with_context(|| FunctionInvalidArgumentSnafu {
2363 fn_name: op.to_string(),
2364 })?;
2365 let PromExpr::StringLiteral(StringLiteral { val }) = param else {
2366 return FunctionInvalidArgumentSnafu {
2367 fn_name: op.to_string(),
2368 }
2369 .fail();
2370 };
2371
2372 Ok(val)
2373 }
2374
2375 fn get_param_as_literal_expr(
2376 param: &Option<Box<PromExpr>>,
2377 op: Option<TokenType>,
2378 expected_type: Option<ArrowDataType>,
2379 ) -> Result<DfExpr> {
2380 let prom_param = param.as_deref().with_context(|| {
2381 if let Some(op) = op {
2382 FunctionInvalidArgumentSnafu {
2383 fn_name: op.to_string(),
2384 }
2385 } else {
2386 FunctionInvalidArgumentSnafu {
2387 fn_name: "unknown".to_string(),
2388 }
2389 }
2390 })?;
2391
2392 let expr = Self::try_build_literal_expr(prom_param).with_context(|| {
2393 if let Some(op) = op {
2394 FunctionInvalidArgumentSnafu {
2395 fn_name: op.to_string(),
2396 }
2397 } else {
2398 FunctionInvalidArgumentSnafu {
2399 fn_name: "unknown".to_string(),
2400 }
2401 }
2402 })?;
2403
2404 if let Some(expected_type) = expected_type {
2406 let expr_type = expr
2408 .get_type(&DFSchema::empty())
2409 .context(DataFusionPlanningSnafu)?;
2410 if expected_type != expr_type {
2411 return FunctionInvalidArgumentSnafu {
2412 fn_name: format!("expected {expected_type:?}, but found {expr_type:?}"),
2413 }
2414 .fail();
2415 }
2416 }
2417
2418 Ok(expr)
2419 }
2420
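/// Build `row_number()` window expressions for `topk`/`bottomk`.
///
/// Roughly (hypothetical names; `topk` sorts descending, `bottomk`
/// ascending):
/// ```text
/// topk(3, metric) by (host)
///   => row_number() OVER (PARTITION BY host
///                         ORDER BY field_0 DESC, <tags> DESC)
/// ```
/// The resulting rank column is presumably compared against `k` by the
/// caller to keep only the top (or bottom) series per group.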
2421 fn create_window_exprs(
2424 &mut self,
2425 op: TokenType,
2426 group_exprs: Vec<DfExpr>,
2427 input_plan: &LogicalPlan,
2428 ) -> Result<Vec<DfExpr>> {
2429 ensure!(
2430 self.ctx.field_columns.len() == 1,
2431 UnsupportedExprSnafu {
2432 name: "topk or bottomk on multi-value input"
2433 }
2434 );
2435
2436 assert!(matches!(op.id(), token::T_TOPK | token::T_BOTTOMK));
2437
2438 let asc = matches!(op.id(), token::T_BOTTOMK);
2439
2440 let tag_sort_exprs = self
2441 .create_tag_column_exprs()?
2442 .into_iter()
2443 .map(|expr| expr.sort(asc, true));
2444
2445 let exprs: Vec<DfExpr> = self
2447 .ctx
2448 .field_columns
2449 .iter()
2450 .map(|col| {
2451 let mut sort_exprs = Vec::with_capacity(self.ctx.tag_columns.len() + 1);
2452 sort_exprs.push(DfExpr::Column(Column::from(col)).sort(asc, true));
2454 sort_exprs.extend(tag_sort_exprs.clone());
2457
2458 DfExpr::WindowFunction(Box::new(WindowFunction {
2459 fun: WindowFunctionDefinition::WindowUDF(Arc::new(RowNumber::new().into())),
2460 params: WindowFunctionParams {
2461 args: vec![],
2462 partition_by: group_exprs.clone(),
2463 order_by: sort_exprs,
2464 window_frame: WindowFrame::new(Some(true)),
2465 null_treatment: None,
2466 distinct: false,
2467 filter: None,
2468 },
2469 }))
2470 })
2471 .collect();
2472
2473 let normalized_exprs =
2474 normalize_cols(exprs.iter().cloned(), input_plan).context(DataFusionPlanningSnafu)?;
2475 Ok(normalized_exprs)
2476 }
2477
2478 #[deprecated(
2480 note = "use `Self::get_param_as_literal_expr` instead. This is only for `create_histogram_plan`"
2481 )]
2482 fn try_build_float_literal(expr: &PromExpr) -> Option<f64> {
2483 match expr {
2484 PromExpr::NumberLiteral(NumberLiteral { val }) => Some(*val),
2485 PromExpr::Paren(ParenExpr { expr }) => Self::try_build_float_literal(expr),
2486 PromExpr::Unary(UnaryExpr { expr, .. }) => {
2487 Self::try_build_float_literal(expr).map(|f| -f)
2488 }
2489 PromExpr::StringLiteral(_)
2490 | PromExpr::Binary(_)
2491 | PromExpr::VectorSelector(_)
2492 | PromExpr::MatrixSelector(_)
2493 | PromExpr::Call(_)
2494 | PromExpr::Extension(_)
2495 | PromExpr::Aggregate(_)
2496 | PromExpr::Subquery(_) => None,
2497 }
2498 }
2499
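/// Plan `histogram_quantile(phi, buckets)` via the `HistogramFold`
/// extension node, folding the `le` buckets of each series. If the input
/// has no `le` tag the result is an empty relation, matching a query that
/// selects nothing.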
2500 async fn create_histogram_plan(
2502 &mut self,
2503 args: &PromFunctionArgs,
2504 query_engine_state: &QueryEngineState,
2505 ) -> Result<LogicalPlan> {
2506 if args.args.len() != 2 {
2507 return FunctionInvalidArgumentSnafu {
2508 fn_name: SPECIAL_HISTOGRAM_QUANTILE.to_string(),
2509 }
2510 .fail();
2511 }
2512 #[allow(deprecated)]
2513 let phi = Self::try_build_float_literal(&args.args[0]).with_context(|| {
2514 FunctionInvalidArgumentSnafu {
2515 fn_name: SPECIAL_HISTOGRAM_QUANTILE.to_string(),
2516 }
2517 })?;
2518
2519 let input = args.args[1].as_ref().clone();
2520 let input_plan = self.prom_expr_to_plan(&input, query_engine_state).await?;
2521
2522 if !self.ctx.has_le_tag() {
2523 return Ok(LogicalPlan::EmptyRelation(
2526 datafusion::logical_expr::EmptyRelation {
2527 produce_one_row: false,
2528 schema: Arc::new(DFSchema::empty()),
2529 },
2530 ));
2531 }
2532 let time_index_column =
2533 self.ctx
2534 .time_index_column
2535 .clone()
2536 .with_context(|| TimeIndexNotFoundSnafu {
2537 table: self.ctx.table_name.clone().unwrap_or_default(),
2538 })?;
2539 let field_column = self
2541 .ctx
2542 .field_columns
2543 .first()
2544 .with_context(|| FunctionInvalidArgumentSnafu {
2545 fn_name: SPECIAL_HISTOGRAM_QUANTILE.to_string(),
2546 })?
2547 .clone();
2548 self.ctx.tag_columns.retain(|col| col != LE_COLUMN_NAME);
2550
2551 Ok(LogicalPlan::Extension(Extension {
2552 node: Arc::new(
2553 HistogramFold::new(
2554 LE_COLUMN_NAME.to_string(),
2555 field_column,
2556 time_index_column,
2557 phi,
2558 input_plan,
2559 )
2560 .context(DataFusionPlanningSnafu)?,
2561 ),
2562 }))
2563 }
2564
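/// Plan `vector(literal)`: emits an `EmptyMetric` that produces the given
/// literal at every step in `[start, end]` with the query interval, e.g.
/// `vector(5)` yields the value `5` at each timestamp.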
2565 async fn create_vector_plan(&mut self, args: &PromFunctionArgs) -> Result<LogicalPlan> {
2567 if args.args.len() != 1 {
2568 return FunctionInvalidArgumentSnafu {
2569 fn_name: SPECIAL_VECTOR_FUNCTION.to_string(),
2570 }
2571 .fail();
2572 }
2573 let lit = Self::get_param_as_literal_expr(&Some(args.args[0].clone()), None, None)?;
2574
2575 self.ctx.time_index_column = Some(SPECIAL_TIME_FUNCTION.to_string());
2577 self.ctx.reset_table_name_and_schema();
2578 self.ctx.tag_columns = vec![];
2579 self.ctx.field_columns = vec![greptime_value().to_string()];
2580 Ok(LogicalPlan::Extension(Extension {
2581 node: Arc::new(
2582 EmptyMetric::new(
2583 self.ctx.start,
2584 self.ctx.end,
2585 self.ctx.interval,
2586 SPECIAL_TIME_FUNCTION.to_string(),
2587 greptime_value().to_string(),
2588 Some(lit),
2589 )
2590 .context(DataFusionPlanningSnafu)?,
2591 ),
2592 }))
2593 }
2594
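/// Plan `scalar(vector)` via the `ScalarCalculate` extension node. The
/// input must produce exactly one field column; per PromQL semantics the
/// result is presumably NaN at any timestamp where the input does not
/// contain exactly one series.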
2595 async fn create_scalar_plan(
2597 &mut self,
2598 args: &PromFunctionArgs,
2599 query_engine_state: &QueryEngineState,
2600 ) -> Result<LogicalPlan> {
2601 ensure!(
2602 args.len() == 1,
2603 FunctionInvalidArgumentSnafu {
2604 fn_name: SCALAR_FUNCTION
2605 }
2606 );
2607 let input = self
2608 .prom_expr_to_plan(&args.args[0], query_engine_state)
2609 .await?;
2610 ensure!(
2611 self.ctx.field_columns.len() == 1,
2612 MultiFieldsNotSupportedSnafu {
2613 operator: SCALAR_FUNCTION
2614 },
2615 );
2616 let scalar_plan = LogicalPlan::Extension(Extension {
2617 node: Arc::new(
2618 ScalarCalculate::new(
2619 self.ctx.start,
2620 self.ctx.end,
2621 self.ctx.interval,
2622 input,
2623 self.ctx.time_index_column.as_ref().unwrap(),
2624 &self.ctx.tag_columns,
2625 &self.ctx.field_columns[0],
2626 self.ctx.table_name.as_deref(),
2627 )
2628 .context(PromqlPlanNodeSnafu)?,
2629 ),
2630 });
2631 self.ctx.tag_columns.clear();
2633 self.ctx.field_columns.clear();
2634 self.ctx
2635 .field_columns
2636 .push(scalar_plan.schema().field(1).name().clone());
2637 Ok(scalar_plan)
2638 }
2639
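/// Plan `absent(expr)`: aggregate the input down to one row per timestamp,
/// then let the `Absent` extension node emit `1` for the timestamps where
/// the input is missing. Only equality matchers survive as output labels,
/// e.g. (hypothetical query):
/// ```text
/// absent(up{job="api", instance=~".+"})  => {job="api"}  // regex dropped
/// ```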
2640 async fn create_absent_plan(
2642 &mut self,
2643 args: &PromFunctionArgs,
2644 query_engine_state: &QueryEngineState,
2645 ) -> Result<LogicalPlan> {
2646 if args.args.len() != 1 {
2647 return FunctionInvalidArgumentSnafu {
2648 fn_name: SPECIAL_ABSENT_FUNCTION.to_string(),
2649 }
2650 .fail();
2651 }
2652 let input = self
2653 .prom_expr_to_plan(&args.args[0], query_engine_state)
2654 .await?;
2655
2656 let time_index_expr = self.create_time_index_column_expr()?;
2657 let first_field_expr =
2658 self.create_field_column_exprs()?
2659 .pop()
2660 .with_context(|| ValueNotFoundSnafu {
2661 table: self.ctx.table_name.clone().unwrap_or_default(),
2662 })?;
2663 let first_value_expr = first_value(first_field_expr, vec![]);
2664
2665 let ordered_aggregated_input = LogicalPlanBuilder::from(input)
2666 .aggregate(
2667 vec![time_index_expr.clone()],
2668 vec![first_value_expr.clone()],
2669 )
2670 .context(DataFusionPlanningSnafu)?
2671 .sort(vec![time_index_expr.sort(true, false)])
2672 .context(DataFusionPlanningSnafu)?
2673 .build()
2674 .context(DataFusionPlanningSnafu)?;
2675
2676 let fake_labels = self
2677 .ctx
2678 .selector_matcher
2679 .iter()
2680 .filter_map(|matcher| match matcher.op {
2681 MatchOp::Equal => Some((matcher.name.clone(), matcher.value.clone())),
2682 _ => None,
2683 })
2684 .collect::<Vec<_>>();
2685
2686 let absent_plan = LogicalPlan::Extension(Extension {
2688 node: Arc::new(
2689 Absent::try_new(
2690 self.ctx.start,
2691 self.ctx.end,
2692 self.ctx.interval,
2693 self.ctx.time_index_column.as_ref().unwrap().clone(),
2694 self.ctx.field_columns[0].clone(),
2695 fake_labels,
2696 ordered_aggregated_input,
2697 )
2698 .context(DataFusionPlanningSnafu)?,
2699 ),
2700 });
2701
2702 Ok(absent_plan)
2703 }
2704
2705 fn try_build_literal_expr(expr: &PromExpr) -> Option<DfExpr> {
2708 match expr {
2709 PromExpr::NumberLiteral(NumberLiteral { val }) => Some(val.lit()),
2710 PromExpr::StringLiteral(StringLiteral { val }) => Some(val.lit()),
2711 PromExpr::VectorSelector(_)
2712 | PromExpr::MatrixSelector(_)
2713 | PromExpr::Extension(_)
2714 | PromExpr::Aggregate(_)
2715 | PromExpr::Subquery(_) => None,
2716 PromExpr::Call(Call { func, .. }) => {
2717 if func.name == SPECIAL_TIME_FUNCTION {
// `time()` cannot be folded into a literal here: it needs the time
// index column, so it is handled by
// `try_build_special_time_expr_with_context` instead.
2718 None
2721 } else {
2722 None
2723 }
2724 }
2725 PromExpr::Paren(ParenExpr { expr }) => Self::try_build_literal_expr(expr),
2726 PromExpr::Unary(UnaryExpr { expr, .. }) => Self::try_build_literal_expr(expr),
2728 PromExpr::Binary(PromBinaryExpr {
2729 lhs,
2730 rhs,
2731 op,
2732 modifier,
2733 }) => {
2734 let lhs = Self::try_build_literal_expr(lhs)?;
2735 let rhs = Self::try_build_literal_expr(rhs)?;
2736 let is_comparison_op = Self::is_token_a_comparison_op(*op);
2737 let expr_builder = Self::prom_token_to_binary_expr_builder(*op).ok()?;
2738 let expr = expr_builder(lhs, rhs).ok()?;
2739
2740 let should_return_bool = if let Some(m) = modifier {
2741 m.return_bool
2742 } else {
2743 false
2744 };
2745 if is_comparison_op && should_return_bool {
2746 Some(DfExpr::Cast(Cast {
2747 expr: Box::new(expr),
2748 data_type: ArrowDataType::Float64,
2749 }))
2750 } else {
2751 Some(expr)
2752 }
2753 }
2754 }
2755 }
2756
2757 fn try_build_special_time_expr_with_context(&self, expr: &PromExpr) -> Option<DfExpr> {
2758 match expr {
2759 PromExpr::Call(Call { func, .. }) => {
2760 if func.name == SPECIAL_TIME_FUNCTION
2761 && let Some(time_index_col) = self.ctx.time_index_column.as_ref()
2762 {
2763 Some(build_special_time_expr(time_index_col))
2764 } else {
2765 None
2766 }
2767 }
2768 _ => None,
2769 }
2770 }
2771
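/// Map a PromQL binary-operator token to a DataFusion expression builder.
///
/// A rough sketch of the mapping:
/// ```text
/// a + b   => lhs + rhs          a ^ b       => power(lhs, rhs)
/// a == b  => lhs = rhs          a atan2 b   => atan2(lhs, rhs)
/// ```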
2772 #[allow(clippy::type_complexity)]
2775 fn prom_token_to_binary_expr_builder(
2776 token: TokenType,
2777 ) -> Result<Box<dyn Fn(DfExpr, DfExpr) -> Result<DfExpr>>> {
2778 match token.id() {
2779 token::T_ADD => Ok(Box::new(|lhs, rhs| Ok(lhs + rhs))),
2780 token::T_SUB => Ok(Box::new(|lhs, rhs| Ok(lhs - rhs))),
2781 token::T_MUL => Ok(Box::new(|lhs, rhs| Ok(lhs * rhs))),
2782 token::T_DIV => Ok(Box::new(|lhs, rhs| Ok(lhs / rhs))),
2783 token::T_MOD => Ok(Box::new(|lhs: DfExpr, rhs| Ok(lhs % rhs))),
2784 token::T_EQLC => Ok(Box::new(|lhs, rhs| Ok(lhs.eq(rhs)))),
2785 token::T_NEQ => Ok(Box::new(|lhs, rhs| Ok(lhs.not_eq(rhs)))),
2786 token::T_GTR => Ok(Box::new(|lhs, rhs| Ok(lhs.gt(rhs)))),
2787 token::T_LSS => Ok(Box::new(|lhs, rhs| Ok(lhs.lt(rhs)))),
2788 token::T_GTE => Ok(Box::new(|lhs, rhs| Ok(lhs.gt_eq(rhs)))),
2789 token::T_LTE => Ok(Box::new(|lhs, rhs| Ok(lhs.lt_eq(rhs)))),
2790 token::T_POW => Ok(Box::new(|lhs, rhs| {
2791 Ok(DfExpr::ScalarFunction(ScalarFunction {
2792 func: datafusion_functions::math::power(),
2793 args: vec![lhs, rhs],
2794 }))
2795 })),
2796 token::T_ATAN2 => Ok(Box::new(|lhs, rhs| {
2797 Ok(DfExpr::ScalarFunction(ScalarFunction {
2798 func: datafusion_functions::math::atan2(),
2799 args: vec![lhs, rhs],
2800 }))
2801 })),
2802 _ => UnexpectedTokenSnafu { token }.fail(),
2803 }
2804 }
2805
2806 fn is_token_a_comparison_op(token: TokenType) -> bool {
2808 matches!(
2809 token.id(),
2810 token::T_EQLC
2811 | token::T_NEQ
2812 | token::T_GTR
2813 | token::T_LSS
2814 | token::T_GTE
2815 | token::T_LTE
2816 )
2817 }
2818
2819 fn is_token_a_set_op(token: TokenType) -> bool {
2821 matches!(
2822 token.id(),
2823 token::T_LAND | token::T_LOR | token::T_LUNLESS)
2827 }
2828
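/// Join two sides of a vector-vector binary operation on their shared tag
/// columns plus the time index, honoring `on`/`ignoring` modifiers.
///
/// Roughly (hypothetical tags):
/// ```text
/// lhs + on(host) rhs
///   => INNER JOIN ON lhs.host = rhs.host
///      AND lhs.<time index> = rhs.<time index>
/// ```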
2829 #[allow(clippy::too_many_arguments)]
2832 fn join_on_non_field_columns(
2833 &self,
2834 left: LogicalPlan,
2835 right: LogicalPlan,
2836 left_table_ref: TableReference,
2837 right_table_ref: TableReference,
2838 left_time_index_column: Option<String>,
2839 right_time_index_column: Option<String>,
2840 only_join_time_index: bool,
2841 modifier: &Option<BinModifier>,
2842 ) -> Result<LogicalPlan> {
2843 let mut left_tag_columns = if only_join_time_index {
2844 BTreeSet::new()
2845 } else {
2846 self.ctx
2847 .tag_columns
2848 .iter()
2849 .cloned()
2850 .collect::<BTreeSet<_>>()
2851 };
2852 let mut right_tag_columns = left_tag_columns.clone();
2853
2854 if let Some(modifier) = modifier {
2856 if let Some(matching) = &modifier.matching {
2858 match matching {
2859 LabelModifier::Include(on) => {
2861 let mask = on.labels.iter().cloned().collect::<BTreeSet<_>>();
2862 left_tag_columns = left_tag_columns.intersection(&mask).cloned().collect();
2863 right_tag_columns =
2864 right_tag_columns.intersection(&mask).cloned().collect();
2865 }
2866 LabelModifier::Exclude(ignoring) => {
2868 for label in &ignoring.labels {
2870 let _ = left_tag_columns.remove(label);
2871 let _ = right_tag_columns.remove(label);
2872 }
2873 }
2874 }
2875 }
2876 }
2877
2878 if let (Some(left_time_index_column), Some(right_time_index_column)) =
2880 (left_time_index_column, right_time_index_column)
2881 {
2882 left_tag_columns.insert(left_time_index_column);
2883 right_tag_columns.insert(right_time_index_column);
2884 }
2885
2886 let right = LogicalPlanBuilder::from(right)
2887 .alias(right_table_ref)
2888 .context(DataFusionPlanningSnafu)?
2889 .build()
2890 .context(DataFusionPlanningSnafu)?;
2891
2892 LogicalPlanBuilder::from(left)
2894 .alias(left_table_ref)
2895 .context(DataFusionPlanningSnafu)?
2896 .join_detailed(
2897 right,
2898 JoinType::Inner,
2899 (
2900 left_tag_columns
2901 .into_iter()
2902 .map(Column::from_name)
2903 .collect::<Vec<_>>(),
2904 right_tag_columns
2905 .into_iter()
2906 .map(Column::from_name)
2907 .collect::<Vec<_>>(),
2908 ),
2909 None,
2910 NullEquality::NullEqualsNull,
2911 )
2912 .context(DataFusionPlanningSnafu)?
2913 .build()
2914 .context(DataFusionPlanningSnafu)
2915 }
2916
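/// Plan PromQL set operations between instant vectors: `and` becomes a
/// `LeftSemi` join, `unless` a `LeftAnti` join (both on the matching tags
/// plus the time index, with NULL keys treated as equal), and `or` is
/// delegated to [`Self::or_operator`].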
2917 fn set_op_on_non_field_columns(
2919 &mut self,
2920 left: LogicalPlan,
2921 mut right: LogicalPlan,
2922 left_context: PromPlannerContext,
2923 right_context: PromPlannerContext,
2924 op: TokenType,
2925 modifier: &Option<BinModifier>,
2926 ) -> Result<LogicalPlan> {
2927 let mut left_tag_col_set = left_context
2928 .tag_columns
2929 .iter()
2930 .cloned()
2931 .collect::<HashSet<_>>();
2932 let mut right_tag_col_set = right_context
2933 .tag_columns
2934 .iter()
2935 .cloned()
2936 .collect::<HashSet<_>>();
2937
2938 if matches!(op.id(), token::T_LOR) {
2939 return self.or_operator(
2940 left,
2941 right,
2942 left_tag_col_set,
2943 right_tag_col_set,
2944 left_context,
2945 right_context,
2946 modifier,
2947 );
2948 }
2949
2950 if let Some(modifier) = modifier {
2952 ensure!(
2954 matches!(
2955 modifier.card,
2956 VectorMatchCardinality::OneToOne | VectorMatchCardinality::ManyToMany
2957 ),
2958 UnsupportedVectorMatchSnafu {
2959 name: modifier.card.clone(),
2960 },
2961 );
2962 if let Some(matching) = &modifier.matching {
2964 match matching {
2965 LabelModifier::Include(on) => {
2967 let mask = on.labels.iter().cloned().collect::<HashSet<_>>();
2968 left_tag_col_set = left_tag_col_set.intersection(&mask).cloned().collect();
2969 right_tag_col_set =
2970 right_tag_col_set.intersection(&mask).cloned().collect();
2971 }
2972 LabelModifier::Exclude(ignoring) => {
2974 for label in &ignoring.labels {
2976 let _ = left_tag_col_set.remove(label);
2977 let _ = right_tag_col_set.remove(label);
2978 }
2979 }
2980 }
2981 }
2982 }
2983 if !matches!(op.id(), token::T_LOR) {
2985 ensure!(
2986 left_tag_col_set == right_tag_col_set,
2987 CombineTableColumnMismatchSnafu {
2988 left: left_tag_col_set.into_iter().collect::<Vec<_>>(),
2989 right: right_tag_col_set.into_iter().collect::<Vec<_>>(),
2990 }
2991 )
2992 };
2993 let left_time_index = left_context.time_index_column.clone().unwrap();
2994 let right_time_index = right_context.time_index_column.clone().unwrap();
2995 let join_keys = left_tag_col_set
2996 .iter()
2997 .cloned()
2998 .chain([left_time_index.clone()])
2999 .collect::<Vec<_>>();
3000 self.ctx.time_index_column = Some(left_time_index.clone());
3001
3002 if left_context.time_index_column != right_context.time_index_column {
3004 let right_project_exprs = right
3005 .schema()
3006 .fields()
3007 .iter()
3008 .map(|field| {
3009 if field.name() == &right_time_index {
3010 DfExpr::Column(Column::from_name(&right_time_index)).alias(&left_time_index)
3011 } else {
3012 DfExpr::Column(Column::from_name(field.name()))
3013 }
3014 })
3015 .collect::<Vec<_>>();
3016
3017 right = LogicalPlanBuilder::from(right)
3018 .project(right_project_exprs)
3019 .context(DataFusionPlanningSnafu)?
3020 .build()
3021 .context(DataFusionPlanningSnafu)?;
3022 }
3023
3024 ensure!(
3025 left_context.field_columns.len() == 1,
3026 MultiFieldsNotSupportedSnafu {
3027 operator: "AND operator"
3028 }
3029 );
3030 let left_field_col = left_context.field_columns.first().unwrap();
3033 self.ctx.field_columns = vec![left_field_col.clone()];
3034
3035 match op.id() {
3038 token::T_LAND => LogicalPlanBuilder::from(left)
3039 .distinct()
3040 .context(DataFusionPlanningSnafu)?
3041 .join_detailed(
3042 right,
3043 JoinType::LeftSemi,
3044 (join_keys.clone(), join_keys),
3045 None,
3046 NullEquality::NullEqualsNull,
3047 )
3048 .context(DataFusionPlanningSnafu)?
3049 .build()
3050 .context(DataFusionPlanningSnafu),
3051 token::T_LUNLESS => LogicalPlanBuilder::from(left)
3052 .distinct()
3053 .context(DataFusionPlanningSnafu)?
3054 .join_detailed(
3055 right,
3056 JoinType::LeftAnti,
3057 (join_keys.clone(), join_keys),
3058 None,
3059 NullEquality::NullEqualsNull,
3060 )
3061 .context(DataFusionPlanningSnafu)?
3062 .build()
3063 .context(DataFusionPlanningSnafu),
3064 token::T_LOR => {
// `or` is handled by `or_operator` at the beginning of this method.
3065 unreachable!()
3068 }
3069 _ => UnexpectedTokenSnafu { token: op }.fail(),
3070 }
3071 }
3072
3073 #[allow(clippy::too_many_arguments)]
3075 fn or_operator(
3076 &mut self,
3077 left: LogicalPlan,
3078 right: LogicalPlan,
3079 left_tag_cols_set: HashSet<String>,
3080 right_tag_cols_set: HashSet<String>,
3081 left_context: PromPlannerContext,
3082 right_context: PromPlannerContext,
3083 modifier: &Option<BinModifier>,
3084 ) -> Result<LogicalPlan> {
3085 ensure!(
3087 left_context.field_columns.len() == right_context.field_columns.len(),
3088 CombineTableColumnMismatchSnafu {
3089 left: left_context.field_columns.clone(),
3090 right: right_context.field_columns.clone()
3091 }
3092 );
3093 ensure!(
3094 left_context.field_columns.len() == 1,
3095 MultiFieldsNotSupportedSnafu {
3096 operator: "OR operator"
3097 }
3098 );
3099
3100 let all_tags = left_tag_cols_set
3102 .union(&right_tag_cols_set)
3103 .cloned()
3104 .collect::<HashSet<_>>();
3105 let tags_not_in_left = all_tags
3106 .difference(&left_tag_cols_set)
3107 .cloned()
3108 .collect::<Vec<_>>();
3109 let tags_not_in_right = all_tags
3110 .difference(&right_tag_cols_set)
3111 .cloned()
3112 .collect::<Vec<_>>();
3113 let left_qualifier = left.schema().qualified_field(0).0.cloned();
3114 let right_qualifier = right.schema().qualified_field(0).0.cloned();
3115 let left_qualifier_string = left_qualifier
3116 .as_ref()
3117 .map(|l| l.to_string())
3118 .unwrap_or_default();
3119 let right_qualifier_string = right_qualifier
3120 .as_ref()
3121 .map(|r| r.to_string())
3122 .unwrap_or_default();
3123 let left_time_index_column =
3124 left_context
3125 .time_index_column
3126 .clone()
3127 .with_context(|| TimeIndexNotFoundSnafu {
3128 table: left_qualifier_string.clone(),
3129 })?;
3130 let right_time_index_column =
3131 right_context
3132 .time_index_column
3133 .clone()
3134 .with_context(|| TimeIndexNotFoundSnafu {
3135 table: right_qualifier_string.clone(),
3136 })?;
3137 let left_field_col = left_context.field_columns.first().unwrap();
3139 let right_field_col = right_context.field_columns.first().unwrap();
3140
3141 let mut all_columns_set = left
3143 .schema()
3144 .fields()
3145 .iter()
3146 .chain(right.schema().fields().iter())
3147 .map(|field| field.name().clone())
3148 .collect::<HashSet<_>>();
3149 all_columns_set.remove(&left_time_index_column);
3151 all_columns_set.remove(&right_time_index_column);
3152 if left_field_col != right_field_col {
3154 all_columns_set.remove(right_field_col);
3155 }
3156 let mut all_columns = all_columns_set.into_iter().collect::<Vec<_>>();
3157 all_columns.sort_unstable();
// Keep the time index as the first output column.
3159 all_columns.insert(0, left_time_index_column.clone());
3161
3162 let left_proj_exprs = all_columns.iter().map(|col| {
3164 if tags_not_in_left.contains(col) {
3165 DfExpr::Literal(ScalarValue::Utf8(None), None).alias(col.clone())
3166 } else {
3167 DfExpr::Column(Column::new(None::<String>, col))
3168 }
3169 });
3170 let right_time_index_expr = DfExpr::Column(Column::new(
3171 right_qualifier.clone(),
3172 right_time_index_column,
3173 ))
3174 .alias(left_time_index_column.clone());
3175 let right_qualifier_for_field = right
3178 .schema()
3179 .iter()
3180 .find(|(_, f)| f.name() == right_field_col)
3181 .map(|(q, _)| q)
3182 .with_context(|| ColumnNotFoundSnafu {
3183 col: right_field_col.clone(),
3184 })?
3185 .cloned();
3186
3187 let right_proj_exprs_without_time_index = all_columns.iter().skip(1).map(|col| {
3189 if col == left_field_col && left_field_col != right_field_col {
3191 DfExpr::Column(Column::new(
3193 right_qualifier_for_field.clone(),
3194 right_field_col,
3195 ))
3196 } else if tags_not_in_right.contains(col) {
3197 DfExpr::Literal(ScalarValue::Utf8(None), None).alias(col.clone())
3198 } else {
3199 DfExpr::Column(Column::new(None::<String>, col))
3200 }
3201 });
3202 let right_proj_exprs = [right_time_index_expr]
3203 .into_iter()
3204 .chain(right_proj_exprs_without_time_index);
3205
3206 let left_projected = LogicalPlanBuilder::from(left)
3207 .project(left_proj_exprs)
3208 .context(DataFusionPlanningSnafu)?
3209 .alias(left_qualifier_string.clone())
3210 .context(DataFusionPlanningSnafu)?
3211 .build()
3212 .context(DataFusionPlanningSnafu)?;
3213 let right_projected = LogicalPlanBuilder::from(right)
3214 .project(right_proj_exprs)
3215 .context(DataFusionPlanningSnafu)?
3216 .alias(right_qualifier_string.clone())
3217 .context(DataFusionPlanningSnafu)?
3218 .build()
3219 .context(DataFusionPlanningSnafu)?;
3220
3221 let mut match_columns = if let Some(modifier) = modifier
3223 && let Some(matching) = &modifier.matching
3224 {
3225 match matching {
3226 LabelModifier::Include(on) => on.labels.clone(),
3228 LabelModifier::Exclude(ignoring) => {
3230 let ignoring = ignoring.labels.iter().cloned().collect::<HashSet<_>>();
3231 all_tags.difference(&ignoring).cloned().collect()
3232 }
3233 }
3234 } else {
3235 all_tags.iter().cloned().collect()
3236 };
3237 match_columns.sort_unstable();
3239 let schema = left_projected.schema().clone();
3241 let union_distinct_on = UnionDistinctOn::new(
3242 left_projected,
3243 right_projected,
3244 match_columns,
3245 left_time_index_column.clone(),
3246 schema,
3247 );
3248 let result = LogicalPlan::Extension(Extension {
3249 node: Arc::new(union_distinct_on),
3250 });
3251
3252 self.ctx.time_index_column = Some(left_time_index_column);
3254 self.ctx.tag_columns = all_tags.into_iter().collect();
3255 self.ctx.field_columns = vec![left_field_col.clone()];
3256
3257 Ok(result)
3258 }
3259
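/// Project every field column through `name_to_expr` while passing tag and
/// time-index columns through unchanged. For example, with a hypothetical
/// callback `|name| Ok(abs(col(name)))`, `field_0` becomes
/// `abs(field_0)` aliased to its display name in the output schema.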
3260 fn projection_for_each_field_column<F>(
3268 &mut self,
3269 input: LogicalPlan,
3270 name_to_expr: F,
3271 ) -> Result<LogicalPlan>
3272 where
3273 F: FnMut(&String) -> Result<DfExpr>,
3274 {
3275 let non_field_columns_iter = self
3276 .ctx
3277 .tag_columns
3278 .iter()
3279 .chain(self.ctx.time_index_column.iter())
3280 .map(|col| {
3281 Ok(DfExpr::Column(Column::new(
3282 self.ctx.table_name.clone().map(TableReference::bare),
3283 col,
3284 )))
3285 });
3286
3287 let result_field_columns = self
3289 .ctx
3290 .field_columns
3291 .iter()
3292 .map(name_to_expr)
3293 .collect::<Result<Vec<_>>>()?;
3294
3295 self.ctx.field_columns = result_field_columns
3297 .iter()
3298 .map(|expr| expr.schema_name().to_string())
3299 .collect();
3300 let field_columns_iter = result_field_columns
3301 .into_iter()
3302 .zip(self.ctx.field_columns.iter())
3303 .map(|(expr, name)| Ok(DfExpr::Alias(Alias::new(expr, None::<String>, name))));
3304
3305 let project_fields = non_field_columns_iter
3307 .chain(field_columns_iter)
3308 .collect::<Result<Vec<_>>>()?;
3309
3310 LogicalPlanBuilder::from(input)
3311 .project(project_fields)
3312 .context(DataFusionPlanningSnafu)?
3313 .build()
3314 .context(DataFusionPlanningSnafu)
3315 }
3316
3317 fn filter_on_field_column<F>(
3320 &self,
3321 input: LogicalPlan,
3322 mut name_to_expr: F,
3323 ) -> Result<LogicalPlan>
3324 where
3325 F: FnMut(&String) -> Result<DfExpr>,
3326 {
3327 ensure!(
3328 self.ctx.field_columns.len() == 1,
3329 UnsupportedExprSnafu {
3330 name: "filter on multi-value input"
3331 }
3332 );
3333
3334 let field_column_filter = name_to_expr(&self.ctx.field_columns[0])?;
3335
3336 LogicalPlanBuilder::from(input)
3337 .filter(field_column_filter)
3338 .context(DataFusionPlanningSnafu)?
3339 .build()
3340 .context(DataFusionPlanningSnafu)
3341 }
3342
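/// Build a `date_part` call over the time index, used for PromQL functions
/// like `hour()` or `minute()`; e.g. `hour()` becomes
/// `date_part("hour", <time index>)`.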
3343 fn date_part_on_time_index(&self, date_part: &str) -> Result<DfExpr> {
3346 let input_expr = datafusion::logical_expr::col(
3347 self.ctx
3348 .time_index_column
3349 .as_ref()
3350 .with_context(|| TimeIndexNotFoundSnafu {
3352 table: "<doesn't matter>",
3353 })?,
3354 );
3355 let fn_expr = DfExpr::ScalarFunction(ScalarFunction {
3356 func: datafusion_functions::datetime::date_part(),
3357 args: vec![date_part.lit(), input_expr],
3358 });
3359 Ok(fn_expr)
3360 }
3361
3362 fn apply_alias_projection(
3364 &mut self,
3365 plan: LogicalPlan,
3366 alias_name: String,
3367 ) -> Result<LogicalPlan> {
3368 let fields_expr = self.create_field_column_exprs()?;
3369
3370 ensure!(
3372 fields_expr.len() == 1,
3373 UnsupportedExprSnafu {
3374 name: "alias on multi-value result"
3375 }
3376 );
3377
3378 let project_fields = fields_expr
3379 .into_iter()
3380 .map(|expr| expr.alias(&alias_name))
3381 .chain(self.create_tag_column_exprs()?)
3382 .chain(Some(self.create_time_index_column_expr()?));
3383
3384 LogicalPlanBuilder::from(plan)
3385 .project(project_fields)
3386 .context(DataFusionPlanningSnafu)?
3387 .build()
3388 .context(DataFusionPlanningSnafu)
3389 }
3390}
3391
3392#[derive(Default, Debug)]
3393struct FunctionArgs {
3394 input: Option<PromExpr>,
3395 literals: Vec<DfExpr>,
3396}
3397
3398#[derive(Debug, Clone)]
3401enum ScalarFunc {
3402 DataFusionBuiltin(Arc<ScalarUdfDef>),
3406 DataFusionUdf(Arc<ScalarUdfDef>),
3410 Udf(Arc<ScalarUdfDef>),
/// Extrapolation-style UDF (e.g. `increase`, `rate`, `delta`): besides
/// the field column it also receives the timestamp range column, the time
/// index column and the range length in milliseconds.
3415 ExtrapolateUdf(Arc<ScalarUdfDef>, i64),
/// The expression was already generated during function resolution;
/// nothing further to call.
3422 GeneratedExpr,
3426}
3427
3428#[cfg(test)]
3429mod test {
3430 use std::time::{Duration, UNIX_EPOCH};
3431
3432 use catalog::RegisterTableRequest;
3433 use catalog::memory::{MemoryCatalogManager, new_memory_catalog_manager};
3434 use common_base::Plugins;
3435 use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
3436 use common_query::prelude::greptime_timestamp;
3437 use common_query::test_util::DummyDecoder;
3438 use datatypes::prelude::ConcreteDataType;
3439 use datatypes::schema::{ColumnSchema, Schema};
3440 use promql_parser::label::Labels;
3441 use promql_parser::parser;
3442 use session::context::QueryContext;
3443 use table::metadata::{TableInfoBuilder, TableMetaBuilder};
3444 use table::test_util::EmptyTable;
3445
3446 use super::*;
3447 use crate::options::QueryOptions;
3448
3449 fn build_query_engine_state() -> QueryEngineState {
3450 QueryEngineState::new(
3451 new_memory_catalog_manager().unwrap(),
3452 None,
3453 None,
3454 None,
3455 None,
3456 None,
3457 false,
3458 Plugins::default(),
3459 QueryOptions::default(),
3460 )
3461 }
3462
3463 async fn build_test_table_provider(
3464 table_name_tuples: &[(String, String)],
3465 num_tag: usize,
3466 num_field: usize,
3467 ) -> DfTableSourceProvider {
3468 let catalog_list = MemoryCatalogManager::with_default_setup();
3469 for (schema_name, table_name) in table_name_tuples {
3470 let mut columns = vec![];
3471 for i in 0..num_tag {
3472 columns.push(ColumnSchema::new(
3473 format!("tag_{i}"),
3474 ConcreteDataType::string_datatype(),
3475 false,
3476 ));
3477 }
3478 columns.push(
3479 ColumnSchema::new(
3480 "timestamp".to_string(),
3481 ConcreteDataType::timestamp_millisecond_datatype(),
3482 false,
3483 )
3484 .with_time_index(true),
3485 );
3486 for i in 0..num_field {
3487 columns.push(ColumnSchema::new(
3488 format!("field_{i}"),
3489 ConcreteDataType::float64_datatype(),
3490 true,
3491 ));
3492 }
3493 let schema = Arc::new(Schema::new(columns));
3494 let table_meta = TableMetaBuilder::empty()
3495 .schema(schema)
3496 .primary_key_indices((0..num_tag).collect())
3497 .value_indices((num_tag + 1..num_tag + 1 + num_field).collect())
3498 .next_column_id(1024)
3499 .build()
3500 .unwrap();
3501 let table_info = TableInfoBuilder::default()
3502 .name(table_name.clone())
3503 .meta(table_meta)
3504 .build()
3505 .unwrap();
3506 let table = EmptyTable::from_table_info(&table_info);
3507
3508 assert!(
3509 catalog_list
3510 .register_table_sync(RegisterTableRequest {
3511 catalog: DEFAULT_CATALOG_NAME.to_string(),
3512 schema: schema_name.clone(),
3513 table_name: table_name.clone(),
3514 table_id: 1024,
3515 table,
3516 })
3517 .is_ok()
3518 );
3519 }
3520
3521 DfTableSourceProvider::new(
3522 catalog_list,
3523 false,
3524 QueryContext::arc(),
3525 DummyDecoder::arc(),
3526 false,
3527 )
3528 }
3529
3530 async fn build_test_table_provider_with_fields(
3531 table_name_tuples: &[(String, String)],
3532 tags: &[&str],
3533 ) -> DfTableSourceProvider {
3534 let catalog_list = MemoryCatalogManager::with_default_setup();
3535 for (schema_name, table_name) in table_name_tuples {
3536 let mut columns = vec![];
3537 let num_tag = tags.len();
3538 for tag in tags {
3539 columns.push(ColumnSchema::new(
3540 tag.to_string(),
3541 ConcreteDataType::string_datatype(),
3542 false,
3543 ));
3544 }
3545 columns.push(
3546 ColumnSchema::new(
3547 greptime_timestamp().to_string(),
3548 ConcreteDataType::timestamp_millisecond_datatype(),
3549 false,
3550 )
3551 .with_time_index(true),
3552 );
3553 columns.push(ColumnSchema::new(
3554 greptime_value().to_string(),
3555 ConcreteDataType::float64_datatype(),
3556 true,
3557 ));
3558 let schema = Arc::new(Schema::new(columns));
3559 let table_meta = TableMetaBuilder::empty()
3560 .schema(schema)
3561 .primary_key_indices((0..num_tag).collect())
3562 .next_column_id(1024)
3563 .build()
3564 .unwrap();
3565 let table_info = TableInfoBuilder::default()
3566 .name(table_name.clone())
3567 .meta(table_meta)
3568 .build()
3569 .unwrap();
3570 let table = EmptyTable::from_table_info(&table_info);
3571
3572 assert!(
3573 catalog_list
3574 .register_table_sync(RegisterTableRequest {
3575 catalog: DEFAULT_CATALOG_NAME.to_string(),
3576 schema: schema_name.clone(),
3577 table_name: table_name.clone(),
3578 table_id: 1024,
3579 table,
3580 })
3581 .is_ok()
3582 );
3583 }
3584
3585 DfTableSourceProvider::new(
3586 catalog_list,
3587 false,
3588 QueryContext::arc(),
3589 DummyDecoder::arc(),
3590 false,
3591 )
3592 }
3593
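// Helper: plan `{fn_name}(some_metric{tag_0!="bar"})` over a one-tag,
// one-field test table and compare the indented plan against a template
// where `TEMPLATE` is substituted with `plan_name`.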
3594 async fn do_single_instant_function_call(fn_name: &'static str, plan_name: &str) {
3610 let prom_expr =
3611 parser::parse(&format!("{fn_name}(some_metric{{tag_0!=\"bar\"}})")).unwrap();
3612 let eval_stmt = EvalStmt {
3613 expr: prom_expr,
3614 start: UNIX_EPOCH,
3615 end: UNIX_EPOCH
3616 .checked_add(Duration::from_secs(100_000))
3617 .unwrap(),
3618 interval: Duration::from_secs(5),
3619 lookback_delta: Duration::from_secs(1),
3620 };
3621
3622 let table_provider = build_test_table_provider(
3623 &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
3624 1,
3625 1,
3626 )
3627 .await;
3628 let plan =
3629 PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
3630 .await
3631 .unwrap();
3632
3633 let expected = String::from(
3634 "Filter: TEMPLATE(field_0) IS NOT NULL [timestamp:Timestamp(Millisecond, None), TEMPLATE(field_0):Float64;N, tag_0:Utf8]\
3635 \n Projection: some_metric.timestamp, TEMPLATE(some_metric.field_0) AS TEMPLATE(field_0), some_metric.tag_0 [timestamp:Timestamp(Millisecond, None), TEMPLATE(field_0):Float64;N, tag_0:Utf8]\
3636 \n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3637 \n PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3638 \n Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3639 \n Filter: some_metric.tag_0 != Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3640 \n TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
3641 ).replace("TEMPLATE", plan_name);
3642
3643 assert_eq!(plan.display_indent_schema().to_string(), expected);
3644 }
3645
3646 #[tokio::test]
3647 async fn single_abs() {
3648 do_single_instant_function_call("abs", "abs").await;
3649 }
3650
3651 #[tokio::test]
3652 #[should_panic]
3653 async fn single_absent() {
3654 do_single_instant_function_call("absent", "").await;
3655 }
3656
3657 #[tokio::test]
3658 async fn single_ceil() {
3659 do_single_instant_function_call("ceil", "ceil").await;
3660 }
3661
3662 #[tokio::test]
3663 async fn single_exp() {
3664 do_single_instant_function_call("exp", "exp").await;
3665 }
3666
3667 #[tokio::test]
3668 async fn single_ln() {
3669 do_single_instant_function_call("ln", "ln").await;
3670 }
3671
3672 #[tokio::test]
3673 async fn single_log2() {
3674 do_single_instant_function_call("log2", "log2").await;
3675 }
3676
3677 #[tokio::test]
3678 async fn single_log10() {
3679 do_single_instant_function_call("log10", "log10").await;
3680 }
3681
3682 #[tokio::test]
3683 #[should_panic]
3684 async fn single_scalar() {
3685 do_single_instant_function_call("scalar", "").await;
3686 }
3687
3688 #[tokio::test]
3689 #[should_panic]
3690 async fn single_sgn() {
3691 do_single_instant_function_call("sgn", "").await;
3692 }
3693
3694 #[tokio::test]
3695 #[should_panic]
3696 async fn single_sort() {
3697 do_single_instant_function_call("sort", "").await;
3698 }
3699
3700 #[tokio::test]
3701 #[should_panic]
3702 async fn single_sort_desc() {
3703 do_single_instant_function_call("sort_desc", "").await;
3704 }
3705
3706 #[tokio::test]
3707 async fn single_sqrt() {
3708 do_single_instant_function_call("sqrt", "sqrt").await;
3709 }
3710
3711 #[tokio::test]
3712 #[should_panic]
3713 async fn single_timestamp() {
3714 do_single_instant_function_call("timestamp", "").await;
3715 }
3716
3717 #[tokio::test]
3718 async fn single_acos() {
3719 do_single_instant_function_call("acos", "acos").await;
3720 }
3721
3722 #[tokio::test]
3723 #[should_panic]
3724 async fn single_acosh() {
3725 do_single_instant_function_call("acosh", "").await;
3726 }
3727
3728 #[tokio::test]
3729 async fn single_asin() {
3730 do_single_instant_function_call("asin", "asin").await;
3731 }
3732
3733 #[tokio::test]
3734 #[should_panic]
3735 async fn single_asinh() {
3736 do_single_instant_function_call("asinh", "").await;
3737 }
3738
3739 #[tokio::test]
3740 async fn single_atan() {
3741 do_single_instant_function_call("atan", "atan").await;
3742 }
3743
3744 #[tokio::test]
3745 #[should_panic]
3746 async fn single_atanh() {
3747 do_single_instant_function_call("atanh", "").await;
3748 }
3749
3750 #[tokio::test]
3751 async fn single_cos() {
3752 do_single_instant_function_call("cos", "cos").await;
3753 }
3754
3755 #[tokio::test]
3756 #[should_panic]
3757 async fn single_cosh() {
3758 do_single_instant_function_call("cosh", "").await;
3759 }
3760
3761 #[tokio::test]
3762 async fn single_sin() {
3763 do_single_instant_function_call("sin", "sin").await;
3764 }
3765
3766 #[tokio::test]
3767 #[should_panic]
3768 async fn single_sinh() {
3769 do_single_instant_function_call("sinh", "").await;
3770 }
3771
3772 #[tokio::test]
3773 async fn single_tan() {
3774 do_single_instant_function_call("tan", "tan").await;
3775 }
3776
3777 #[tokio::test]
3778 #[should_panic]
3779 async fn single_tanh() {
3780 do_single_instant_function_call("tanh", "").await;
3781 }
3782
3783 #[tokio::test]
3784 #[should_panic]
3785 async fn single_deg() {
3786 do_single_instant_function_call("deg", "").await;
3787 }
3788
3789 #[tokio::test]
3790 #[should_panic]
3791 async fn single_rad() {
3792 do_single_instant_function_call("rad", "").await;
3793 }
3794
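// Helper: plan `{fn_name} by (tag_1) (...)` and its `without (tag_1)`
// variant over a two-tag, two-field test table, comparing both indented
// plans against templates with `TEMPLATE` substituted by `plan_name`.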
3795 async fn do_aggregate_expr_plan(fn_name: &str, plan_name: &str) {
3817 let prom_expr = parser::parse(&format!(
3818 "{fn_name} by (tag_1)(some_metric{{tag_0!=\"bar\"}})",
3819 ))
3820 .unwrap();
3821 let mut eval_stmt = EvalStmt {
3822 expr: prom_expr,
3823 start: UNIX_EPOCH,
3824 end: UNIX_EPOCH
3825 .checked_add(Duration::from_secs(100_000))
3826 .unwrap(),
3827 interval: Duration::from_secs(5),
3828 lookback_delta: Duration::from_secs(1),
3829 };
3830
3831 let table_provider = build_test_table_provider(
3833 &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
3834 2,
3835 2,
3836 )
3837 .await;
3838 let plan =
3839 PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
3840 .await
3841 .unwrap();
3842 let expected_no_without = String::from(
3843 "Sort: some_metric.tag_1 ASC NULLS LAST, some_metric.timestamp ASC NULLS LAST [tag_1:Utf8, timestamp:Timestamp(Millisecond, None), TEMPLATE(some_metric.field_0):Float64;N, TEMPLATE(some_metric.field_1):Float64;N]\
3844 \n Aggregate: groupBy=[[some_metric.tag_1, some_metric.timestamp]], aggr=[[TEMPLATE(some_metric.field_0), TEMPLATE(some_metric.field_1)]] [tag_1:Utf8, timestamp:Timestamp(Millisecond, None), TEMPLATE(some_metric.field_0):Float64;N, TEMPLATE(some_metric.field_1):Float64;N]\
3845 \n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
3846 \n PromSeriesDivide: tags=[\"tag_0\", \"tag_1\"] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
3847 \n Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.tag_1 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
3848 \n Filter: some_metric.tag_0 != Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
3849 \n TableScan: some_metric [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]"
3850 ).replace("TEMPLATE", plan_name);
3851 assert_eq!(
3852 plan.display_indent_schema().to_string(),
3853 expected_no_without
3854 );
3855
3856 if let PromExpr::Aggregate(AggregateExpr { modifier, .. }) = &mut eval_stmt.expr {
3858 *modifier = Some(LabelModifier::Exclude(Labels {
3859 labels: vec![String::from("tag_1")].into_iter().collect(),
3860 }));
3861 }
3862 let table_provider = build_test_table_provider(
3863 &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
3864 2,
3865 2,
3866 )
3867 .await;
3868 let plan =
3869 PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
3870 .await
3871 .unwrap();
3872 let expected_without = String::from(
3873 "Sort: some_metric.tag_0 ASC NULLS LAST, some_metric.timestamp ASC NULLS LAST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), TEMPLATE(some_metric.field_0):Float64;N, TEMPLATE(some_metric.field_1):Float64;N]\
3874 \n Aggregate: groupBy=[[some_metric.tag_0, some_metric.timestamp]], aggr=[[TEMPLATE(some_metric.field_0), TEMPLATE(some_metric.field_1)]] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), TEMPLATE(some_metric.field_0):Float64;N, TEMPLATE(some_metric.field_1):Float64;N]\
3875 \n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
3876 \n PromSeriesDivide: tags=[\"tag_0\", \"tag_1\"] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
3877 \n Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.tag_1 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
3878 \n Filter: some_metric.tag_0 != Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
3879 \n TableScan: some_metric [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]"
3880 ).replace("TEMPLATE", plan_name);
3881 assert_eq!(plan.display_indent_schema().to_string(), expected_without);
3882 }
3883
3884 #[tokio::test]
3885 async fn aggregate_sum() {
3886 do_aggregate_expr_plan("sum", "sum").await;
3887 }
3888
3889 #[tokio::test]
3890 async fn aggregate_avg() {
3891 do_aggregate_expr_plan("avg", "avg").await;
3892 }
3893
3894 #[tokio::test]
3895 #[should_panic]
3896 async fn aggregate_count() {
3897 do_aggregate_expr_plan("count", "count").await;
3898 }
3899
3900 #[tokio::test]
3901 async fn aggregate_min() {
3902 do_aggregate_expr_plan("min", "min").await;
3903 }
3904
3905 #[tokio::test]
3906 async fn aggregate_max() {
3907 do_aggregate_expr_plan("max", "max").await;
3908 }
3909
3910 #[tokio::test]
3911 #[should_panic]
3912 async fn aggregate_group() {
3913 do_aggregate_expr_plan("grouping", "GROUPING").await;
3914 }
3915
3916 #[tokio::test]
3917 async fn aggregate_stddev() {
3918 do_aggregate_expr_plan("stddev", "stddev_pop").await;
3919 }
3920
3921 #[tokio::test]
3922 async fn aggregate_stdvar() {
3923 do_aggregate_expr_plan("stdvar", "var_pop").await;
3924 }
3925
3926 #[tokio::test]
3950 async fn binary_op_column_column() {
3951 let prom_expr =
3952 parser::parse(r#"some_metric{tag_0="foo"} + some_metric{tag_0="bar"}"#).unwrap();
3953 let eval_stmt = EvalStmt {
3954 expr: prom_expr,
3955 start: UNIX_EPOCH,
3956 end: UNIX_EPOCH
3957 .checked_add(Duration::from_secs(100_000))
3958 .unwrap(),
3959 interval: Duration::from_secs(5),
3960 lookback_delta: Duration::from_secs(1),
3961 };
3962
3963 let table_provider = build_test_table_provider(
3964 &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
3965 1,
3966 1,
3967 )
3968 .await;
3969 let plan =
3970 PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
3971 .await
3972 .unwrap();
3973
3974 let expected = String::from(
3975 "Projection: rhs.tag_0, rhs.timestamp, lhs.field_0 + rhs.field_0 AS lhs.field_0 + rhs.field_0 [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), lhs.field_0 + rhs.field_0:Float64;N]\
3976 \n Inner Join: lhs.tag_0 = rhs.tag_0, lhs.timestamp = rhs.timestamp [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3977 \n SubqueryAlias: lhs [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3978 \n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3979 \n PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3980 \n Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3981 \n Filter: some_metric.tag_0 = Utf8(\"foo\") AND some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3982 \n TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3983 \n SubqueryAlias: rhs [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3984 \n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3985 \n PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3986 \n Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3987 \n Filter: some_metric.tag_0 = Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3988 \n TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]",
3989 );
3990
3991 assert_eq!(plan.display_indent_schema().to_string(), expected);
3992 }
3993
3994 async fn indie_query_plan_compare<T: AsRef<str>>(query: &str, expected: T) {
3995 let prom_expr = parser::parse(query).unwrap();
3996 let eval_stmt = EvalStmt {
3997 expr: prom_expr,
3998 start: UNIX_EPOCH,
3999 end: UNIX_EPOCH
4000 .checked_add(Duration::from_secs(100_000))
4001 .unwrap(),
4002 interval: Duration::from_secs(5),
4003 lookback_delta: Duration::from_secs(1),
4004 };
4005
4006 let table_provider = build_test_table_provider(
4007 &[
4008 (DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string()),
4009 (
4010 "greptime_private".to_string(),
4011 "some_alt_metric".to_string(),
4012 ),
4013 ],
4014 1,
4015 1,
4016 )
4017 .await;
4018 let plan =
4019 PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
4020 .await
4021 .unwrap();
4022
4023 assert_eq!(plan.display_indent_schema().to_string(), expected.as_ref());
4024 }
4025
4026 #[tokio::test]
4027 async fn binary_op_literal_column() {
4028 let query = r#"1 + some_metric{tag_0="bar"}"#;
4029 let expected = String::from(
4030 "Projection: some_metric.tag_0, some_metric.timestamp, Float64(1) + some_metric.field_0 AS Float64(1) + field_0 [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), Float64(1) + field_0:Float64;N]\
4031 \n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4032 \n PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4033 \n Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4034 \n Filter: some_metric.tag_0 = Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4035 \n TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]",
4036 );
4037
4038 indie_query_plan_compare(query, expected).await;
4039 }
4040
4041 #[tokio::test]
4042 async fn binary_op_literal_literal() {
4043 let query = r#"1 + 1"#;
4044 let expected = r#"EmptyMetric: range=[0..100000000], interval=[5000] [time:Timestamp(Millisecond, None), value:Float64;N]
4045 TableScan: dummy [time:Timestamp(Millisecond, None), value:Float64;N]"#;
4046 indie_query_plan_compare(query, expected).await;
4047 }
4048
    #[tokio::test]
    async fn simple_bool_grammar() {
        let query = "some_metric != bool 1.2345";
        let expected = String::from(
            "Projection: some_metric.tag_0, some_metric.timestamp, CAST(some_metric.field_0 != Float64(1.2345) AS Float64) AS field_0 != Float64(1.2345) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0 != Float64(1.2345):Float64;N]\
            \n  PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n    PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n      Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n        Filter: some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n          TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]",
        );

        indie_query_plan_compare(query, expected).await;
    }

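    // A bool-modified comparison nested in arithmetic: the cast boolean feeds
    // directly into the addition.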
    #[tokio::test]
    async fn bool_with_additional_arithmetic() {
        let query = "some_metric + (1 == bool 2)";
        let expected = String::from(
            "Projection: some_metric.tag_0, some_metric.timestamp, some_metric.field_0 + CAST(Float64(1) = Float64(2) AS Float64) AS field_0 + Float64(1) = Float64(2) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0 + Float64(1) = Float64(2):Float64;N]\
            \n  PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n    PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n      Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n        Filter: some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n          TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]",
        );

        indie_query_plan_compare(query, expected).await;
    }

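    // Unary minus becomes a negated value projection over the usual pipeline.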
    #[tokio::test]
    async fn simple_unary() {
        let query = "-some_metric";
        let expected = String::from(
            "Projection: some_metric.tag_0, some_metric.timestamp, (- some_metric.field_0) AS (- field_0) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), (- field_0):Float64;N]\
            \n  PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n    PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n      Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n        Filter: some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n          TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]",
        );

        indie_query_plan_compare(query, expected).await;
    }

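    // Range function: `increase` plans a `prom_increase` projection over
    // PromRangeManipulate; the scan filter widens the start by the 5m range
    // plus the 1s lookback (-301000ms).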
    #[tokio::test]
    async fn increase_aggr() {
        let query = "increase(some_metric[5m])";
        let expected = String::from(
            "Filter: prom_increase(timestamp_range,field_0,timestamp,Int64(300000)) IS NOT NULL [timestamp:Timestamp(Millisecond, None), prom_increase(timestamp_range,field_0,timestamp,Int64(300000)):Float64;N, tag_0:Utf8]\
            \n  Projection: some_metric.timestamp, prom_increase(timestamp_range, field_0, some_metric.timestamp, Int64(300000)) AS prom_increase(timestamp_range,field_0,timestamp,Int64(300000)), some_metric.tag_0 [timestamp:Timestamp(Millisecond, None), prom_increase(timestamp_range,field_0,timestamp,Int64(300000)):Float64;N, tag_0:Utf8]\
            \n    PromRangeManipulate: req range=[0..100000000], interval=[5000], eval range=[300000], time index=[timestamp], values=[\"field_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Dictionary(Int64, Float64);N, timestamp_range:Dictionary(Int64, Timestamp(Millisecond, None))]\
            \n      PromSeriesNormalize: offset=[0], time index=[timestamp], filter NaN: [true] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n        PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n          Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n            Filter: some_metric.timestamp >= TimestampMillisecond(-301000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n              TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]",
        );

        indie_query_plan_compare(query, expected).await;
    }

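    // A comparison without `bool` filters rows rather than projecting a value.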
    #[tokio::test]
    async fn less_filter_on_value() {
        let query = "some_metric < 1.2345";
        let expected = String::from(
            "Filter: some_metric.field_0 < Float64(1.2345) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n  PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n    PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n      Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n        Filter: some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n          TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]",
        );

        indie_query_plan_compare(query, expected).await;
    }

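    // Same range-function pipeline as `increase`, exercised via
    // `count_over_time`.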
    #[tokio::test]
    async fn count_over_time() {
        let query = "count_over_time(some_metric[5m])";
        let expected = String::from(
            "Filter: prom_count_over_time(timestamp_range,field_0) IS NOT NULL [timestamp:Timestamp(Millisecond, None), prom_count_over_time(timestamp_range,field_0):Float64;N, tag_0:Utf8]\
            \n  Projection: some_metric.timestamp, prom_count_over_time(timestamp_range, field_0) AS prom_count_over_time(timestamp_range,field_0), some_metric.tag_0 [timestamp:Timestamp(Millisecond, None), prom_count_over_time(timestamp_range,field_0):Float64;N, tag_0:Utf8]\
            \n    PromRangeManipulate: req range=[0..100000000], interval=[5000], eval range=[300000], time index=[timestamp], values=[\"field_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Dictionary(Int64, Float64);N, timestamp_range:Dictionary(Int64, Timestamp(Millisecond, None))]\
            \n      PromSeriesNormalize: offset=[0], time index=[timestamp], filter NaN: [true] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n        PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n          Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n            Filter: some_metric.timestamp >= TimestampMillisecond(-301000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n              TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]",
        );

        indie_query_plan_compare(query, expected).await;
    }

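    // Vector / vector with `ignoring(...)`: both sides become aliased
    // subqueries joined on the time index and the remaining (non-ignored) tag.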
    #[tokio::test]
    async fn test_hash_join() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let case = r#"http_server_requests_seconds_sum{uri="/accounts/login"} / ignoring(kubernetes_pod_name,kubernetes_namespace) http_server_requests_seconds_count{uri="/accounts/login"}"#;

        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;
        let table_provider = build_test_table_provider_with_fields(
            &[
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "http_server_requests_seconds_sum".to_string(),
                ),
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "http_server_requests_seconds_count".to_string(),
                ),
            ],
            &["uri", "kubernetes_namespace", "kubernetes_pod_name"],
        )
        .await;
        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();
        let expected = "Projection: http_server_requests_seconds_count.uri, http_server_requests_seconds_count.kubernetes_namespace, http_server_requests_seconds_count.kubernetes_pod_name, http_server_requests_seconds_count.greptime_timestamp, http_server_requests_seconds_sum.greptime_value / http_server_requests_seconds_count.greptime_value AS http_server_requests_seconds_sum.greptime_value / http_server_requests_seconds_count.greptime_value\
        \n  Inner Join: http_server_requests_seconds_sum.greptime_timestamp = http_server_requests_seconds_count.greptime_timestamp, http_server_requests_seconds_sum.uri = http_server_requests_seconds_count.uri\
        \n    SubqueryAlias: http_server_requests_seconds_sum\
        \n      PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp]\
        \n        PromSeriesDivide: tags=[\"uri\", \"kubernetes_namespace\", \"kubernetes_pod_name\"]\
        \n          Sort: http_server_requests_seconds_sum.uri ASC NULLS FIRST, http_server_requests_seconds_sum.kubernetes_namespace ASC NULLS FIRST, http_server_requests_seconds_sum.kubernetes_pod_name ASC NULLS FIRST, http_server_requests_seconds_sum.greptime_timestamp ASC NULLS FIRST\
        \n            Filter: http_server_requests_seconds_sum.uri = Utf8(\"/accounts/login\") AND http_server_requests_seconds_sum.greptime_timestamp >= TimestampMillisecond(-1000, None) AND http_server_requests_seconds_sum.greptime_timestamp <= TimestampMillisecond(100001000, None)\
        \n              TableScan: http_server_requests_seconds_sum\
        \n    SubqueryAlias: http_server_requests_seconds_count\
        \n      PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp]\
        \n        PromSeriesDivide: tags=[\"uri\", \"kubernetes_namespace\", \"kubernetes_pod_name\"]\
        \n          Sort: http_server_requests_seconds_count.uri ASC NULLS FIRST, http_server_requests_seconds_count.kubernetes_namespace ASC NULLS FIRST, http_server_requests_seconds_count.kubernetes_pod_name ASC NULLS FIRST, http_server_requests_seconds_count.greptime_timestamp ASC NULLS FIRST\
        \n            Filter: http_server_requests_seconds_count.uri = Utf8(\"/accounts/login\") AND http_server_requests_seconds_count.greptime_timestamp >= TimestampMillisecond(-1000, None) AND http_server_requests_seconds_count.greptime_timestamp <= TimestampMillisecond(100001000, None)\
        \n              TableScan: http_server_requests_seconds_count";
        assert_eq!(plan.to_string(), expected);
    }

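    // Deeply nested functions (label_replace over histogram_quantile over
    // rate) must plan successfully; only success is asserted here.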
    #[tokio::test]
    async fn test_nested_histogram_quantile() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let case = r#"label_replace(histogram_quantile(0.99, sum by(pod, le, path, code) (rate(greptime_servers_grpc_requests_elapsed_bucket{container="frontend"}[1m0s]))), "pod_new", "$1", "pod", "greptimedb-frontend-[0-9a-z]*-(.*)")"#;

        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;
        let table_provider = build_test_table_provider_with_fields(
            &[(
                DEFAULT_SCHEMA_NAME.to_string(),
                "greptime_servers_grpc_requests_elapsed_bucket".to_string(),
            )],
            &["pod", "le", "path", "code", "container"],
        )
        .await;
        let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
            .await
            .unwrap();
    }

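    // `and`/`unless` set operations mixed with arithmetic and a `vector(0)`
    // fallback must plan successfully.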
    #[tokio::test]
    async fn test_parse_and_operator() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let cases = [
            r#"count (max by (persistentvolumeclaim,namespace) (kubelet_volume_stats_used_bytes{namespace=~".+"} ) and (max by (persistentvolumeclaim,namespace) (kubelet_volume_stats_used_bytes{namespace=~".+"} )) / (max by (persistentvolumeclaim,namespace) (kubelet_volume_stats_capacity_bytes{namespace=~".+"} )) >= (80 / 100)) or vector (0)"#,
            r#"count (max by (persistentvolumeclaim,namespace) (kubelet_volume_stats_used_bytes{namespace=~".+"} ) unless (max by (persistentvolumeclaim,namespace) (kubelet_volume_stats_used_bytes{namespace=~".+"} )) / (max by (persistentvolumeclaim,namespace) (kubelet_volume_stats_capacity_bytes{namespace=~".+"} )) >= (80 / 100)) or vector (0)"#,
        ];

        for case in cases {
            let prom_expr = parser::parse(case).unwrap();
            eval_stmt.expr = prom_expr;
            let table_provider = build_test_table_provider_with_fields(
                &[
                    (
                        DEFAULT_SCHEMA_NAME.to_string(),
                        "kubelet_volume_stats_used_bytes".to_string(),
                    ),
                    (
                        DEFAULT_SCHEMA_NAME.to_string(),
                        "kubelet_volume_stats_capacity_bytes".to_string(),
                    ),
                ],
                &["namespace", "persistentvolumeclaim"],
            )
            .await;
            let _ =
                PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                    .await
                    .unwrap();
        }
    }

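    // Subtraction whose right operand is `(... or vector(0))` must plan
    // successfully.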
    #[tokio::test]
    async fn test_nested_binary_op() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let case = r#"sum(rate(nginx_ingress_controller_requests{job=~".*"}[2m])) -
        (
            sum(rate(nginx_ingress_controller_requests{namespace=~".*"}[2m]))
            or
            vector(0)
        )"#;

        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;
        let table_provider = build_test_table_provider_with_fields(
            &[(
                DEFAULT_SCHEMA_NAME.to_string(),
                "nginx_ingress_controller_requests".to_string(),
            )],
            &["namespace", "job"],
        )
        .await;
        let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
            .await
            .unwrap();
    }

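    // Several real-world `or` combinations (repeated subexpressions, disjoint
    // metrics, mixed aggregations) must all plan successfully.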
    #[tokio::test]
    async fn test_parse_or_operator() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let case = r#"
        sum(rate(sysstat{tenant_name=~"tenant1",cluster_name=~"cluster1"}[120s])) by (cluster_name,tenant_name) /
        (sum(sysstat{tenant_name=~"tenant1",cluster_name=~"cluster1"}) by (cluster_name,tenant_name) * 100)
        or
        200 * sum(sysstat{tenant_name=~"tenant1",cluster_name=~"cluster1"}) by (cluster_name,tenant_name) /
        sum(sysstat{tenant_name=~"tenant1",cluster_name=~"cluster1"}) by (cluster_name,tenant_name)"#;

        let table_provider = build_test_table_provider_with_fields(
            &[(DEFAULT_SCHEMA_NAME.to_string(), "sysstat".to_string())],
            &["tenant_name", "cluster_name"],
        )
        .await;
        eval_stmt.expr = parser::parse(case).unwrap();
        let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
            .await
            .unwrap();

        let case = r#"sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) /
        (sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) *1000) +
        sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) /
        (sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) *1000) >= 0
        or
        sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) /
        (sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) *1000) >= 0
        or
        sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) /
        (sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) *1000) >= 0"#;
        let table_provider = build_test_table_provider_with_fields(
            &[(DEFAULT_SCHEMA_NAME.to_string(), "sysstat".to_string())],
            &["tenant_name", "cluster_name"],
        )
        .await;
        eval_stmt.expr = parser::parse(case).unwrap();
        let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
            .await
            .unwrap();

        let case = r#"(sum(background_waitevent_cnt{tenant_name=~"sys",cluster_name=~"cluster1"}) by (cluster_name,tenant_name) +
        sum(foreground_waitevent_cnt{tenant_name=~"sys",cluster_name=~"cluster1"}) by (cluster_name,tenant_name)) or
        (sum(background_waitevent_cnt{tenant_name=~"sys",cluster_name=~"cluster1"}) by (cluster_name,tenant_name)) or
        (sum(foreground_waitevent_cnt{tenant_name=~"sys",cluster_name=~"cluster1"}) by (cluster_name,tenant_name))"#;
        let table_provider = build_test_table_provider_with_fields(
            &[
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "background_waitevent_cnt".to_string(),
                ),
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "foreground_waitevent_cnt".to_string(),
                ),
            ],
            &["tenant_name", "cluster_name"],
        )
        .await;
        eval_stmt.expr = parser::parse(case).unwrap();
        let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
            .await
            .unwrap();

        let case = r#"avg(node_load1{cluster_name=~"cluster1"}) by (cluster_name,host_name) or max(container_cpu_load_average_10s{cluster_name=~"cluster1"}) by (cluster_name,host_name) * 100 / max(container_spec_cpu_quota{cluster_name=~"cluster1"}) by (cluster_name,host_name)"#;
        let table_provider = build_test_table_provider_with_fields(
            &[
                (DEFAULT_SCHEMA_NAME.to_string(), "node_load1".to_string()),
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "container_cpu_load_average_10s".to_string(),
                ),
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "container_spec_cpu_quota".to_string(),
                ),
            ],
            &["cluster_name", "host_name"],
        )
        .await;
        eval_stmt.expr = parser::parse(case).unwrap();
        let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
            .await
            .unwrap();
    }

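    // `__field__` matchers select or exclude value columns of a multi-field
    // table; the planned schema is checked per case, and a matcher that hits
    // no existing field is a planning error.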
    #[tokio::test]
    async fn value_matcher() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let cases = [
            (
                // single equality matcher keeps only that field
                r#"some_metric{__field__="field_1"}"#,
                vec![
                    "some_metric.field_1",
                    "some_metric.tag_0",
                    "some_metric.tag_1",
                    "some_metric.tag_2",
                    "some_metric.timestamp",
                ],
            ),
            (
                // two equality matchers keep both fields
                r#"some_metric{__field__="field_1", __field__="field_0"}"#,
                vec![
                    "some_metric.field_0",
                    "some_metric.field_1",
                    "some_metric.tag_0",
                    "some_metric.tag_1",
                    "some_metric.tag_2",
                    "some_metric.timestamp",
                ],
            ),
            (
                // negation keeps the complement
                r#"some_metric{__field__!="field_1"}"#,
                vec![
                    "some_metric.field_0",
                    "some_metric.field_2",
                    "some_metric.tag_0",
                    "some_metric.tag_1",
                    "some_metric.tag_2",
                    "some_metric.timestamp",
                ],
            ),
            (
                // two negations
                r#"some_metric{__field__!="field_1", __field__!="field_2"}"#,
                vec![
                    "some_metric.field_0",
                    "some_metric.tag_0",
                    "some_metric.tag_1",
                    "some_metric.tag_2",
                    "some_metric.timestamp",
                ],
            ),
            (
                // equality and negation on different fields
                r#"some_metric{__field__="field_1", __field__!="field_0"}"#,
                vec![
                    "some_metric.field_1",
                    "some_metric.tag_0",
                    "some_metric.tag_1",
                    "some_metric.tag_2",
                    "some_metric.timestamp",
                ],
            ),
            (
                // contradictory matchers leave no field
                r#"some_metric{__field__="field_2", __field__!="field_2"}"#,
                vec![
                    "some_metric.tag_0",
                    "some_metric.tag_1",
                    "some_metric.tag_2",
                    "some_metric.timestamp",
                ],
            ),
            (
                // regex matcher
                r#"some_metric{__field__=~"field_1|field_2"}"#,
                vec![
                    "some_metric.field_1",
                    "some_metric.field_2",
                    "some_metric.tag_0",
                    "some_metric.tag_1",
                    "some_metric.tag_2",
                    "some_metric.timestamp",
                ],
            ),
            (
                // negative regex matcher
                r#"some_metric{__field__!~"field_1|field_2"}"#,
                vec![
                    "some_metric.field_0",
                    "some_metric.tag_0",
                    "some_metric.tag_1",
                    "some_metric.tag_2",
                    "some_metric.timestamp",
                ],
            ),
        ];

        for case in cases {
            let prom_expr = parser::parse(case.0).unwrap();
            eval_stmt.expr = prom_expr;
            let table_provider = build_test_table_provider(
                &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
                3,
                3,
            )
            .await;
            let plan =
                PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                    .await
                    .unwrap();
            let mut fields = plan.schema().field_names();
            let mut expected = case.1.into_iter().map(String::from).collect::<Vec<_>>();
            fields.sort();
            expected.sort();
            assert_eq!(fields, expected, "case: {:?}", case.0);
        }

        let bad_cases = [
            r#"some_metric{__field__="nonexistent"}"#,
            r#"some_metric{__field__!="nonexistent"}"#,
        ];

        for case in bad_cases {
            let prom_expr = parser::parse(case).unwrap();
            eval_stmt.expr = prom_expr;
            let table_provider = build_test_table_provider(
                &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
                3,
                3,
            )
            .await;
            let plan =
                PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                    .await;
            assert!(plan.is_err(), "case: {:?}", case);
        }
    }

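    // `__schema__` / `__database__` matchers route the selector to another
    // schema, standalone and as one side of a binary operation.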
    #[tokio::test]
    async fn custom_schema() {
        let query = "some_alt_metric{__schema__=\"greptime_private\"}";
        let expected = String::from(
            "PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n  PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n    Sort: greptime_private.some_alt_metric.tag_0 ASC NULLS FIRST, greptime_private.some_alt_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n      Filter: greptime_private.some_alt_metric.timestamp >= TimestampMillisecond(-1000, None) AND greptime_private.some_alt_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n        TableScan: greptime_private.some_alt_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]",
        );

        indie_query_plan_compare(query, expected).await;

        let query = "some_alt_metric{__database__=\"greptime_private\"}";
        let expected = String::from(
            "PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n  PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n    Sort: greptime_private.some_alt_metric.tag_0 ASC NULLS FIRST, greptime_private.some_alt_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n      Filter: greptime_private.some_alt_metric.timestamp >= TimestampMillisecond(-1000, None) AND greptime_private.some_alt_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n        TableScan: greptime_private.some_alt_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]",
        );

        indie_query_plan_compare(query, expected).await;

        let query = "some_alt_metric{__schema__=\"greptime_private\"} / some_metric";
        let expected = String::from(
            "Projection: some_metric.tag_0, some_metric.timestamp, greptime_private.some_alt_metric.field_0 / some_metric.field_0 AS greptime_private.some_alt_metric.field_0 / some_metric.field_0 [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), greptime_private.some_alt_metric.field_0 / some_metric.field_0:Float64;N]\
            \n  Inner Join: greptime_private.some_alt_metric.tag_0 = some_metric.tag_0, greptime_private.some_alt_metric.timestamp = some_metric.timestamp [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n    SubqueryAlias: greptime_private.some_alt_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n      PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n        PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n          Sort: greptime_private.some_alt_metric.tag_0 ASC NULLS FIRST, greptime_private.some_alt_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n            Filter: greptime_private.some_alt_metric.timestamp >= TimestampMillisecond(-1000, None) AND greptime_private.some_alt_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n              TableScan: greptime_private.some_alt_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n    SubqueryAlias: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n      PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n        PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n          Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n            Filter: some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n              TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]",
        );

        indie_query_plan_compare(query, expected).await;
    }

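    // `__schema__` / `__database__` accept only `=`; other matcher kinds must
    // fail planning.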
    #[tokio::test]
    async fn only_equals_is_supported_for_special_matcher() {
        let queries = &[
            "some_alt_metric{__schema__!=\"greptime_private\"}",
            "some_alt_metric{__schema__=~\"lalala\"}",
            "some_alt_metric{__database__!=\"greptime_private\"}",
            "some_alt_metric{__database__=~\"lalala\"}",
        ];

        for query in queries {
            let prom_expr = parser::parse(query).unwrap();
            let eval_stmt = EvalStmt {
                expr: prom_expr,
                start: UNIX_EPOCH,
                end: UNIX_EPOCH
                    .checked_add(Duration::from_secs(100_000))
                    .unwrap(),
                interval: Duration::from_secs(5),
                lookback_delta: Duration::from_secs(1),
            };

            let table_provider = build_test_table_provider(
                &[
                    (DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string()),
                    (
                        "greptime_private".to_string(),
                        "some_alt_metric".to_string(),
                    ),
                ],
                1,
                1,
            )
            .await;

            let plan =
                PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                    .await;
            assert!(plan.is_err(), "query: {:?}", query);
        }
    }

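    // A nanosecond time index is cast to millisecond precision beneath the
    // PromQL nodes, for both instant and range selectors.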
    #[tokio::test]
    async fn test_non_ms_precision() {
        let catalog_list = MemoryCatalogManager::with_default_setup();
        let columns = vec![
            ColumnSchema::new(
                "tag".to_string(),
                ConcreteDataType::string_datatype(),
                false,
            ),
            ColumnSchema::new(
                "timestamp".to_string(),
                ConcreteDataType::timestamp_nanosecond_datatype(),
                false,
            )
            .with_time_index(true),
            ColumnSchema::new(
                "field".to_string(),
                ConcreteDataType::float64_datatype(),
                true,
            ),
        ];
        let schema = Arc::new(Schema::new(columns));
        let table_meta = TableMetaBuilder::empty()
            .schema(schema)
            .primary_key_indices(vec![0])
            .value_indices(vec![2])
            .next_column_id(1024)
            .build()
            .unwrap();
        let table_info = TableInfoBuilder::default()
            .name("metrics".to_string())
            .meta(table_meta)
            .build()
            .unwrap();
        let table = EmptyTable::from_table_info(&table_info);
        assert!(
            catalog_list
                .register_table_sync(RegisterTableRequest {
                    catalog: DEFAULT_CATALOG_NAME.to_string(),
                    schema: DEFAULT_SCHEMA_NAME.to_string(),
                    table_name: "metrics".to_string(),
                    table_id: 1024,
                    table,
                })
                .is_ok()
        );

        let plan = PromPlanner::stmt_to_plan(
            DfTableSourceProvider::new(
                catalog_list.clone(),
                false,
                QueryContext::arc(),
                DummyDecoder::arc(),
                true,
            ),
            &EvalStmt {
                expr: parser::parse("metrics{tag = \"1\"}").unwrap(),
                start: UNIX_EPOCH,
                end: UNIX_EPOCH
                    .checked_add(Duration::from_secs(100_000))
                    .unwrap(),
                interval: Duration::from_secs(5),
                lookback_delta: Duration::from_secs(1),
            },
            &build_query_engine_state(),
        )
        .await
        .unwrap();
        assert_eq!(
            plan.display_indent_schema().to_string(),
            "PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
            \n  PromSeriesDivide: tags=[\"tag\"] [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
            \n    Sort: metrics.tag ASC NULLS FIRST, metrics.timestamp ASC NULLS FIRST [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
            \n      Filter: metrics.tag = Utf8(\"1\") AND metrics.timestamp >= TimestampMillisecond(-1000, None) AND metrics.timestamp <= TimestampMillisecond(100001000, None) [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
            \n        Projection: metrics.field, metrics.tag, CAST(metrics.timestamp AS Timestamp(Millisecond, None)) AS timestamp [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
            \n          TableScan: metrics [tag:Utf8, timestamp:Timestamp(Nanosecond, None), field:Float64;N]"
        );
        let plan = PromPlanner::stmt_to_plan(
            DfTableSourceProvider::new(
                catalog_list.clone(),
                false,
                QueryContext::arc(),
                DummyDecoder::arc(),
                true,
            ),
            &EvalStmt {
                expr: parser::parse("avg_over_time(metrics{tag = \"1\"}[5s])").unwrap(),
                start: UNIX_EPOCH,
                end: UNIX_EPOCH
                    .checked_add(Duration::from_secs(100_000))
                    .unwrap(),
                interval: Duration::from_secs(5),
                lookback_delta: Duration::from_secs(1),
            },
            &build_query_engine_state(),
        )
        .await
        .unwrap();
        assert_eq!(
            plan.display_indent_schema().to_string(),
            "Filter: prom_avg_over_time(timestamp_range,field) IS NOT NULL [timestamp:Timestamp(Millisecond, None), prom_avg_over_time(timestamp_range,field):Float64;N, tag:Utf8]\
            \n  Projection: metrics.timestamp, prom_avg_over_time(timestamp_range, field) AS prom_avg_over_time(timestamp_range,field), metrics.tag [timestamp:Timestamp(Millisecond, None), prom_avg_over_time(timestamp_range,field):Float64;N, tag:Utf8]\
            \n    PromRangeManipulate: req range=[0..100000000], interval=[5000], eval range=[5000], time index=[timestamp], values=[\"field\"] [field:Dictionary(Int64, Float64);N, tag:Utf8, timestamp:Timestamp(Millisecond, None), timestamp_range:Dictionary(Int64, Timestamp(Millisecond, None))]\
            \n      PromSeriesNormalize: offset=[0], time index=[timestamp], filter NaN: [true] [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
            \n        PromSeriesDivide: tags=[\"tag\"] [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
            \n          Sort: metrics.tag ASC NULLS FIRST, metrics.timestamp ASC NULLS FIRST [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
            \n            Filter: metrics.tag = Utf8(\"1\") AND metrics.timestamp >= TimestampMillisecond(-6000, None) AND metrics.timestamp <= TimestampMillisecond(100001000, None) [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
            \n              Projection: metrics.field, metrics.tag, CAST(metrics.timestamp AS Timestamp(Millisecond, None)) AS timestamp [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
            \n                TableScan: metrics [tag:Utf8, timestamp:Timestamp(Nanosecond, None), field:Float64;N]"
        );
    }

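    // A matcher on a label the table does not have must still plan
    // successfully.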
    #[tokio::test]
    async fn test_nonexistent_label() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let case = r#"some_metric{nonexistent="hi"}"#;
        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;
        let table_provider = build_test_table_provider(
            &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
            3,
            3,
        )
        .await;
        let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
            .await
            .unwrap();
    }

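    // `label_join` plans a `concat_ws` projection that adds the joined label,
    // guarded by an IS NOT NULL filter on the value column.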
    #[tokio::test]
    async fn test_label_join() {
        let prom_expr = parser::parse(
            "label_join(up{tag_0='api-server'}, 'foo', ',', 'tag_1', 'tag_2', 'tag_3')",
        )
        .unwrap();
        let eval_stmt = EvalStmt {
            expr: prom_expr,
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let table_provider =
            build_test_table_provider(&[(DEFAULT_SCHEMA_NAME.to_string(), "up".to_string())], 4, 1)
                .await;
        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();

        let expected = r#"
Filter: up.field_0 IS NOT NULL [timestamp:Timestamp(Millisecond, None), field_0:Float64;N, foo:Utf8;N, tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8]
  Projection: up.timestamp, up.field_0, concat_ws(Utf8(","), up.tag_1, up.tag_2, up.tag_3) AS foo, up.tag_0, up.tag_1, up.tag_2, up.tag_3 [timestamp:Timestamp(Millisecond, None), field_0:Float64;N, foo:Utf8;N, tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8]
    PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
      PromSeriesDivide: tags=["tag_0", "tag_1", "tag_2", "tag_3"] [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
        Sort: up.tag_0 ASC NULLS FIRST, up.tag_1 ASC NULLS FIRST, up.tag_2 ASC NULLS FIRST, up.tag_3 ASC NULLS FIRST, up.timestamp ASC NULLS FIRST [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
          Filter: up.tag_0 = Utf8("api-server") AND up.timestamp >= TimestampMillisecond(-1000, None) AND up.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
            TableScan: up [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"#;

        let ret = plan.display_indent_schema().to_string();
        assert_eq!(format!("\n{ret}"), expected, "\n{}", ret);
    }

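    // `label_replace` plans a `regexp_replace` projection; the pattern is
    // anchored as `^(?s:...)$`.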
    #[tokio::test]
    async fn test_label_replace() {
        let prom_expr = parser::parse(
            "label_replace(up{tag_0=\"a:c\"}, \"foo\", \"$1\", \"tag_0\", \"(.*):.*\")",
        )
        .unwrap();
        let eval_stmt = EvalStmt {
            expr: prom_expr,
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let table_provider =
            build_test_table_provider(&[(DEFAULT_SCHEMA_NAME.to_string(), "up".to_string())], 1, 1)
                .await;
        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();

        let expected = r#"
Filter: up.field_0 IS NOT NULL [timestamp:Timestamp(Millisecond, None), field_0:Float64;N, foo:Utf8;N, tag_0:Utf8]
  Projection: up.timestamp, up.field_0, regexp_replace(up.tag_0, Utf8("^(?s:(.*):.*)$"), Utf8("$1")) AS foo, up.tag_0 [timestamp:Timestamp(Millisecond, None), field_0:Float64;N, foo:Utf8;N, tag_0:Utf8]
    PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
      PromSeriesDivide: tags=["tag_0"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
        Sort: up.tag_0 ASC NULLS FIRST, up.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
          Filter: up.tag_0 = Utf8("a:c") AND up.timestamp >= TimestampMillisecond(-1000, None) AND up.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
            TableScan: up [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"#;

        let ret = plan.display_indent_schema().to_string();
        assert_eq!(format!("\n{ret}"), expected, "\n{}", ret);
    }

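    // A regex label matcher compiles to an anchored `~` predicate
    // (`^(?:...)$`) in the scan filter.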
    #[tokio::test]
    async fn test_matchers_to_expr() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };
        let case =
            r#"sum(prometheus_tsdb_head_series{tag_1=~"(10.0.160.237:8080|10.0.160.237:9090)"})"#;

        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;
        let table_provider = build_test_table_provider(
            &[(
                DEFAULT_SCHEMA_NAME.to_string(),
                "prometheus_tsdb_head_series".to_string(),
            )],
            3,
            3,
        )
        .await;
        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();
        let expected = "Sort: prometheus_tsdb_head_series.timestamp ASC NULLS LAST [timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.field_0):Float64;N, sum(prometheus_tsdb_head_series.field_1):Float64;N, sum(prometheus_tsdb_head_series.field_2):Float64;N]\
        \n  Aggregate: groupBy=[[prometheus_tsdb_head_series.timestamp]], aggr=[[sum(prometheus_tsdb_head_series.field_0), sum(prometheus_tsdb_head_series.field_1), sum(prometheus_tsdb_head_series.field_2)]] [timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.field_0):Float64;N, sum(prometheus_tsdb_head_series.field_1):Float64;N, sum(prometheus_tsdb_head_series.field_2):Float64;N]\
        \n    PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N]\
        \n      PromSeriesDivide: tags=[\"tag_0\", \"tag_1\", \"tag_2\"] [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N]\
        \n        Sort: prometheus_tsdb_head_series.tag_0 ASC NULLS FIRST, prometheus_tsdb_head_series.tag_1 ASC NULLS FIRST, prometheus_tsdb_head_series.tag_2 ASC NULLS FIRST, prometheus_tsdb_head_series.timestamp ASC NULLS FIRST [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N]\
        \n          Filter: prometheus_tsdb_head_series.tag_1 ~ Utf8(\"^(?:(10.0.160.237:8080|10.0.160.237:9090))$\") AND prometheus_tsdb_head_series.timestamp >= TimestampMillisecond(-1000, None) AND prometheus_tsdb_head_series.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N]\
        \n            TableScan: prometheus_tsdb_head_series [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N]";
        assert_eq!(plan.display_indent_schema().to_string(), expected);
    }

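    // `topk` is planned with a `row_number()` window partitioned by timestamp
    // and a filter keeping the top N rows per instant.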
    #[tokio::test]
    async fn test_topk_expr() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };
        let case = r#"topk(10, sum(prometheus_tsdb_head_series{ip=~"(10.0.160.237:8080|10.0.160.237:9090)"}) by (ip))"#;

        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;
        let table_provider = build_test_table_provider_with_fields(
            &[
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "prometheus_tsdb_head_series".to_string(),
                ),
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "http_server_requests_seconds_count".to_string(),
                ),
            ],
            &["ip"],
        )
        .await;

        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();
        let expected = "Projection: sum(prometheus_tsdb_head_series.greptime_value), prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp [sum(prometheus_tsdb_head_series.greptime_value):Float64;N, ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None)]\
        \n  Sort: prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST, row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS FIRST, prometheus_tsdb_head_series.ip DESC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW ASC NULLS LAST [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N, row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS FIRST, prometheus_tsdb_head_series.ip DESC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:UInt64]\
        \n    Filter: row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS FIRST, prometheus_tsdb_head_series.ip DESC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW <= Float64(10) [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N, row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS FIRST, prometheus_tsdb_head_series.ip DESC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:UInt64]\
        \n      WindowAggr: windowExpr=[[row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS FIRST, prometheus_tsdb_head_series.ip DESC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N, row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS FIRST, prometheus_tsdb_head_series.ip DESC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:UInt64]\
        \n        Sort: prometheus_tsdb_head_series.ip ASC NULLS LAST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N]\
        \n          Aggregate: groupBy=[[prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp]], aggr=[[sum(prometheus_tsdb_head_series.greptime_value)]] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N]\
        \n            PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n              PromSeriesDivide: tags=[\"ip\"] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n                Sort: prometheus_tsdb_head_series.ip ASC NULLS FIRST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS FIRST [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n                  Filter: prometheus_tsdb_head_series.ip ~ Utf8(\"^(?:(10.0.160.237:8080|10.0.160.237:9090))$\") AND prometheus_tsdb_head_series.greptime_timestamp >= TimestampMillisecond(-1000, None) AND prometheus_tsdb_head_series.greptime_timestamp <= TimestampMillisecond(100001000, None) [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n                    TableScan: prometheus_tsdb_head_series [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]";

        assert_eq!(plan.display_indent_schema().to_string(), expected);
    }

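    // `count_values` groups by the value column itself and aliases it to the
    // requested label (`series`).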
    #[tokio::test]
    async fn test_count_values_expr() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };
        let case = r#"count_values('series', prometheus_tsdb_head_series{ip=~"(10.0.160.237:8080|10.0.160.237:9090)"}) by (ip)"#;

        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;
        let table_provider = build_test_table_provider_with_fields(
            &[
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "prometheus_tsdb_head_series".to_string(),
                ),
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "http_server_requests_seconds_count".to_string(),
                ),
            ],
            &["ip"],
        )
        .await;

        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();
        let expected = "Projection: count(prometheus_tsdb_head_series.greptime_value), prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp, series [count(prometheus_tsdb_head_series.greptime_value):Int64, ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), series:Float64;N]\
        \n  Sort: prometheus_tsdb_head_series.ip ASC NULLS LAST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST, prometheus_tsdb_head_series.greptime_value ASC NULLS LAST [count(prometheus_tsdb_head_series.greptime_value):Int64, ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), series:Float64;N, greptime_value:Float64;N]\
        \n    Projection: count(prometheus_tsdb_head_series.greptime_value), prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp, prometheus_tsdb_head_series.greptime_value AS series, prometheus_tsdb_head_series.greptime_value [count(prometheus_tsdb_head_series.greptime_value):Int64, ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), series:Float64;N, greptime_value:Float64;N]\
        \n      Aggregate: groupBy=[[prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp, prometheus_tsdb_head_series.greptime_value]], aggr=[[count(prometheus_tsdb_head_series.greptime_value)]] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N, count(prometheus_tsdb_head_series.greptime_value):Int64]\
        \n        PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n          PromSeriesDivide: tags=[\"ip\"] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n            Sort: prometheus_tsdb_head_series.ip ASC NULLS FIRST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS FIRST [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n              Filter: prometheus_tsdb_head_series.ip ~ Utf8(\"^(?:(10.0.160.237:8080|10.0.160.237:9090))$\") AND prometheus_tsdb_head_series.greptime_timestamp >= TimestampMillisecond(-1000, None) AND prometheus_tsdb_head_series.greptime_timestamp <= TimestampMillisecond(100001000, None) [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n                TableScan: prometheus_tsdb_head_series [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]";

        assert_eq!(plan.display_indent_schema().to_string(), expected);
    }

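    // `stmt_to_plan_with_alias` adds an outer projection renaming the value
    // column to the given alias (`my_series`).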
    #[tokio::test]
    async fn test_value_alias() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };
        let case = r#"count_values('series', prometheus_tsdb_head_series{ip=~"(10.0.160.237:8080|10.0.160.237:9090)"}) by (ip)"#;

        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;
        let table_provider = build_test_table_provider_with_fields(
            &[
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "prometheus_tsdb_head_series".to_string(),
                ),
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "http_server_requests_seconds_count".to_string(),
                ),
            ],
            &["ip"],
        )
        .await;

        let alias = Some("my_series".to_string());
        let plan = PromPlanner::stmt_to_plan_with_alias(
            table_provider,
            &eval_stmt,
            alias,
            &build_query_engine_state(),
        )
        .await
        .unwrap();
        let expected = r#"
Projection: count(prometheus_tsdb_head_series.greptime_value) AS my_series, prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp [my_series:Int64, ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None)]
  Projection: count(prometheus_tsdb_head_series.greptime_value), prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp, series [count(prometheus_tsdb_head_series.greptime_value):Int64, ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), series:Float64;N]
    Sort: prometheus_tsdb_head_series.ip ASC NULLS LAST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST, prometheus_tsdb_head_series.greptime_value ASC NULLS LAST [count(prometheus_tsdb_head_series.greptime_value):Int64, ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), series:Float64;N, greptime_value:Float64;N]
      Projection: count(prometheus_tsdb_head_series.greptime_value), prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp, prometheus_tsdb_head_series.greptime_value AS series, prometheus_tsdb_head_series.greptime_value [count(prometheus_tsdb_head_series.greptime_value):Int64, ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), series:Float64;N, greptime_value:Float64;N]
        Aggregate: groupBy=[[prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp, prometheus_tsdb_head_series.greptime_value]], aggr=[[count(prometheus_tsdb_head_series.greptime_value)]] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N, count(prometheus_tsdb_head_series.greptime_value):Int64]
          PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]
            PromSeriesDivide: tags=["ip"] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]
              Sort: prometheus_tsdb_head_series.ip ASC NULLS FIRST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS FIRST [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]
                Filter: prometheus_tsdb_head_series.ip ~ Utf8("^(?:(10.0.160.237:8080|10.0.160.237:9090))$") AND prometheus_tsdb_head_series.greptime_timestamp >= TimestampMillisecond(-1000, None) AND prometheus_tsdb_head_series.greptime_timestamp <= TimestampMillisecond(100001000, None) [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]
                  TableScan: prometheus_tsdb_head_series [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]"#;
        assert_eq!(format!("\n{}", plan.display_indent_schema()), expected);
    }

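    // Nested aggregation: `quantile` over `sum by (ip)` stacks two Aggregate
    // nodes, the outer one using the quantile UDAF.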
5031 #[tokio::test]
5032 async fn test_quantile_expr() {
5033 let mut eval_stmt = EvalStmt {
5034 expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
5035 start: UNIX_EPOCH,
5036 end: UNIX_EPOCH
5037 .checked_add(Duration::from_secs(100_000))
5038 .unwrap(),
5039 interval: Duration::from_secs(5),
5040 lookback_delta: Duration::from_secs(1),
5041 };
5042 let case = r#"quantile(0.3, sum(prometheus_tsdb_head_series{ip=~"(10.0.160.237:8080|10.0.160.237:9090)"}) by (ip))"#;
5043
5044 let prom_expr = parser::parse(case).unwrap();
5045 eval_stmt.expr = prom_expr;
5046 let table_provider = build_test_table_provider_with_fields(
5047 &[
5048 (
5049 DEFAULT_SCHEMA_NAME.to_string(),
5050 "prometheus_tsdb_head_series".to_string(),
5051 ),
5052 (
5053 DEFAULT_SCHEMA_NAME.to_string(),
5054 "http_server_requests_seconds_count".to_string(),
5055 ),
5056 ],
5057 &["ip"],
5058 )
5059 .await;
5060
5061 let plan =
5062 PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
5063 .await
5064 .unwrap();
        let expected = "Sort: prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST [greptime_timestamp:Timestamp(Millisecond, None), quantile(Float64(0.3),sum(prometheus_tsdb_head_series.greptime_value)):Float64;N]\
        \n  Aggregate: groupBy=[[prometheus_tsdb_head_series.greptime_timestamp]], aggr=[[quantile(Float64(0.3), sum(prometheus_tsdb_head_series.greptime_value))]] [greptime_timestamp:Timestamp(Millisecond, None), quantile(Float64(0.3),sum(prometheus_tsdb_head_series.greptime_value)):Float64;N]\
        \n    Sort: prometheus_tsdb_head_series.ip ASC NULLS LAST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N]\
        \n      Aggregate: groupBy=[[prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp]], aggr=[[sum(prometheus_tsdb_head_series.greptime_value)]] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N]\
        \n        PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n          PromSeriesDivide: tags=[\"ip\"] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n            Sort: prometheus_tsdb_head_series.ip ASC NULLS FIRST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS FIRST [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n              Filter: prometheus_tsdb_head_series.ip ~ Utf8(\"^(?:(10.0.160.237:8080|10.0.160.237:9090))$\") AND prometheus_tsdb_head_series.greptime_timestamp >= TimestampMillisecond(-1000, None) AND prometheus_tsdb_head_series.greptime_timestamp <= TimestampMillisecond(100001000, None) [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n                TableScan: prometheus_tsdb_head_series [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]";

        assert_eq!(plan.display_indent_schema().to_string(), expected);
    }

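    // `or` where the right-hand metric does not exist in the catalog: the missing
    // side should be planned as an EmptyMetric and combined via UnionDistinctOn
    // instead of failing the whole query.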
    #[tokio::test]
    async fn test_or_not_exists_table_label() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };
        let case = r#"sum by (job, tag0, tag2) (metric_exists) or sum by (job, tag0, tag2) (metric_not_exists)"#;

        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;
        let table_provider = build_test_table_provider_with_fields(
            &[(DEFAULT_SCHEMA_NAME.to_string(), "metric_exists".to_string())],
            &["job"],
        )
        .await;

        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();
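        // The left branch scans `metric_exists`; the right branch degenerates to
        // an EmptyMetric over a dummy table, with `job` projected as NULL so both
        // sides union on the same schema.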
        let expected = r#"UnionDistinctOn: on col=[["job"]], ts_col=[greptime_timestamp] [greptime_timestamp:Timestamp(Millisecond, None), job:Utf8, sum(metric_exists.greptime_value):Float64;N]
  SubqueryAlias: metric_exists [greptime_timestamp:Timestamp(Millisecond, None), job:Utf8, sum(metric_exists.greptime_value):Float64;N]
    Projection: metric_exists.greptime_timestamp, metric_exists.job, sum(metric_exists.greptime_value) [greptime_timestamp:Timestamp(Millisecond, None), job:Utf8, sum(metric_exists.greptime_value):Float64;N]
      Sort: metric_exists.job ASC NULLS LAST, metric_exists.greptime_timestamp ASC NULLS LAST [job:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(metric_exists.greptime_value):Float64;N]
        Aggregate: groupBy=[[metric_exists.job, metric_exists.greptime_timestamp]], aggr=[[sum(metric_exists.greptime_value)]] [job:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(metric_exists.greptime_value):Float64;N]
          PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp] [job:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]
            PromSeriesDivide: tags=["job"] [job:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]
              Sort: metric_exists.job ASC NULLS FIRST, metric_exists.greptime_timestamp ASC NULLS FIRST [job:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]
                Filter: metric_exists.greptime_timestamp >= TimestampMillisecond(-1000, None) AND metric_exists.greptime_timestamp <= TimestampMillisecond(100001000, None) [job:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]
                  TableScan: metric_exists [job:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]
  SubqueryAlias: [greptime_timestamp:Timestamp(Millisecond, None), job:Utf8;N, sum(.value):Float64;N]
    Projection: .time AS greptime_timestamp, Utf8(NULL) AS job, sum(.value) [greptime_timestamp:Timestamp(Millisecond, None), job:Utf8;N, sum(.value):Float64;N]
      Sort: .time ASC NULLS LAST [time:Timestamp(Millisecond, None), sum(.value):Float64;N]
        Aggregate: groupBy=[[.time]], aggr=[[sum(.value)]] [time:Timestamp(Millisecond, None), sum(.value):Float64;N]
          EmptyMetric: range=[0..-1], interval=[5000] [time:Timestamp(Millisecond, None), value:Float64;N]
            TableScan: dummy [time:Timestamp(Millisecond, None), value:Float64;N]"#;

        assert_eq!(plan.display_indent_schema().to_string(), expected);
    }

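    // `histogram_quantile` over a metric that has no `le` column: planning should
    // not error out but instead produce an EmptyRelation (an empty result set).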
    #[tokio::test]
    async fn test_histogram_quantile_missing_le_column() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let case = r#"histogram_quantile(0.99, sum by(pod,instance,le) (rate(non_existent_histogram_bucket{instance=~"xxx"}[1m])))"#;

        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;

        let table_provider = build_test_table_provider_with_fields(
            &[(
                DEFAULT_SCHEMA_NAME.to_string(),
                "non_existent_histogram_bucket".to_string(),
            )],
            &["pod", "instance"],
        )
        .await;

        let result =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await;

        assert!(
            result.is_ok(),
            "Expected successful plan creation with empty result, but got error: {:?}",
            result.err()
        );

        let plan = result.unwrap();
        match plan {
            // The table carries only `pod` and `instance` tags, so the missing
            // `le` column resolves to an empty result rather than an error.
            LogicalPlan::EmptyRelation(_) => {}
            _ => panic!("Expected EmptyRelation, but got: {:?}", plan),
        }
    }
}