use std::collections::{BTreeSet, HashSet, VecDeque};
use std::sync::Arc;
use std::time::UNIX_EPOCH;

use arrow::datatypes::IntervalDayTime;
use async_recursion::async_recursion;
use catalog::table_source::DfTableSourceProvider;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_query::prelude::GREPTIME_VALUE;
use datafusion::common::DFSchemaRef;
use datafusion::datasource::DefaultTableSource;
use datafusion::execution::context::SessionState;
use datafusion::functions_aggregate::average::avg_udaf;
use datafusion::functions_aggregate::count::count_udaf;
use datafusion::functions_aggregate::grouping::grouping_udaf;
use datafusion::functions_aggregate::min_max::{max_udaf, min_udaf};
use datafusion::functions_aggregate::stddev::stddev_pop_udaf;
use datafusion::functions_aggregate::sum::sum_udaf;
use datafusion::functions_aggregate::variance::var_pop_udaf;
use datafusion::functions_window::row_number::RowNumber;
use datafusion::logical_expr::expr::{Alias, ScalarFunction, WindowFunction};
use datafusion::logical_expr::expr_rewriter::normalize_cols;
use datafusion::logical_expr::{
    BinaryExpr, Cast, Extension, LogicalPlan, LogicalPlanBuilder, Operator,
    ScalarUDF as ScalarUdfDef, WindowFrame, WindowFunctionDefinition,
};
use datafusion::prelude as df_prelude;
use datafusion::prelude::{Column, Expr as DfExpr, JoinType};
use datafusion::scalar::ScalarValue;
use datafusion::sql::TableReference;
use datafusion_common::DFSchema;
use datafusion_expr::utils::conjunction;
use datafusion_expr::{col, lit, ExprSchemable, SortExpr};
use datatypes::arrow::datatypes::{DataType as ArrowDataType, TimeUnit as ArrowTimeUnit};
use datatypes::data_type::ConcreteDataType;
use itertools::Itertools;
use promql::extension_plan::{
    build_special_time_expr, EmptyMetric, HistogramFold, InstantManipulate, Millisecond,
    RangeManipulate, ScalarCalculate, SeriesDivide, SeriesNormalize, UnionDistinctOn,
};
use promql::functions::{
    quantile_udaf, AbsentOverTime, AvgOverTime, Changes, CountOverTime, Delta, Deriv, HoltWinters,
    IDelta, Increase, LastOverTime, MaxOverTime, MinOverTime, PredictLinear, PresentOverTime,
    QuantileOverTime, Rate, Resets, Round, StddevOverTime, StdvarOverTime, SumOverTime,
};
use promql_parser::label::{MatchOp, Matcher, Matchers, METRIC_NAME};
use promql_parser::parser::token::TokenType;
use promql_parser::parser::{
    token, AggregateExpr, BinModifier, BinaryExpr as PromBinaryExpr, Call, EvalStmt,
    Expr as PromExpr, Function, FunctionArgs as PromFunctionArgs, LabelModifier, MatrixSelector,
    NumberLiteral, Offset, ParenExpr, StringLiteral, SubqueryExpr, UnaryExpr,
    VectorMatchCardinality, VectorSelector,
};
use snafu::{ensure, OptionExt, ResultExt};
use store_api::metric_engine_consts::{
    DATA_SCHEMA_TABLE_ID_COLUMN_NAME, DATA_SCHEMA_TSID_COLUMN_NAME,
};
use table::table::adapter::DfTableProviderAdapter;

use crate::promql::error::{
    CatalogSnafu, ColumnNotFoundSnafu, CombineTableColumnMismatchSnafu, DataFusionPlanningSnafu,
    ExpectRangeSelectorSnafu, FunctionInvalidArgumentSnafu, InvalidTimeRangeSnafu,
    MultiFieldsNotSupportedSnafu, MultipleMetricMatchersSnafu, MultipleVectorSnafu,
    NoMetricMatcherSnafu, PromqlPlanNodeSnafu, Result, TableNameNotFoundSnafu,
    TimeIndexNotFoundSnafu, UnexpectedPlanExprSnafu, UnexpectedTokenSnafu, UnknownTableSnafu,
    UnsupportedExprSnafu, UnsupportedMatcherOpSnafu, UnsupportedVectorMatchSnafu,
    ValueNotFoundSnafu, ZeroRangeSelectorSnafu,
};

/// `time()` function in PromQL.
const SPECIAL_TIME_FUNCTION: &str = "time";
/// `scalar()` function in PromQL.
const SCALAR_FUNCTION: &str = "scalar";
/// `histogram_quantile()` function in PromQL.
const SPECIAL_HISTOGRAM_QUANTILE: &str = "histogram_quantile";
/// `vector()` function in PromQL.
const SPECIAL_VECTOR_FUNCTION: &str = "vector";
/// `le` column name for conventional histograms.
const LE_COLUMN_NAME: &str = "le";

const DEFAULT_TIME_INDEX_COLUMN: &str = "time";

/// Default value column name for empty metrics.
const DEFAULT_FIELD_COLUMN: &str = "value";

/// Special label matcher to project field columns.
const FIELD_COLUMN_MATCHER: &str = "__field__";

/// Special label matchers for cross-schema queries.
const SCHEMA_COLUMN_MATCHER: &str = "__schema__";
const DB_COLUMN_MATCHER: &str = "__database__";

/// Threshold of evaluation points above which the time-index filter switches
/// to a single continuous range instead of scattered per-point ranges.
const MAX_SCATTER_POINTS: i64 = 400;

/// Interval of 1 hour in milliseconds.
const INTERVAL_1H: i64 = 60 * 60 * 1000;

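/// Planning context shared across one query: the evaluation time range and
/// step taken from the `EvalStmt`, plus per-selector state (table, time
/// index, field/tag columns) that is reset between sub-plans.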
#[derive(Default, Debug, Clone)]
struct PromPlannerContext {
    // query parameters
    start: Millisecond,
    end: Millisecond,
    interval: Millisecond,
    lookback_delta: Millisecond,

    // planner states
    table_name: Option<String>,
    time_index_column: Option<String>,
    field_columns: Vec<String>,
    tag_columns: Vec<String>,
    field_column_matcher: Option<Vec<Matcher>>,
    schema_name: Option<String>,
    /// The range in milliseconds of the range selector. None if there is no
    /// range selector.
    range: Option<Millisecond>,
}

impl PromPlannerContext {
    fn from_eval_stmt(stmt: &EvalStmt) -> Self {
        Self {
            start: stmt.start.duration_since(UNIX_EPOCH).unwrap().as_millis() as _,
            end: stmt.end.duration_since(UNIX_EPOCH).unwrap().as_millis() as _,
            interval: stmt.interval.as_millis() as _,
            lookback_delta: stmt.lookback_delta.as_millis() as _,
            ..Default::default()
        }
    }

    /// Reset all planner states set while planning a sub-expression.
    fn reset(&mut self) {
        self.table_name = None;
        self.time_index_column = None;
        self.field_columns = vec![];
        self.tag_columns = vec![];
        self.field_column_matcher = None;
        self.schema_name = None;
        self.range = None;
    }

    /// Reset table name and schema to empty placeholders for plans that do
    /// not read from a table (e.g. literals).
    fn reset_table_name_and_schema(&mut self) {
        self.table_name = Some(String::new());
        self.schema_name = None;
    }

    /// Check if the `le` tag is present in the tag columns.
    fn has_le_tag(&self) -> bool {
        self.tag_columns.iter().any(|c| c.eq(&LE_COLUMN_NAME))
    }
}

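/// Planner for converting a parsed PromQL [`EvalStmt`] into a DataFusion
/// [`LogicalPlan`].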
pub struct PromPlanner {
    table_provider: DfTableSourceProvider,
    ctx: PromPlannerContext,
}

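/// Unescape a matcher's value (e.g. `\\n` becomes a newline) so it can be
/// compared against stored label values. Values that fail to unescape are
/// kept unchanged.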
pub fn normalize_matcher(mut matcher: Matcher) -> Matcher {
    if let Ok(unescaped_value) = unescaper::unescape(&matcher.value) {
        matcher.value = unescaped_value;
    }
    matcher
}

impl PromPlanner {
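    /// Convert an `EvalStmt` (parsed PromQL plus its evaluation time range
    /// and step) into a DataFusion logical plan.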
    pub async fn stmt_to_plan(
        table_provider: DfTableSourceProvider,
        stmt: &EvalStmt,
        session_state: &SessionState,
    ) -> Result<LogicalPlan> {
        let mut planner = Self {
            table_provider,
            ctx: PromPlannerContext::from_eval_stmt(stmt),
        };

        planner.prom_expr_to_plan(&stmt.expr, session_state).await
    }

    #[async_recursion]
    pub async fn prom_expr_to_plan(
        &mut self,
        prom_expr: &PromExpr,
        session_state: &SessionState,
    ) -> Result<LogicalPlan> {
        let res = match prom_expr {
            PromExpr::Aggregate(expr) => self.prom_aggr_expr_to_plan(session_state, expr).await?,
            PromExpr::Unary(expr) => self.prom_unary_expr_to_plan(session_state, expr).await?,
            PromExpr::Binary(expr) => self.prom_binary_expr_to_plan(session_state, expr).await?,
            PromExpr::Paren(ParenExpr { expr }) => {
                self.prom_expr_to_plan(expr, session_state).await?
            }
            PromExpr::Subquery(expr) => {
                self.prom_subquery_expr_to_plan(session_state, expr).await?
            }
            PromExpr::NumberLiteral(lit) => self.prom_number_lit_to_plan(lit)?,
            PromExpr::StringLiteral(lit) => self.prom_string_lit_to_plan(lit)?,
            PromExpr::VectorSelector(selector) => {
                self.prom_vector_selector_to_plan(selector).await?
            }
            PromExpr::MatrixSelector(selector) => {
                self.prom_matrix_selector_to_plan(selector).await?
            }
            PromExpr::Call(expr) => self.prom_call_expr_to_plan(session_state, expr).await?,
            PromExpr::Extension(expr) => self.prom_ext_expr_to_plan(session_state, expr).await?,
        };

        Ok(res)
    }

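    /// Plan a subquery such as `metric[30m:5m]`: plan the inner expression
    /// with a widened start and the subquery's step, then wrap it in a
    /// `RangeManipulate` node for the outer range.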
    async fn prom_subquery_expr_to_plan(
        &mut self,
        session_state: &SessionState,
        subquery_expr: &SubqueryExpr,
    ) -> Result<LogicalPlan> {
        let SubqueryExpr {
            expr, range, step, ..
        } = subquery_expr;

        let current_interval = self.ctx.interval;
        if let Some(step) = step {
            self.ctx.interval = step.as_millis() as _;
        }
        // Extend the start time backward by `range - interval` so the inner
        // plan produces enough points for the outer range evaluation, then
        // restore the context after planning the sub-expression.
        let current_start = self.ctx.start;
        self.ctx.start -= range.as_millis() as i64 - self.ctx.interval;
        let input = self.prom_expr_to_plan(expr, session_state).await?;
        self.ctx.interval = current_interval;
        self.ctx.start = current_start;

        ensure!(!range.is_zero(), ZeroRangeSelectorSnafu);
        let range_ms = range.as_millis() as _;
        self.ctx.range = Some(range_ms);

        let manipulate = RangeManipulate::new(
            self.ctx.start,
            self.ctx.end,
            self.ctx.interval,
            range_ms,
            self.ctx
                .time_index_column
                .clone()
                .expect("time index should be set in `setup_context`"),
            self.ctx.field_columns.clone(),
            input,
        )
        .context(DataFusionPlanningSnafu)?;

        Ok(LogicalPlan::Extension(Extension {
            node: Arc::new(manipulate),
        }))
    }

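    /// Plan an aggregate expression. `topk`/`bottomk` take the window-based
    /// path; all other operators become a DataFusion aggregate over the
    /// modifier-derived group-by columns, followed by a sort.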
    async fn prom_aggr_expr_to_plan(
        &mut self,
        session_state: &SessionState,
        aggr_expr: &AggregateExpr,
    ) -> Result<LogicalPlan> {
        let AggregateExpr {
            op,
            expr,
            modifier,
            param,
        } = aggr_expr;

        let input = self.prom_expr_to_plan(expr, session_state).await?;

        match (*op).id() {
            token::T_TOPK | token::T_BOTTOMK => {
                self.prom_topk_bottomk_to_plan(aggr_expr, input).await
            }
            _ => {
                // Calculate the columns to group by.
                let mut group_exprs = self.agg_modifier_to_col(input.schema(), modifier, true)?;
                // Convert the aggregate operator to DataFusion expressions.
                let (aggr_exprs, prev_field_exprs) =
                    self.create_aggregate_exprs(*op, param, &input)?;

                let builder = LogicalPlanBuilder::from(input);
                let builder = if op.id() == token::T_COUNT_VALUES {
                    let label = Self::get_param_value_as_str(*op, param)?;
                    // `count_values` must additionally group by the field
                    // columns, then project them to the new label.
                    group_exprs.extend(prev_field_exprs.clone());
                    let project_fields = self
                        .create_field_column_exprs()?
                        .into_iter()
                        .chain(self.create_tag_column_exprs()?)
                        .chain(Some(self.create_time_index_column_expr()?))
                        .chain(prev_field_exprs.into_iter().map(|expr| expr.alias(label)));

                    builder
                        .aggregate(group_exprs.clone(), aggr_exprs)
                        .context(DataFusionPlanningSnafu)?
                        .project(project_fields)
                        .context(DataFusionPlanningSnafu)?
                } else {
                    builder
                        .aggregate(group_exprs.clone(), aggr_exprs)
                        .context(DataFusionPlanningSnafu)?
                };

                let sort_expr = group_exprs.into_iter().map(|expr| expr.sort(true, false));

                builder
                    .sort(sort_expr)
                    .context(DataFusionPlanningSnafu)?
                    .build()
                    .context(DataFusionPlanningSnafu)
            }
        }
    }

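    /// Plan `topk`/`bottomk` (e.g. `topk(3, rate(http_requests_total[5m]))`)
    /// as a window-rank plan: rank rows per timestamp, keep ranks `<= k`,
    /// sort, and project the original columns.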
    async fn prom_topk_bottomk_to_plan(
        &mut self,
        aggr_expr: &AggregateExpr,
        input: LogicalPlan,
    ) -> Result<LogicalPlan> {
        let AggregateExpr {
            op,
            param,
            modifier,
            ..
        } = aggr_expr;

        let group_exprs = self.agg_modifier_to_col(input.schema(), modifier, false)?;

        let val = Self::get_param_as_literal_expr(param, Some(*op), Some(ArrowDataType::Float64))?;

        // Convert the aggregate operator to window expressions, one ranking
        // per field column.
        let window_exprs = self.create_window_exprs(*op, group_exprs.clone(), &input)?;

        let rank_columns: Vec<_> = window_exprs
            .iter()
            .map(|expr| expr.schema_name().to_string())
            .collect();

        // Build a filter that keeps rows whose rank is within the parameter,
        // e.g. `rank_column_1 <= k OR rank_column_2 <= k`.
        let filter: DfExpr = rank_columns
            .iter()
            .fold(None, |expr, rank| {
                let predicate = DfExpr::BinaryExpr(BinaryExpr {
                    left: Box::new(col(rank)),
                    op: Operator::LtEq,
                    right: Box::new(val.clone()),
                });

                match expr {
                    None => Some(predicate),
                    Some(expr) => Some(DfExpr::BinaryExpr(BinaryExpr {
                        left: Box::new(expr),
                        op: Operator::Or,
                        right: Box::new(predicate),
                    })),
                }
            })
            .unwrap();

        let rank_columns: Vec<_> = rank_columns.into_iter().map(col).collect();

        // Sort by the group columns first, then by the rank columns.
        let mut new_group_exprs = group_exprs.clone();
        new_group_exprs.extend(rank_columns);

        let group_sort_expr = new_group_exprs
            .into_iter()
            .map(|expr| expr.sort(true, false));

        let project_fields = self
            .create_field_column_exprs()?
            .into_iter()
            .chain(self.create_tag_column_exprs()?)
            .chain(Some(self.create_time_index_column_expr()?));

        LogicalPlanBuilder::from(input)
            .window(window_exprs)
            .context(DataFusionPlanningSnafu)?
            .filter(filter)
            .context(DataFusionPlanningSnafu)?
            .sort(group_sort_expr)
            .context(DataFusionPlanningSnafu)?
            .project(project_fields)
            .context(DataFusionPlanningSnafu)?
            .build()
            .context(DataFusionPlanningSnafu)
    }

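    /// Plan a unary expression (`-expr`) by negating every field column of
    /// the input plan.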
    async fn prom_unary_expr_to_plan(
        &mut self,
        session_state: &SessionState,
        unary_expr: &UnaryExpr,
    ) -> Result<LogicalPlan> {
        let UnaryExpr { expr } = unary_expr;
        let input = self.prom_expr_to_plan(expr, session_state).await?;
        self.projection_for_each_field_column(input, |col| {
            Ok(DfExpr::Negative(Box::new(DfExpr::Column(col.into()))))
        })
    }

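    /// Plan a binary expression. The plan shape depends on whether each
    /// operand is a literal or a vector: literal-literal folds into an
    /// `EmptyMetric` expression, literal-vector becomes a projection (or a
    /// filter, for comparisons) over the vector side, and vector-vector
    /// becomes a join (or set operation) on the non-field columns.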
    async fn prom_binary_expr_to_plan(
        &mut self,
        session_state: &SessionState,
        binary_expr: &PromBinaryExpr,
    ) -> Result<LogicalPlan> {
        let PromBinaryExpr {
            lhs,
            rhs,
            op,
            modifier,
        } = binary_expr;

        // If set to true, a comparison operator returns 0/1 (for false/true)
        // instead of filtering the result.
        let should_return_bool = if let Some(m) = modifier {
            m.return_bool
        } else {
            false
        };
        let is_comparison_op = Self::is_token_a_comparison_op(*op);

        // Dispatch on the four combinations of literal/vector operands.
        match (
            Self::try_build_literal_expr(lhs),
            Self::try_build_literal_expr(rhs),
        ) {
            // Both sides are literals: evaluate over an empty metric.
            (Some(lhs), Some(rhs)) => {
                self.ctx.time_index_column = Some(DEFAULT_TIME_INDEX_COLUMN.to_string());
                self.ctx.field_columns = vec![DEFAULT_FIELD_COLUMN.to_string()];
                self.ctx.reset_table_name_and_schema();
                let field_expr_builder = Self::prom_token_to_binary_expr_builder(*op)?;
                let mut field_expr = field_expr_builder(lhs, rhs)?;

                if is_comparison_op && should_return_bool {
                    field_expr = DfExpr::Cast(Cast {
                        expr: Box::new(field_expr),
                        data_type: ArrowDataType::Float64,
                    });
                }

                Ok(LogicalPlan::Extension(Extension {
                    node: Arc::new(
                        EmptyMetric::new(
                            self.ctx.start,
                            self.ctx.end,
                            self.ctx.interval,
                            SPECIAL_TIME_FUNCTION.to_string(),
                            DEFAULT_FIELD_COLUMN.to_string(),
                            Some(field_expr),
                        )
                        .context(DataFusionPlanningSnafu)?,
                    ),
                }))
            }
            // Literal on the left, vector on the right.
            (Some(mut expr), None) => {
                let input = self.prom_expr_to_plan(rhs, session_state).await?;
                // Check if the literal is a special time expression.
                if let Some(time_expr) = Self::try_build_special_time_expr(
                    lhs,
                    self.ctx.time_index_column.as_ref().unwrap(),
                ) {
                    expr = time_expr
                }
                let bin_expr_builder = |col: &String| {
                    let binary_expr_builder = Self::prom_token_to_binary_expr_builder(*op)?;
                    let mut binary_expr =
                        binary_expr_builder(expr.clone(), DfExpr::Column(col.into()))?;

                    if is_comparison_op && should_return_bool {
                        binary_expr = DfExpr::Cast(Cast {
                            expr: Box::new(binary_expr),
                            data_type: ArrowDataType::Float64,
                        });
                    }
                    Ok(binary_expr)
                };
                if is_comparison_op && !should_return_bool {
                    self.filter_on_field_column(input, bin_expr_builder)
                } else {
                    self.projection_for_each_field_column(input, bin_expr_builder)
                }
            }
            // Vector on the left, literal on the right.
            (None, Some(mut expr)) => {
                let input = self.prom_expr_to_plan(lhs, session_state).await?;
                // Check if the literal is a special time expression.
                if let Some(time_expr) = Self::try_build_special_time_expr(
                    rhs,
                    self.ctx.time_index_column.as_ref().unwrap(),
                ) {
                    expr = time_expr
                }
                let bin_expr_builder = |col: &String| {
                    let binary_expr_builder = Self::prom_token_to_binary_expr_builder(*op)?;
                    let mut binary_expr =
                        binary_expr_builder(DfExpr::Column(col.into()), expr.clone())?;

                    if is_comparison_op && should_return_bool {
                        binary_expr = DfExpr::Cast(Cast {
                            expr: Box::new(binary_expr),
                            data_type: ArrowDataType::Float64,
                        });
                    }
                    Ok(binary_expr)
                };
                if is_comparison_op && !should_return_bool {
                    self.filter_on_field_column(input, bin_expr_builder)
                } else {
                    self.projection_for_each_field_column(input, bin_expr_builder)
                }
            }
            // Both sides are vectors: join them on non-field columns.
            (None, None) => {
                let left_input = self.prom_expr_to_plan(lhs, session_state).await?;
                let left_field_columns = self.ctx.field_columns.clone();
                let left_time_index_column = self.ctx.time_index_column.clone();
                let mut left_table_ref = self
                    .table_ref()
                    .unwrap_or_else(|_| TableReference::bare(""));
                let left_context = self.ctx.clone();

                let right_input = self.prom_expr_to_plan(rhs, session_state).await?;
                let right_field_columns = self.ctx.field_columns.clone();
                let right_time_index_column = self.ctx.time_index_column.clone();
                let mut right_table_ref = self
                    .table_ref()
                    .unwrap_or_else(|_| TableReference::bare(""));
                let right_context = self.ctx.clone();

                // Set operations (`and`, `or`, `unless`) take a different
                // path from arithmetic/comparison operations.
                if Self::is_token_a_set_op(*op) {
                    return self.set_op_on_non_field_columns(
                        left_input,
                        right_input,
                        left_context,
                        right_context,
                        *op,
                        modifier,
                    );
                }

                // Rename the table references if both sides refer to the same
                // table, to avoid ambiguity in the join below.
                if left_table_ref == right_table_ref {
                    left_table_ref = TableReference::bare("lhs");
                    right_table_ref = TableReference::bare("rhs");
                    // `self.ctx` currently holds the right side; keep the
                    // side that carries tag columns as the surviving context.
                    if self.ctx.tag_columns.is_empty() {
                        self.ctx = left_context.clone();
                        self.ctx.table_name = Some("lhs".to_string());
                    } else {
                        self.ctx.table_name = Some("rhs".to_string());
                    }
                }
                let mut field_columns = left_field_columns.iter().zip(right_field_columns.iter());

                let join_plan = self.join_on_non_field_columns(
                    left_input,
                    right_input,
                    left_table_ref.clone(),
                    right_table_ref.clone(),
                    left_time_index_column,
                    right_time_index_column,
                    // Join on the time index alone if either side has no tag
                    // columns.
                    left_context.tag_columns.is_empty() || right_context.tag_columns.is_empty(),
                    modifier,
                )?;
                let join_plan_schema = join_plan.schema().clone();

                let bin_expr_builder = |_: &String| {
                    let (left_col_name, right_col_name) = field_columns.next().unwrap();
                    let left_col = join_plan_schema
                        .qualified_field_with_name(Some(&left_table_ref), left_col_name)
                        .context(DataFusionPlanningSnafu)?
                        .into();
                    let right_col = join_plan_schema
                        .qualified_field_with_name(Some(&right_table_ref), right_col_name)
                        .context(DataFusionPlanningSnafu)?
                        .into();

                    let binary_expr_builder = Self::prom_token_to_binary_expr_builder(*op)?;
                    let mut binary_expr =
                        binary_expr_builder(DfExpr::Column(left_col), DfExpr::Column(right_col))?;
                    if is_comparison_op && should_return_bool {
                        binary_expr = DfExpr::Cast(Cast {
                            expr: Box::new(binary_expr),
                            data_type: ArrowDataType::Float64,
                        });
                    }
                    Ok(binary_expr)
                };
                if is_comparison_op && !should_return_bool {
                    self.filter_on_field_column(join_plan, bin_expr_builder)
                } else {
                    self.projection_for_each_field_column(join_plan, bin_expr_builder)
                }
            }
        }
    }

    fn prom_number_lit_to_plan(&mut self, number_literal: &NumberLiteral) -> Result<LogicalPlan> {
        let NumberLiteral { val } = number_literal;
        self.ctx.time_index_column = Some(DEFAULT_TIME_INDEX_COLUMN.to_string());
        self.ctx.field_columns = vec![DEFAULT_FIELD_COLUMN.to_string()];
        self.ctx.reset_table_name_and_schema();
        let literal_expr = df_prelude::lit(*val);

        let plan = LogicalPlan::Extension(Extension {
            node: Arc::new(
                EmptyMetric::new(
                    self.ctx.start,
                    self.ctx.end,
                    self.ctx.interval,
                    SPECIAL_TIME_FUNCTION.to_string(),
                    DEFAULT_FIELD_COLUMN.to_string(),
                    Some(literal_expr),
                )
                .context(DataFusionPlanningSnafu)?,
            ),
        });
        Ok(plan)
    }

    fn prom_string_lit_to_plan(&mut self, string_literal: &StringLiteral) -> Result<LogicalPlan> {
        let StringLiteral { val } = string_literal;
        self.ctx.time_index_column = Some(DEFAULT_TIME_INDEX_COLUMN.to_string());
        self.ctx.field_columns = vec![DEFAULT_FIELD_COLUMN.to_string()];
        self.ctx.reset_table_name_and_schema();
        let literal_expr = df_prelude::lit(val.to_string());

        let plan = LogicalPlan::Extension(Extension {
            node: Arc::new(
                EmptyMetric::new(
                    self.ctx.start,
                    self.ctx.end,
                    self.ctx.interval,
                    SPECIAL_TIME_FUNCTION.to_string(),
                    DEFAULT_FIELD_COLUMN.to_string(),
                    Some(literal_expr),
                )
                .context(DataFusionPlanningSnafu)?,
            ),
        });
        Ok(plan)
    }

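    /// Plan an instant vector selector: normalize the series, then wrap them
    /// in an `InstantManipulate` node that aligns samples to evaluation
    /// timestamps.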
    async fn prom_vector_selector_to_plan(
        &mut self,
        vector_selector: &VectorSelector,
    ) -> Result<LogicalPlan> {
        let VectorSelector {
            name,
            offset,
            matchers,
            at: _,
        } = vector_selector;
        let matchers = self.preprocess_label_matchers(matchers, name)?;
        if let Some(empty_plan) = self.setup_context().await? {
            return Ok(empty_plan);
        }
        let normalize = self
            .selector_to_series_normalize_plan(offset, matchers, false)
            .await?;
        let manipulate = InstantManipulate::new(
            self.ctx.start,
            self.ctx.end,
            self.ctx.lookback_delta,
            self.ctx.interval,
            self.ctx
                .time_index_column
                .clone()
                .expect("time index should be set in `setup_context`"),
            self.ctx.field_columns.first().cloned(),
            normalize,
        );
        Ok(LogicalPlan::Extension(Extension {
            node: Arc::new(manipulate),
        }))
    }

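    /// Plan a range vector selector (`metric[5m]`): normalize the series,
    /// then wrap them in a `RangeManipulate` node carrying the range window.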
    async fn prom_matrix_selector_to_plan(
        &mut self,
        matrix_selector: &MatrixSelector,
    ) -> Result<LogicalPlan> {
        let MatrixSelector { vs, range } = matrix_selector;
        let VectorSelector {
            name,
            offset,
            matchers,
            ..
        } = vs;
        let matchers = self.preprocess_label_matchers(matchers, name)?;
        if let Some(empty_plan) = self.setup_context().await? {
            return Ok(empty_plan);
        }

        ensure!(!range.is_zero(), ZeroRangeSelectorSnafu);
        let range_ms = range.as_millis() as _;
        self.ctx.range = Some(range_ms);

        let normalize = self
            .selector_to_series_normalize_plan(offset, matchers, true)
            .await?;
        let manipulate = RangeManipulate::new(
            self.ctx.start,
            self.ctx.end,
            self.ctx.interval,
            range_ms,
            self.ctx
                .time_index_column
                .clone()
                .expect("time index should be set in `setup_context`"),
            self.ctx.field_columns.clone(),
            normalize,
        )
        .context(DataFusionPlanningSnafu)?;

        Ok(LogicalPlan::Extension(Extension {
            node: Arc::new(manipulate),
        }))
    }

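    /// Plan a function call. `histogram_quantile`, `vector` and `scalar` get
    /// dedicated plan nodes; everything else is planned as expressions over
    /// the input, with the `sort*` family adding a trailing sort.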
    async fn prom_call_expr_to_plan(
        &mut self,
        session_state: &SessionState,
        call_expr: &Call,
    ) -> Result<LogicalPlan> {
        let Call { func, args } = call_expr;
        // Some functions are handled by dedicated plan nodes instead of
        // scalar expressions.
        match func.name {
            SPECIAL_HISTOGRAM_QUANTILE => {
                return self.create_histogram_plan(args, session_state).await
            }
            SPECIAL_VECTOR_FUNCTION => return self.create_vector_plan(args).await,
            SCALAR_FUNCTION => return self.create_scalar_plan(args, session_state).await,
            _ => {}
        }

        // Transform the function arguments.
        let args = self.create_function_args(&args.args)?;
        let input = if let Some(prom_expr) = &args.input {
            self.prom_expr_to_plan(prom_expr, session_state).await?
        } else {
            self.ctx.time_index_column = Some(SPECIAL_TIME_FUNCTION.to_string());
            self.ctx.reset_table_name_and_schema();
            self.ctx.tag_columns = vec![];
            self.ctx.field_columns = vec![DEFAULT_FIELD_COLUMN.to_string()];
            LogicalPlan::Extension(Extension {
                node: Arc::new(
                    EmptyMetric::new(
                        self.ctx.start,
                        self.ctx.end,
                        self.ctx.interval,
                        SPECIAL_TIME_FUNCTION.to_string(),
                        DEFAULT_FIELD_COLUMN.to_string(),
                        None,
                    )
                    .context(DataFusionPlanningSnafu)?,
                ),
            })
        };
        let mut func_exprs =
            self.create_function_expr(func, args.literals.clone(), session_state)?;
        func_exprs.insert(0, self.create_time_index_column_expr()?);
        func_exprs.extend_from_slice(&self.create_tag_column_exprs()?);

        let builder = LogicalPlanBuilder::from(input)
            .project(func_exprs)
            .context(DataFusionPlanningSnafu)?
            .filter(self.create_empty_values_filter_expr()?)
            .context(DataFusionPlanningSnafu)?;

        let builder = match func.name {
            "sort" => builder
                .sort(self.create_field_columns_sort_exprs(true))
                .context(DataFusionPlanningSnafu)?,
            "sort_desc" => builder
                .sort(self.create_field_columns_sort_exprs(false))
                .context(DataFusionPlanningSnafu)?,
            "sort_by_label" => builder
                .sort(Self::create_sort_exprs_by_tags(
                    func.name,
                    args.literals,
                    true,
                )?)
                .context(DataFusionPlanningSnafu)?,
            "sort_by_label_desc" => builder
                .sort(Self::create_sort_exprs_by_tags(
                    func.name,
                    args.literals,
                    false,
                )?)
                .context(DataFusionPlanningSnafu)?,

            _ => builder,
        };

        builder.build().context(DataFusionPlanningSnafu)
    }

    async fn prom_ext_expr_to_plan(
        &mut self,
        session_state: &SessionState,
        ext_expr: &promql_parser::parser::ast::Extension,
    ) -> Result<LogicalPlan> {
        let expr = &ext_expr.expr;
        let children = expr.children();
        let plan = self.prom_expr_to_plan(&children[0], session_state).await?;
        // Wrap the child plan in an `Explain` node based on the extension
        // keyword; unknown extensions produce an empty relation.
        match expr.name() {
            "ANALYZE" => LogicalPlanBuilder::from(plan)
                .explain(false, true)
                .unwrap()
                .build()
                .context(DataFusionPlanningSnafu),
            "ANALYZE VERBOSE" => LogicalPlanBuilder::from(plan)
                .explain(true, true)
                .unwrap()
                .build()
                .context(DataFusionPlanningSnafu),
            "EXPLAIN" => LogicalPlanBuilder::from(plan)
                .explain(false, false)
                .unwrap()
                .build()
                .context(DataFusionPlanningSnafu),
            "EXPLAIN VERBOSE" => LogicalPlanBuilder::from(plan)
                .explain(true, false)
                .unwrap()
                .build()
                .context(DataFusionPlanningSnafu),
            _ => LogicalPlanBuilder::empty(true)
                .build()
                .context(DataFusionPlanningSnafu),
        }
    }

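    /// Extract the metric name from `name` or the `__name__` matcher, record
    /// it (and any `__schema__`/`__database__`/`__field__` matchers) in the
    /// context, and return the remaining label matchers with their values
    /// unescaped.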
    #[allow(clippy::mutable_key_type)]
    fn preprocess_label_matchers(
        &mut self,
        label_matchers: &Matchers,
        name: &Option<String>,
    ) -> Result<Matchers> {
        self.ctx.reset();

        let metric_name;
        if let Some(name) = name.clone() {
            metric_name = Some(name);
            ensure!(
                label_matchers.find_matchers(METRIC_NAME).is_empty(),
                MultipleMetricMatchersSnafu
            );
        } else {
            let mut matches = label_matchers.find_matchers(METRIC_NAME);
            ensure!(!matches.is_empty(), NoMetricMatcherSnafu);
            ensure!(matches.len() == 1, MultipleMetricMatchersSnafu);
            ensure!(
                matches[0].op == MatchOp::Equal,
                UnsupportedMatcherOpSnafu {
                    matcher_op: matches[0].op.to_string(),
                    matcher: METRIC_NAME
                }
            );
            metric_name = matches.pop().map(|m| m.value);
        }

        self.ctx.table_name = metric_name;

        let mut matchers = HashSet::new();
        for matcher in &label_matchers.matchers {
            // Extract the special matchers (`__field__`, `__schema__`,
            // `__database__`) into the context; everything else is a plain
            // label matcher.
            if matcher.name == FIELD_COLUMN_MATCHER {
                self.ctx
                    .field_column_matcher
                    .get_or_insert_default()
                    .push(matcher.clone());
            } else if matcher.name == SCHEMA_COLUMN_MATCHER || matcher.name == DB_COLUMN_MATCHER {
                ensure!(
                    matcher.op == MatchOp::Equal,
                    UnsupportedMatcherOpSnafu {
                        matcher: matcher.name.to_string(),
                        matcher_op: matcher.op.to_string(),
                    }
                );
                self.ctx.schema_name = Some(matcher.value.clone());
            } else if matcher.name != METRIC_NAME {
                let _ = matchers.insert(matcher.clone());
            }
        }

        Ok(Matchers::new(
            matchers.into_iter().map(normalize_matcher).collect(),
        ))
    }

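    /// Build the plan chain for a vector/matrix selector:
    /// scan -> filter -> (optional field projection) -> sort ->
    /// `SeriesDivide` -> (optional `SeriesNormalize` for offsets or range
    /// selectors).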
    async fn selector_to_series_normalize_plan(
        &mut self,
        offset: &Option<Offset>,
        label_matchers: Matchers,
        is_range_selector: bool,
    ) -> Result<LogicalPlan> {
        // make the table scan plan
        let table_ref = self.table_ref()?;
        let mut table_scan = self.create_table_scan_plan(table_ref.clone()).await?;
        let table_schema = table_scan.schema();

        // make the filter exprs
        let offset_duration = match offset {
            Some(Offset::Pos(duration)) => duration.as_millis() as Millisecond,
            Some(Offset::Neg(duration)) => -(duration.as_millis() as Millisecond),
            None => 0,
        };
        let mut scan_filters = Self::matchers_to_expr(label_matchers.clone(), table_schema)?;
        if let Some(time_index_filter) = self.build_time_index_filter(offset_duration)? {
            scan_filters.push(time_index_filter);
        }
        table_scan = LogicalPlanBuilder::from(table_scan)
            // safe to unwrap: `scan_filters` is not empty here
            .filter(conjunction(scan_filters).unwrap())
            .context(DataFusionPlanningSnafu)?
            .build()
            .context(DataFusionPlanningSnafu)?;

        // make a projection plan if there is a field matcher
        if let Some(field_matchers) = &self.ctx.field_column_matcher {
            let col_set = self.ctx.field_columns.iter().collect::<HashSet<_>>();
            // opt-in set of field columns
            let mut result_set = HashSet::new();
            // opt-out set of field columns
            let mut reverse_set = HashSet::new();
            for matcher in field_matchers {
                match &matcher.op {
                    MatchOp::Equal => {
                        if col_set.contains(&matcher.value) {
                            let _ = result_set.insert(matcher.value.clone());
                        } else {
                            return Err(ColumnNotFoundSnafu {
                                col: matcher.value.clone(),
                            }
                            .build());
                        }
                    }
                    MatchOp::NotEqual => {
                        if col_set.contains(&matcher.value) {
                            let _ = reverse_set.insert(matcher.value.clone());
                        } else {
                            return Err(ColumnNotFoundSnafu {
                                col: matcher.value.clone(),
                            }
                            .build());
                        }
                    }
                    MatchOp::Re(regex) => {
                        for col in &self.ctx.field_columns {
                            if regex.is_match(col) {
                                let _ = result_set.insert(col.clone());
                            }
                        }
                    }
                    MatchOp::NotRe(regex) => {
                        for col in &self.ctx.field_columns {
                            if regex.is_match(col) {
                                let _ = reverse_set.insert(col.clone());
                            }
                        }
                    }
                }
            }
            // merge the opt-in and opt-out sets
            if result_set.is_empty() {
                result_set = col_set.into_iter().cloned().collect();
            }
            for col in reverse_set {
                let _ = result_set.remove(&col);
            }

            // mask the field columns in the context by the result set
            self.ctx.field_columns = self
                .ctx
                .field_columns
                .drain(..)
                .filter(|col| result_set.contains(col))
                .collect();

            let exprs = result_set
                .into_iter()
                .map(|col| DfExpr::Column(Column::new_unqualified(col)))
                .chain(self.create_tag_column_exprs()?)
                .chain(Some(self.create_time_index_column_expr()?))
                .collect::<Vec<_>>();

            table_scan = LogicalPlanBuilder::from(table_scan)
                .project(exprs)
                .context(DataFusionPlanningSnafu)?
                .build()
                .context(DataFusionPlanningSnafu)?;
        }

        // make the sort plan
        let sort_plan = LogicalPlanBuilder::from(table_scan)
            .sort(self.create_tag_and_time_index_column_sort_exprs()?)
            .context(DataFusionPlanningSnafu)?
            .build()
            .context(DataFusionPlanningSnafu)?;

        // make the divide plan
        let time_index_column =
            self.ctx
                .time_index_column
                .clone()
                .with_context(|| TimeIndexNotFoundSnafu {
                    table: table_ref.to_string(),
                })?;
        let divide_plan = LogicalPlan::Extension(Extension {
            node: Arc::new(SeriesDivide::new(
                self.ctx.tag_columns.clone(),
                time_index_column,
                sort_plan,
            )),
        });

        // make the series normalize plan; not needed for instant selectors
        // without offset
        if !is_range_selector && offset_duration == 0 {
            return Ok(divide_plan);
        }
        let series_normalize = SeriesNormalize::new(
            offset_duration,
            self.ctx
                .time_index_column
                .clone()
                .with_context(|| TimeIndexNotFoundSnafu {
                    table: table_ref.to_quoted_string(),
                })?,
            is_range_selector,
            self.ctx.tag_columns.clone(),
            divide_plan,
        );
        let logical_plan = LogicalPlan::Extension(Extension {
            node: Arc::new(series_normalize),
        });

        Ok(logical_plan)
    }

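    /// Convert a `by`/`without` aggregate modifier into group-by column
    /// expressions (always including the time index), optionally updating
    /// the context's tag columns to match.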
    fn agg_modifier_to_col(
        &mut self,
        input_schema: &DFSchemaRef,
        modifier: &Option<LabelModifier>,
        update_ctx: bool,
    ) -> Result<Vec<DfExpr>> {
        match modifier {
            // No modifier: group by the time index only.
            None => {
                if update_ctx {
                    self.ctx.tag_columns.clear();
                }
                Ok(vec![self.create_time_index_column_expr()?])
            }
            // `by (...)`: group by the named labels and the time index.
            Some(LabelModifier::Include(labels)) => {
                if update_ctx {
                    self.ctx.tag_columns.clear();
                }
                let mut exprs = Vec::with_capacity(labels.labels.len());
                for label in &labels.labels {
                    // nonexistent labels are ignored
                    if let Ok(field) = input_schema.field_with_unqualified_name(label) {
                        exprs.push(DfExpr::Column(Column::from(field.name())));

                        if update_ctx {
                            // update the tag columns in the context
                            self.ctx.tag_columns.push(label.clone());
                        }
                    }
                }
                // add the timestamp column
                exprs.push(self.create_time_index_column_expr()?);

                Ok(exprs)
            }
            // `without (...)`: group by everything except the named labels,
            // the time index, and the field columns.
            Some(LabelModifier::Exclude(labels)) => {
                let mut all_fields = input_schema
                    .fields()
                    .iter()
                    .map(|f| f.name())
                    .collect::<BTreeSet<_>>();

                // remove the excluded labels
                for label in &labels.labels {
                    let _ = all_fields.remove(label);
                }

                // remove the time index and value fields
                if let Some(time_index) = &self.ctx.time_index_column {
                    let _ = all_fields.remove(time_index);
                }
                for value in &self.ctx.field_columns {
                    let _ = all_fields.remove(value);
                }

                if update_ctx {
                    // change the tag columns in the context
                    self.ctx.tag_columns = all_fields.iter().map(|col| (*col).clone()).collect();
                }

                // collect the remaining fields as column exprs
                let mut exprs = all_fields
                    .into_iter()
                    .map(|c| DfExpr::Column(Column::from(c)))
                    .collect::<Vec<_>>();

                // add the timestamp column
                exprs.push(self.create_time_index_column_expr()?);

                Ok(exprs)
            }
        }
    }

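    /// Convert label matchers to DataFusion filter expressions.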
    pub fn matchers_to_expr(
        label_matchers: Matchers,
        table_schema: &DFSchemaRef,
    ) -> Result<Vec<DfExpr>> {
        let mut exprs = Vec::with_capacity(label_matchers.matchers.len());
        for matcher in label_matchers.matchers {
            // A label that doesn't exist in the schema is treated as an empty
            // string, matching PromQL's semantics for absent labels.
            let col = if table_schema
                .field_with_unqualified_name(&matcher.name)
                .is_err()
            {
                DfExpr::Literal(ScalarValue::Utf8(Some(String::new()))).alias(matcher.name)
            } else {
                DfExpr::Column(Column::from_name(matcher.name))
            };
            let lit = DfExpr::Literal(ScalarValue::Utf8(Some(matcher.value)));
            let expr = match matcher.op {
                MatchOp::Equal => col.eq(lit),
                MatchOp::NotEqual => col.not_eq(lit),
                MatchOp::Re(re) => {
                    // `.*` matches everything, so the filter can be skipped
                    // entirely.
                    if re.as_str() == ".*" {
                        continue;
                    }
                    DfExpr::BinaryExpr(BinaryExpr {
                        left: Box::new(col),
                        op: Operator::RegexMatch,
                        right: Box::new(DfExpr::Literal(ScalarValue::Utf8(Some(
                            re.as_str().to_string(),
                        )))),
                    })
                }
                MatchOp::NotRe(re) => DfExpr::BinaryExpr(BinaryExpr {
                    left: Box::new(col),
                    op: Operator::RegexNotMatch,
                    right: Box::new(DfExpr::Literal(ScalarValue::Utf8(Some(
                        re.as_str().to_string(),
                    )))),
                }),
            };
            exprs.push(expr);
        }

        Ok(exprs)
    }

    fn table_ref(&self) -> Result<TableReference> {
        let table_name = self
            .ctx
            .table_name
            .clone()
            .context(TableNameNotFoundSnafu)?;

        // include the schema name if `__schema__` was given
        let table_ref = if let Some(schema_name) = &self.ctx.schema_name {
            TableReference::partial(schema_name.as_str(), table_name.as_str())
        } else {
            TableReference::bare(table_name.as_str())
        };

        Ok(table_ref)
    }

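    /// Build a filter on the time index covering the evaluation timestamps,
    /// each widened by lookback delta, range and offset. Falls back to one
    /// continuous range when per-point filters would be too many
    /// (`MAX_SCATTER_POINTS`) or the interval is at most one hour.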
    fn build_time_index_filter(&self, offset_duration: i64) -> Result<Option<DfExpr>> {
        let start = self.ctx.start;
        let end = self.ctx.end;
        if end < start {
            return InvalidTimeRangeSnafu { start, end }.fail();
        }
        let lookback_delta = self.ctx.lookback_delta;
        let range = self.ctx.range.unwrap_or_default();
        let interval = self.ctx.interval;
        let time_index_expr = self.create_time_index_column_expr()?;
        let num_points = (end - start) / interval;

        // Scan a continuous time range when the result is dense (too many
        // points, or a small interval): a single range filter is cheaper
        // than many scattered ones.
        if num_points > MAX_SCATTER_POINTS || interval <= INTERVAL_1H {
            let single_time_range = time_index_expr
                .clone()
                .gt_eq(DfExpr::Literal(ScalarValue::TimestampMillisecond(
                    Some(self.ctx.start - offset_duration - self.ctx.lookback_delta - range),
                    None,
                )))
                .and(
                    time_index_expr.lt_eq(DfExpr::Literal(ScalarValue::TimestampMillisecond(
                        Some(self.ctx.end - offset_duration + self.ctx.lookback_delta),
                        None,
                    ))),
                );
            return Ok(Some(single_time_range));
        }

        // Otherwise, scan a small window around each evaluation timestamp.
        let mut filters = Vec::with_capacity(num_points as usize);
        for timestamp in (start..end).step_by(interval as usize) {
            filters.push(
                time_index_expr
                    .clone()
                    .gt_eq(DfExpr::Literal(ScalarValue::TimestampMillisecond(
                        Some(timestamp - offset_duration - lookback_delta - range),
                        None,
                    )))
                    .and(time_index_expr.clone().lt_eq(DfExpr::Literal(
                        ScalarValue::TimestampMillisecond(
                            Some(timestamp - offset_duration + lookback_delta),
                            None,
                        ),
                    ))),
            )
        }

        Ok(filters.into_iter().reduce(DfExpr::or))
    }

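    /// Create a table scan plan. The time index column is cast to
    /// millisecond precision if the table uses a different timestamp unit.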
    async fn create_table_scan_plan(&mut self, table_ref: TableReference) -> Result<LogicalPlan> {
        let provider = self
            .table_provider
            .resolve_table(table_ref.clone())
            .await
            .context(CatalogSnafu)?;

        let is_time_index_ms = provider
            .as_any()
            .downcast_ref::<DefaultTableSource>()
            .context(UnknownTableSnafu)?
            .table_provider
            .as_any()
            .downcast_ref::<DfTableProviderAdapter>()
            .context(UnknownTableSnafu)?
            .table()
            .schema()
            .timestamp_column()
            .with_context(|| TimeIndexNotFoundSnafu {
                table: table_ref.to_quoted_string(),
            })?
            .data_type
            == ConcreteDataType::timestamp_millisecond_datatype();

        let mut scan_plan = LogicalPlanBuilder::scan(table_ref.clone(), provider, None)
            .context(DataFusionPlanningSnafu)?
            .build()
            .context(DataFusionPlanningSnafu)?;

        if !is_time_index_ms {
            // cast the time index column to millisecond precision
            let expr: Vec<_> = self
                .ctx
                .field_columns
                .iter()
                .map(|col| DfExpr::Column(Column::new(Some(table_ref.clone()), col.clone())))
                .chain(self.create_tag_column_exprs()?)
                .chain(Some(DfExpr::Alias(Alias {
                    expr: Box::new(DfExpr::Cast(Cast {
                        expr: Box::new(self.create_time_index_column_expr()?),
                        data_type: ArrowDataType::Timestamp(ArrowTimeUnit::Millisecond, None),
                    })),
                    relation: Some(table_ref.clone()),
                    name: self
                        .ctx
                        .time_index_column
                        .as_ref()
                        .with_context(|| TimeIndexNotFoundSnafu {
                            table: table_ref.to_quoted_string(),
                        })?
                        .clone(),
                })))
                .collect::<Vec<_>>();
            scan_plan = LogicalPlanBuilder::from(scan_plan)
                .project(expr)
                .context(DataFusionPlanningSnafu)?
                .build()
                .context(DataFusionPlanningSnafu)?;
        }

        let result = LogicalPlanBuilder::from(scan_plan)
            .build()
            .context(DataFusionPlanningSnafu)?;
        Ok(result)
    }

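    /// Resolve the selector's table and fill the context with its time
    /// index, field and tag columns. Returns an empty-metric plan instead
    /// when the table doesn't exist.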
    async fn setup_context(&mut self) -> Result<Option<LogicalPlan>> {
        let table_ref = self.table_ref()?;
        let table = match self.table_provider.resolve_table(table_ref.clone()).await {
            Err(e) if e.status_code() == StatusCode::TableNotFound => {
                // A missing table yields an empty result instead of an error.
                let plan = self.setup_context_for_empty_metric()?;
                return Ok(Some(plan));
            }
            res => res.context(CatalogSnafu)?,
        };
        let table = table
            .as_any()
            .downcast_ref::<DefaultTableSource>()
            .context(UnknownTableSnafu)?
            .table_provider
            .as_any()
            .downcast_ref::<DfTableProviderAdapter>()
            .context(UnknownTableSnafu)?
            .table();

        // set the time index column name
        let time_index = table
            .schema()
            .timestamp_column()
            .with_context(|| TimeIndexNotFoundSnafu {
                table: table_ref.to_quoted_string(),
            })?
            .name
            .clone();
        self.ctx.time_index_column = Some(time_index);

        // set the value columns
        let values = table
            .table_info()
            .meta
            .field_column_names()
            .cloned()
            .collect();
        self.ctx.field_columns = values;

        // set the tag columns, excluding internal metric-engine columns
        let tags = table
            .table_info()
            .meta
            .row_key_column_names()
            .filter(|col| {
                col != &DATA_SCHEMA_TABLE_ID_COLUMN_NAME && col != &DATA_SCHEMA_TSID_COLUMN_NAME
            })
            .cloned()
            .collect();
        self.ctx.tag_columns = tags;

        Ok(None)
    }

    /// Set up the planner context for a metric that doesn't exist, producing
    /// an empty result.
    fn setup_context_for_empty_metric(&mut self) -> Result<LogicalPlan> {
        self.ctx.time_index_column = Some(SPECIAL_TIME_FUNCTION.to_string());
        self.ctx.reset_table_name_and_schema();
        self.ctx.tag_columns = vec![];
        self.ctx.field_columns = vec![DEFAULT_FIELD_COLUMN.to_string()];

        // The time range [0, -1] is empty, so `EmptyMetric` emits no rows.
        let plan = LogicalPlan::Extension(Extension {
            node: Arc::new(
                EmptyMetric::new(
                    0,
                    -1,
                    self.ctx.interval,
                    SPECIAL_TIME_FUNCTION.to_string(),
                    DEFAULT_FIELD_COLUMN.to_string(),
                    Some(DfExpr::Literal(ScalarValue::Float64(Some(0.0)))),
                )
                .context(DataFusionPlanningSnafu)?,
            ),
        });
        Ok(plan)
    }

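    /// Split function call arguments into at most one vector input
    /// expression and a list of literal arguments.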
    fn create_function_args(&self, args: &[Box<PromExpr>]) -> Result<FunctionArgs> {
        let mut result = FunctionArgs::default();

        for arg in args {
            match *arg.clone() {
                // Vector-producing expressions become the single input plan;
                // everything else is collected as a literal argument.
                PromExpr::Subquery(_)
                | PromExpr::VectorSelector(_)
                | PromExpr::MatrixSelector(_)
                | PromExpr::Extension(_)
                | PromExpr::Aggregate(_)
                | PromExpr::Paren(_)
                | PromExpr::Call(_) => {
                    if result.input.replace(*arg.clone()).is_some() {
                        MultipleVectorSnafu { expr: *arg.clone() }.fail()?;
                    }
                }

                _ => {
                    let expr =
                        Self::get_param_as_literal_expr(&Some(Box::new(*arg.clone())), None, None)?;
                    result.literals.push(expr);
                }
            }
        }

        Ok(result)
    }

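    /// Create DataFusion expressions for a PromQL function applied to every
    /// field column. The `ScalarFunc` variants decide how the field column,
    /// the timestamp range and extra literal arguments are assembled into
    /// the call; the context's field columns are renamed to the resulting
    /// expression names.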
    fn create_function_expr(
        &mut self,
        func: &Function,
        other_input_exprs: Vec<DfExpr>,
        session_state: &SessionState,
    ) -> Result<Vec<DfExpr>> {
        let mut other_input_exprs: VecDeque<DfExpr> = other_input_exprs.into();

        // Position at which the field column is inserted into the argument
        // list.
        let field_column_pos = 0;
        let mut exprs = Vec::with_capacity(self.ctx.field_columns.len());
        let scalar_func = match func.name {
            "increase" => ScalarFunc::ExtrapolateUdf(
                Arc::new(Increase::scalar_udf()),
                self.ctx.range.context(ExpectRangeSelectorSnafu)?,
            ),
            "rate" => ScalarFunc::ExtrapolateUdf(
                Arc::new(Rate::scalar_udf()),
                self.ctx.range.context(ExpectRangeSelectorSnafu)?,
            ),
            "delta" => ScalarFunc::ExtrapolateUdf(
                Arc::new(Delta::scalar_udf()),
                self.ctx.range.context(ExpectRangeSelectorSnafu)?,
            ),
            "idelta" => ScalarFunc::Udf(Arc::new(IDelta::<false>::scalar_udf())),
            "irate" => ScalarFunc::Udf(Arc::new(IDelta::<true>::scalar_udf())),
            "resets" => ScalarFunc::Udf(Arc::new(Resets::scalar_udf())),
            "changes" => ScalarFunc::Udf(Arc::new(Changes::scalar_udf())),
            "deriv" => ScalarFunc::Udf(Arc::new(Deriv::scalar_udf())),
            "avg_over_time" => ScalarFunc::Udf(Arc::new(AvgOverTime::scalar_udf())),
            "min_over_time" => ScalarFunc::Udf(Arc::new(MinOverTime::scalar_udf())),
            "max_over_time" => ScalarFunc::Udf(Arc::new(MaxOverTime::scalar_udf())),
            "sum_over_time" => ScalarFunc::Udf(Arc::new(SumOverTime::scalar_udf())),
            "count_over_time" => ScalarFunc::Udf(Arc::new(CountOverTime::scalar_udf())),
            "last_over_time" => ScalarFunc::Udf(Arc::new(LastOverTime::scalar_udf())),
            "absent_over_time" => ScalarFunc::Udf(Arc::new(AbsentOverTime::scalar_udf())),
            "present_over_time" => ScalarFunc::Udf(Arc::new(PresentOverTime::scalar_udf())),
            "stddev_over_time" => ScalarFunc::Udf(Arc::new(StddevOverTime::scalar_udf())),
            "stdvar_over_time" => ScalarFunc::Udf(Arc::new(StdvarOverTime::scalar_udf())),
            "quantile_over_time" => ScalarFunc::Udf(Arc::new(QuantileOverTime::scalar_udf())),
            "predict_linear" => {
                other_input_exprs[0] = DfExpr::Cast(Cast {
                    expr: Box::new(other_input_exprs[0].clone()),
                    data_type: ArrowDataType::Int64,
                });
                ScalarFunc::Udf(Arc::new(PredictLinear::scalar_udf()))
            }
            "holt_winters" => ScalarFunc::Udf(Arc::new(HoltWinters::scalar_udf())),
            "time" => {
                exprs.push(build_special_time_expr(
                    self.ctx.time_index_column.as_ref().unwrap(),
                ));
                ScalarFunc::GeneratedExpr
            }
            "minute" => {
                // date_part('minute', time_index)
                let expr = self.date_part_on_time_index("minute")?;
                exprs.push(expr);
                ScalarFunc::GeneratedExpr
            }
            "hour" => {
                // date_part('hour', time_index)
                let expr = self.date_part_on_time_index("hour")?;
                exprs.push(expr);
                ScalarFunc::GeneratedExpr
            }
            "month" => {
                // date_part('month', time_index)
                let expr = self.date_part_on_time_index("month")?;
                exprs.push(expr);
                ScalarFunc::GeneratedExpr
            }
            "year" => {
                // date_part('year', time_index)
                let expr = self.date_part_on_time_index("year")?;
                exprs.push(expr);
                ScalarFunc::GeneratedExpr
            }
            "day_of_month" => {
                // date_part('day', time_index)
                let expr = self.date_part_on_time_index("day")?;
                exprs.push(expr);
                ScalarFunc::GeneratedExpr
            }
            "day_of_week" => {
                // date_part('dow', time_index)
                let expr = self.date_part_on_time_index("dow")?;
                exprs.push(expr);
                ScalarFunc::GeneratedExpr
            }
            "day_of_year" => {
                // date_part('doy', time_index)
                let expr = self.date_part_on_time_index("doy")?;
                exprs.push(expr);
                ScalarFunc::GeneratedExpr
            }
            "days_in_month" => {
                // date_part('day',
                //     date_trunc('month', time_index)
                //     + interval '1 month' - interval '1 day')
                let day_lit_expr = DfExpr::Literal(ScalarValue::Utf8(Some("day".to_string())));
                let month_lit_expr = DfExpr::Literal(ScalarValue::Utf8(Some("month".to_string())));
                let interval_1month_lit_expr =
                    DfExpr::Literal(ScalarValue::IntervalYearMonth(Some(1)));
                let interval_1day_lit_expr = DfExpr::Literal(ScalarValue::IntervalDayTime(Some(
                    IntervalDayTime::new(1, 0),
                )));
                let the_1month_minus_1day_expr = DfExpr::BinaryExpr(BinaryExpr {
                    left: Box::new(interval_1month_lit_expr),
                    op: Operator::Minus,
                    right: Box::new(interval_1day_lit_expr),
                });
                let date_trunc_expr = DfExpr::ScalarFunction(ScalarFunction {
                    func: datafusion_functions::datetime::date_trunc(),
                    args: vec![month_lit_expr, self.create_time_index_column_expr()?],
                });
                let date_trunc_plus_interval_expr = DfExpr::BinaryExpr(BinaryExpr {
                    left: Box::new(date_trunc_expr),
                    op: Operator::Plus,
                    right: Box::new(the_1month_minus_1day_expr),
                });
                let date_part_expr = DfExpr::ScalarFunction(ScalarFunction {
                    func: datafusion_functions::datetime::date_part(),
                    args: vec![day_lit_expr, date_trunc_plus_interval_expr],
                });

                exprs.push(date_part_expr);
                ScalarFunc::GeneratedExpr
            }

            "label_join" => {
                let (concat_expr, dst_label) =
                    Self::build_concat_labels_expr(&mut other_input_exprs, session_state)?;

                // Reserve the field columns, except the one matching
                // dst_label.
                for value in &self.ctx.field_columns {
                    if *value != dst_label {
                        let expr = DfExpr::Column(Column::from_name(value));
                        exprs.push(expr);
                    }
                }

                // Remove dst_label from the tag columns if present.
                self.ctx.tag_columns.retain(|tag| *tag != dst_label);

                // Add the new label expression.
                exprs.push(concat_expr);

                ScalarFunc::GeneratedExpr
            }
            "label_replace" => {
                let (replace_expr, dst_label) =
                    Self::build_regexp_replace_label_expr(&mut other_input_exprs, session_state)?;

                // Reserve the field columns, except the one matching
                // dst_label.
                for value in &self.ctx.field_columns {
                    if *value != dst_label {
                        let expr = DfExpr::Column(Column::from_name(value));
                        exprs.push(expr);
                    }
                }

                // Remove dst_label from the tag columns if present.
                self.ctx.tag_columns.retain(|tag| *tag != dst_label);

                // Add the new label expression.
                exprs.push(replace_expr);

                ScalarFunc::GeneratedExpr
            }
            "sort" | "sort_desc" | "sort_by_label" | "sort_by_label_desc" => {
                // The sorting itself is applied in `prom_call_expr_to_plan`;
                // just pass the field columns through.
                for value in &self.ctx.field_columns {
                    let expr = DfExpr::Column(Column::from_name(value));
                    exprs.push(expr);
                }

                ScalarFunc::GeneratedExpr
            }
            "round" => {
                if other_input_exprs.is_empty() {
                    // provide a default `nearest` argument when none is given
                    other_input_exprs.push_front(DfExpr::Literal(ScalarValue::Float64(Some(0.0))));
                }
                ScalarFunc::DataFusionUdf(Arc::new(Round::scalar_udf()))
            }

            _ => {
                if let Some(f) = session_state.scalar_functions().get(func.name) {
                    ScalarFunc::DataFusionBuiltin(f.clone())
                } else if let Some(f) = datafusion_functions::math::functions()
                    .iter()
                    .find(|f| f.name() == func.name)
                {
                    ScalarFunc::DataFusionUdf(f.clone())
                } else {
                    return UnsupportedExprSnafu {
                        name: func.name.to_string(),
                    }
                    .fail();
                }
            }
        };

        // Build the function expression for every field column, inserting
        // the column (and, for range functions, the timestamp range) into
        // the argument list at `field_column_pos`.
        for value in &self.ctx.field_columns {
            let col_expr = DfExpr::Column(Column::from_name(value));

            match scalar_func.clone() {
                ScalarFunc::DataFusionBuiltin(func) => {
                    other_input_exprs.insert(field_column_pos, col_expr);
                    let fn_expr = DfExpr::ScalarFunction(ScalarFunction {
                        func,
                        args: other_input_exprs.clone().into(),
                    });
                    exprs.push(fn_expr);
                    let _ = other_input_exprs.remove(field_column_pos);
                }
                ScalarFunc::DataFusionUdf(func) => {
                    let args = itertools::chain!(
                        other_input_exprs.iter().take(field_column_pos).cloned(),
                        std::iter::once(col_expr),
                        other_input_exprs.iter().skip(field_column_pos).cloned()
                    )
                    .collect_vec();
                    exprs.push(DfExpr::ScalarFunction(ScalarFunction { func, args }))
                }
                ScalarFunc::Udf(func) => {
                    let ts_range_expr = DfExpr::Column(Column::from_name(
                        RangeManipulate::build_timestamp_range_name(
                            self.ctx.time_index_column.as_ref().unwrap(),
                        ),
                    ));
                    other_input_exprs.insert(field_column_pos, ts_range_expr);
                    other_input_exprs.insert(field_column_pos + 1, col_expr);
                    let fn_expr = DfExpr::ScalarFunction(ScalarFunction {
                        func,
                        args: other_input_exprs.clone().into(),
                    });
                    exprs.push(fn_expr);
                    let _ = other_input_exprs.remove(field_column_pos + 1);
                    let _ = other_input_exprs.remove(field_column_pos);
                }
                ScalarFunc::ExtrapolateUdf(func, range_length) => {
                    let ts_range_expr = DfExpr::Column(Column::from_name(
                        RangeManipulate::build_timestamp_range_name(
                            self.ctx.time_index_column.as_ref().unwrap(),
                        ),
                    ));
                    other_input_exprs.insert(field_column_pos, ts_range_expr);
                    other_input_exprs.insert(field_column_pos + 1, col_expr);
                    other_input_exprs
                        .insert(field_column_pos + 2, self.create_time_index_column_expr()?);
                    other_input_exprs.push_back(lit(range_length));
                    let fn_expr = DfExpr::ScalarFunction(ScalarFunction {
                        func,
                        args: other_input_exprs.clone().into(),
                    });
                    exprs.push(fn_expr);
                    let _ = other_input_exprs.pop_back();
                    let _ = other_input_exprs.remove(field_column_pos + 2);
                    let _ = other_input_exprs.remove(field_column_pos + 1);
                    let _ = other_input_exprs.remove(field_column_pos);
                }
                ScalarFunc::GeneratedExpr => {}
            }
        }

        // Update the field column names, aliasing each expression to its
        // display name.
        let mut new_field_columns = Vec::with_capacity(exprs.len());

        exprs = exprs
            .into_iter()
            .map(|expr| {
                let display_name = expr.schema_name().to_string();
                new_field_columns.push(display_name.clone());
                Ok(expr.alias(display_name))
            })
            .collect::<std::result::Result<Vec<_>, _>>()
            .context(DataFusionPlanningSnafu)?;

        self.ctx.field_columns = new_field_columns;

        Ok(exprs)
    }

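    /// Build a `regexp_replace` expression for `label_replace`, e.g.
    /// `label_replace(up, "host", "$1", "instance", "(.*):.*")`. Returns the
    /// aliased expression together with the destination label name.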
    fn build_regexp_replace_label_expr(
        other_input_exprs: &mut VecDeque<DfExpr>,
        session_state: &SessionState,
    ) -> Result<(DfExpr, String)> {
        // the argument order is: dst_label, replacement, src_label, regex
        let dst_label = match other_input_exprs.pop_front() {
            Some(DfExpr::Literal(ScalarValue::Utf8(Some(d)))) => d,
            other => UnexpectedPlanExprSnafu {
                desc: format!("expected dst_label string literal, but found {:?}", other),
            }
            .fail()?,
        };
        let replacement = match other_input_exprs.pop_front() {
            Some(DfExpr::Literal(ScalarValue::Utf8(Some(r)))) => r,
            other => UnexpectedPlanExprSnafu {
                desc: format!("expected replacement string literal, but found {:?}", other),
            }
            .fail()?,
        };
        let src_label = match other_input_exprs.pop_front() {
            Some(DfExpr::Literal(ScalarValue::Utf8(Some(s)))) => s,
            other => UnexpectedPlanExprSnafu {
                desc: format!("expected src_label string literal, but found {:?}", other),
            }
            .fail()?,
        };
        let regex = match other_input_exprs.pop_front() {
            Some(DfExpr::Literal(ScalarValue::Utf8(Some(r)))) => r,
            other => UnexpectedPlanExprSnafu {
                desc: format!("expected regex string literal, but found {:?}", other),
            }
            .fail()?,
        };

        let func = session_state
            .scalar_functions()
            .get("regexp_replace")
            .context(UnsupportedExprSnafu {
                name: "regexp_replace",
            })?;

        // regexp_replace(src_label_column, regex, replacement)
        let args = vec![
            if src_label.is_empty() {
                DfExpr::Literal(ScalarValue::Utf8(Some(String::new())))
            } else {
                DfExpr::Column(Column::from_name(src_label))
            },
            DfExpr::Literal(ScalarValue::Utf8(Some(regex))),
            DfExpr::Literal(ScalarValue::Utf8(Some(replacement))),
        ];

        Ok((
            DfExpr::ScalarFunction(ScalarFunction {
                func: func.clone(),
                args,
            })
            .alias(&dst_label),
            dst_label,
        ))
    }

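    /// Build a `concat_ws` expression for `label_join`, e.g.
    /// `label_join(up, "endpoint", ",", "instance", "job")`. Returns the
    /// aliased expression together with the destination label name.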
    fn build_concat_labels_expr(
        other_input_exprs: &mut VecDeque<DfExpr>,
        session_state: &SessionState,
    ) -> Result<(DfExpr, String)> {
        // the argument order is: dst_label, separator, src_label_1, ...
        let dst_label = match other_input_exprs.pop_front() {
            Some(DfExpr::Literal(ScalarValue::Utf8(Some(d)))) => d,
            other => UnexpectedPlanExprSnafu {
                desc: format!("expected dst_label string literal, but found {:?}", other),
            }
            .fail()?,
        };
        let separator = match other_input_exprs.pop_front() {
            Some(DfExpr::Literal(ScalarValue::Utf8(Some(d)))) => d,
            other => UnexpectedPlanExprSnafu {
                desc: format!("expected separator string literal, but found {:?}", other),
            }
            .fail()?,
        };
        let src_labels = other_input_exprs
            .clone()
            .into_iter()
            .map(|expr| {
                // Cast source labels into column exprs; an empty label
                // becomes NULL.
                match expr {
                    DfExpr::Literal(ScalarValue::Utf8(Some(label))) => {
                        if label.is_empty() {
                            Ok(DfExpr::Literal(ScalarValue::Null))
                        } else {
                            Ok(DfExpr::Column(Column::from_name(label)))
                        }
                    }
                    other => UnexpectedPlanExprSnafu {
                        desc: format!(
                            "expected source label string literal, but found {:?}",
                            other
                        ),
                    }
                    .fail(),
                }
            })
            .collect::<Result<Vec<_>>>()?;
        ensure!(
            !src_labels.is_empty(),
            FunctionInvalidArgumentSnafu {
                fn_name: "label_join"
            }
        );

        let func = session_state
            .scalar_functions()
            .get("concat_ws")
            .context(UnsupportedExprSnafu { name: "concat_ws" })?;

        // concat_ws(separator, src_label_1, ..., src_label_n)
        let mut args = Vec::with_capacity(1 + src_labels.len());
        args.push(DfExpr::Literal(ScalarValue::Utf8(Some(separator))));
        args.extend(src_labels);

        Ok((
            DfExpr::ScalarFunction(ScalarFunction {
                func: func.clone(),
                args,
            })
            .alias(&dst_label),
            dst_label,
        ))
    }

    fn create_time_index_column_expr(&self) -> Result<DfExpr> {
        Ok(DfExpr::Column(Column::from_name(
            self.ctx
                .time_index_column
                .clone()
                .with_context(|| TimeIndexNotFoundSnafu { table: "unknown" })?,
        )))
    }

    fn create_tag_column_exprs(&self) -> Result<Vec<DfExpr>> {
        let mut result = Vec::with_capacity(self.ctx.tag_columns.len());
        for tag in &self.ctx.tag_columns {
            let expr = DfExpr::Column(Column::from_name(tag));
            result.push(expr);
        }
        Ok(result)
    }

    fn create_field_column_exprs(&self) -> Result<Vec<DfExpr>> {
        let mut result = Vec::with_capacity(self.ctx.field_columns.len());
        for field in &self.ctx.field_columns {
            let expr = DfExpr::Column(Column::from_name(field));
            result.push(expr);
        }
        Ok(result)
    }

    fn create_tag_and_time_index_column_sort_exprs(&self) -> Result<Vec<SortExpr>> {
        let mut result = self
            .ctx
            .tag_columns
            .iter()
            .map(|col| DfExpr::Column(Column::from_name(col)).sort(true, true))
            .collect::<Vec<_>>();
        result.push(self.create_time_index_column_expr()?.sort(true, true));
        Ok(result)
    }

    fn create_field_columns_sort_exprs(&self, asc: bool) -> Vec<SortExpr> {
        self.ctx
            .field_columns
            .iter()
            .map(|col| DfExpr::Column(Column::from_name(col)).sort(asc, true))
            .collect::<Vec<_>>()
    }

    fn create_sort_exprs_by_tags(
        func: &str,
        tags: Vec<DfExpr>,
        asc: bool,
    ) -> Result<Vec<SortExpr>> {
        ensure!(
            !tags.is_empty(),
            FunctionInvalidArgumentSnafu { fn_name: func }
        );

        tags.iter()
            .map(|col| match col {
                DfExpr::Literal(ScalarValue::Utf8(Some(label))) => {
                    Ok(DfExpr::Column(Column::from_name(label)).sort(asc, false))
                }
                other => UnexpectedPlanExprSnafu {
                    desc: format!("expected label string literal, but found {:?}", other),
                }
                .fail(),
            })
            .collect::<Result<Vec<_>>>()
    }

    fn create_empty_values_filter_expr(&self) -> Result<DfExpr> {
        let mut exprs = Vec::with_capacity(self.ctx.field_columns.len());
        for value in &self.ctx.field_columns {
            let expr = DfExpr::Column(Column::from_name(value)).is_not_null();
            exprs.push(expr);
        }

        conjunction(exprs).context(ValueNotFoundSnafu {
            table: self.table_ref()?.to_quoted_string(),
        })
    }

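    /// Create aggregate expressions for the given operator over every field
    /// column, plus (for `count_values`) the original field expressions
    /// that must also be grouped by. Updates the context's field column
    /// names to the aggregated expression names.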
1966 fn create_aggregate_exprs(
1982 &mut self,
1983 op: TokenType,
1984 param: &Option<Box<PromExpr>>,
1985 input_plan: &LogicalPlan,
1986 ) -> Result<(Vec<DfExpr>, Vec<DfExpr>)> {
1987 let mut non_col_args = Vec::new();
1988 let aggr = match op.id() {
1989 token::T_SUM => sum_udaf(),
1990 token::T_QUANTILE => {
1991 let q =
1992 Self::get_param_as_literal_expr(param, Some(op), Some(ArrowDataType::Float64))?;
1993 non_col_args.push(q);
1994 quantile_udaf()
1995 }
1996 token::T_AVG => avg_udaf(),
1997 token::T_COUNT_VALUES | token::T_COUNT => count_udaf(),
1998 token::T_MIN => min_udaf(),
1999 token::T_MAX => max_udaf(),
2000 token::T_GROUP => grouping_udaf(),
2001 token::T_STDDEV => stddev_pop_udaf(),
2002 token::T_STDVAR => var_pop_udaf(),
2003 token::T_TOPK | token::T_BOTTOMK => UnsupportedExprSnafu {
2004 name: format!("{op:?}"),
2005 }
2006 .fail()?,
2007 _ => UnexpectedTokenSnafu { token: op }.fail()?,
2008 };
2009
2010 let exprs: Vec<DfExpr> = self
2012 .ctx
2013 .field_columns
2014 .iter()
2015 .map(|col| {
2016 non_col_args.push(DfExpr::Column(Column::from_name(col)));
2017 let expr = aggr.call(non_col_args.clone());
2018 non_col_args.pop();
2019 expr
2020 })
2021 .collect::<Vec<_>>();
2022
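// `count_values` also groups by the value itself, so keep the pre-aggregation
// field expressions for the caller to add to the grouping expressions.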
2023 let prev_field_exprs = if op.id() == token::T_COUNT_VALUES {
2025 let prev_field_exprs: Vec<_> = self
2026 .ctx
2027 .field_columns
2028 .iter()
2029 .map(|col| DfExpr::Column(Column::from_name(col)))
2030 .collect();
2031
2032 ensure!(
2033 self.ctx.field_columns.len() == 1,
2034 UnsupportedExprSnafu {
2035 name: "count_values on multi-value input"
2036 }
2037 );
2038
2039 prev_field_exprs
2040 } else {
2041 vec![]
2042 };
2043
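// Rename the context's field columns to the aggregate expressions' output names
// (e.g. `sum(some_metric.field_0)`) so downstream stages resolve them correctly.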
2044 let mut new_field_columns = Vec::with_capacity(self.ctx.field_columns.len());
2046
2047 let normalized_exprs =
2048 normalize_cols(exprs.iter().cloned(), input_plan).context(DataFusionPlanningSnafu)?;
2049 for expr in normalized_exprs {
2050 new_field_columns.push(expr.schema_name().to_string());
2051 }
2052 self.ctx.field_columns = new_field_columns;
2053
2054 Ok((exprs, prev_field_exprs))
2055 }
2056
2057 fn get_param_value_as_str(op: TokenType, param: &Option<Box<PromExpr>>) -> Result<&str> {
2058 let param = param
2059 .as_deref()
2060 .with_context(|| FunctionInvalidArgumentSnafu {
2061 fn_name: op.to_string(),
2062 })?;
2063 let PromExpr::StringLiteral(StringLiteral { val }) = param else {
2064 return FunctionInvalidArgumentSnafu {
2065 fn_name: op.to_string(),
2066 }
2067 .fail();
2068 };
2069
2070 Ok(val)
2071 }
2072
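/// Extract an aggregate's scalar parameter as a DataFusion literal expression,
/// optionally verifying its type against `expected_type`. E.g. the `0.9` in
/// `quantile(0.9, m)` becomes a Float64 literal, while a literal of another
/// type is rejected as an invalid function argument.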
2073 fn get_param_as_literal_expr(
2074 param: &Option<Box<PromExpr>>,
2075 op: Option<TokenType>,
2076 expected_type: Option<ArrowDataType>,
2077 ) -> Result<DfExpr> {
2078 let prom_param = param.as_deref().with_context(|| {
2079 if let Some(op) = op {
2080 FunctionInvalidArgumentSnafu {
2081 fn_name: op.to_string(),
2082 }
2083 } else {
2084 FunctionInvalidArgumentSnafu {
2085 fn_name: "unknown".to_string(),
2086 }
2087 }
2088 })?;
2089
2090 let expr = Self::try_build_literal_expr(prom_param).with_context(|| {
2091 if let Some(op) = op {
2092 FunctionInvalidArgumentSnafu {
2093 fn_name: op.to_string(),
2094 }
2095 } else {
2096 FunctionInvalidArgumentSnafu {
2097 fn_name: "unknown".to_string(),
2098 }
2099 }
2100 })?;
2101
2102 if let Some(expected_type) = expected_type {
2104 let expr_type = expr
2106 .get_type(&DFSchema::empty())
2107 .context(DataFusionPlanningSnafu)?;
2108 if expected_type != expr_type {
2109 return FunctionInvalidArgumentSnafu {
2110 fn_name: format!("expected {expected_type:?}, but found {expr_type:?}"),
2111 }
2112 .fail();
2113 }
2114 }
2115
2116 Ok(expr)
2117 }
2118
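/// Build `row_number()` window expressions for `topk` / `bottomk`: one per field
/// column, partitioned by the grouping expressions and ordered by the value
/// (descending for `topk`, ascending for `bottomk`), then by tags. Only the
/// window expressions are created here; the caller is expected to filter on the
/// resulting row number (e.g. keep rows with `row_number <= k`).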
2119 fn create_window_exprs(
2122 &mut self,
2123 op: TokenType,
2124 group_exprs: Vec<DfExpr>,
2125 input_plan: &LogicalPlan,
2126 ) -> Result<Vec<DfExpr>> {
2127 ensure!(
2128 self.ctx.field_columns.len() == 1,
2129 UnsupportedExprSnafu {
2130 name: "topk or bottomk on multi-value input"
2131 }
2132 );
2133
2134 assert!(matches!(op.id(), token::T_TOPK | token::T_BOTTOMK));
2135
2136 let asc = matches!(op.id(), token::T_BOTTOMK);
2137
2138 let tag_sort_exprs = self
2139 .create_tag_column_exprs()?
2140 .into_iter()
2141 .map(|expr| expr.sort(asc, true));
2142
2143 let exprs: Vec<DfExpr> = self
2145 .ctx
2146 .field_columns
2147 .iter()
2148 .map(|col| {
2149 let mut sort_exprs = Vec::with_capacity(self.ctx.tag_columns.len() + 1);
2150 sort_exprs.push(DfExpr::Column(Column::from(col)).sort(asc, true));
2152 sort_exprs.extend(tag_sort_exprs.clone());
2155
2156 DfExpr::WindowFunction(WindowFunction {
2157 fun: WindowFunctionDefinition::WindowUDF(Arc::new(RowNumber::new().into())),
2158 args: vec![],
2159 partition_by: group_exprs.clone(),
2160 order_by: sort_exprs,
2161 window_frame: WindowFrame::new(Some(true)),
2162 null_treatment: None,
2163 })
2164 })
2165 .collect();
2166
2167 let normalized_exprs =
2168 normalize_cols(exprs.iter().cloned(), input_plan).context(DataFusionPlanningSnafu)?;
2169 Ok(normalized_exprs)
2170 }
2171
2172 #[deprecated(
2174 note = "use `Self::get_param_as_literal_expr` instead. This is only for `create_histogram_plan`"
2175 )]
2176 fn try_build_float_literal(expr: &PromExpr) -> Option<f64> {
2177 match expr {
2178 PromExpr::NumberLiteral(NumberLiteral { val }) => Some(*val),
2179 PromExpr::Paren(ParenExpr { expr }) => Self::try_build_float_literal(expr),
2180 PromExpr::Unary(UnaryExpr { expr, .. }) => {
2181 Self::try_build_float_literal(expr).map(|f| -f)
2182 }
2183 PromExpr::StringLiteral(_)
2184 | PromExpr::Binary(_)
2185 | PromExpr::VectorSelector(_)
2186 | PromExpr::MatrixSelector(_)
2187 | PromExpr::Call(_)
2188 | PromExpr::Extension(_)
2189 | PromExpr::Aggregate(_)
2190 | PromExpr::Subquery(_) => None,
2191 }
2192 }
2193
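/// Plan `histogram_quantile(phi, buckets)`. The input must carry an `le` tag,
/// which is removed from the tag set and folded away by the `HistogramFold`
/// extension node. E.g. `histogram_quantile(0.99, rate(req_bucket[1m]))`
/// (with `req_bucket` as an illustrative metric) folds each group's `le`
/// series into a single quantile value.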
2194 async fn create_histogram_plan(
2196 &mut self,
2197 args: &PromFunctionArgs,
2198 session_state: &SessionState,
2199 ) -> Result<LogicalPlan> {
2200 if args.args.len() != 2 {
2201 return FunctionInvalidArgumentSnafu {
2202 fn_name: SPECIAL_HISTOGRAM_QUANTILE.to_string(),
2203 }
2204 .fail();
2205 }
2206 #[allow(deprecated)]
2207 let phi = Self::try_build_float_literal(&args.args[0]).with_context(|| {
2208 FunctionInvalidArgumentSnafu {
2209 fn_name: SPECIAL_HISTOGRAM_QUANTILE.to_string(),
2210 }
2211 })?;
2212
2213 let input = args.args[1].as_ref().clone();
2214 let input_plan = self.prom_expr_to_plan(&input, session_state).await?;
2215
2216 if !self.ctx.has_le_tag() {
2217 return ColumnNotFoundSnafu {
2218 col: LE_COLUMN_NAME.to_string(),
2219 }
2220 .fail();
2221 }
2222 let time_index_column =
2223 self.ctx
2224 .time_index_column
2225 .clone()
2226 .with_context(|| TimeIndexNotFoundSnafu {
2227 table: self.ctx.table_name.clone().unwrap_or_default(),
2228 })?;
2229 let field_column = self
2231 .ctx
2232 .field_columns
2233 .first()
2234 .with_context(|| FunctionInvalidArgumentSnafu {
2235 fn_name: SPECIAL_HISTOGRAM_QUANTILE.to_string(),
2236 })?
2237 .clone();
2238 self.ctx.tag_columns.retain(|col| col != LE_COLUMN_NAME);
2240
2241 Ok(LogicalPlan::Extension(Extension {
2242 node: Arc::new(
2243 HistogramFold::new(
2244 LE_COLUMN_NAME.to_string(),
2245 field_column,
2246 time_index_column,
2247 phi,
2248 input_plan,
2249 )
2250 .context(DataFusionPlanningSnafu)?,
2251 ),
2252 }))
2253 }
2254
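/// Plan `vector(literal)`: emit an `EmptyMetric` node that generates the literal
/// at every step of the query range, with a synthetic time index column and
/// `greptime_value` as the field column. E.g. `vector(1)` is a constant series.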
2255 async fn create_vector_plan(&mut self, args: &PromFunctionArgs) -> Result<LogicalPlan> {
2257 if args.args.len() != 1 {
2258 return FunctionInvalidArgumentSnafu {
2259 fn_name: SPECIAL_VECTOR_FUNCTION.to_string(),
2260 }
2261 .fail();
2262 }
2263 let lit = Self::get_param_as_literal_expr(&Some(args.args[0].clone()), None, None)?;
2264
2265 self.ctx.time_index_column = Some(SPECIAL_TIME_FUNCTION.to_string());
2267 self.ctx.reset_table_name_and_schema();
2268 self.ctx.tag_columns = vec![];
2269 self.ctx.field_columns = vec![GREPTIME_VALUE.to_string()];
2270 Ok(LogicalPlan::Extension(Extension {
2271 node: Arc::new(
2272 EmptyMetric::new(
2273 self.ctx.start,
2274 self.ctx.end,
2275 self.ctx.interval,
2276 SPECIAL_TIME_FUNCTION.to_string(),
2277 GREPTIME_VALUE.to_string(),
2278 Some(lit),
2279 )
2280 .context(DataFusionPlanningSnafu)?,
2281 ),
2282 }))
2283 }
2284
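/// Plan `scalar(vector)`: wrap the input in a `ScalarCalculate` extension node
/// that reduces a single-series input to one value per timestamp. Inputs with
/// more than one field column are rejected.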
2285 async fn create_scalar_plan(
2287 &mut self,
2288 args: &PromFunctionArgs,
2289 session_state: &SessionState,
2290 ) -> Result<LogicalPlan> {
2291 ensure!(
2292 args.len() == 1,
2293 FunctionInvalidArgumentSnafu {
2294 fn_name: SCALAR_FUNCTION
2295 }
2296 );
2297 let input = self.prom_expr_to_plan(&args.args[0], session_state).await?;
2298 ensure!(
2299 self.ctx.field_columns.len() == 1,
2300 MultiFieldsNotSupportedSnafu {
2301 operator: SCALAR_FUNCTION
2302 },
2303 );
2304 let scalar_plan = LogicalPlan::Extension(Extension {
2305 node: Arc::new(
2306 ScalarCalculate::new(
2307 self.ctx.start,
2308 self.ctx.end,
2309 self.ctx.interval,
2310 input,
2311 self.ctx.time_index_column.as_ref().unwrap(),
2312 &self.ctx.tag_columns,
2313 &self.ctx.field_columns[0],
2314 self.ctx.table_name.as_deref(),
2315 )
2316 .context(PromqlPlanNodeSnafu)?,
2317 ),
2318 });
2319 self.ctx.tag_columns.clear();
2321 self.ctx.field_columns.clear();
2322 self.ctx
2323 .field_columns
2324 .push(scalar_plan.schema().field(1).name().clone()); // field 1 is the value column; field 0 is the time index
2325 Ok(scalar_plan)
2326 }
2327
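/// Try to constant-fold a PromQL expression into a DataFusion literal expression.
/// E.g. `1 + 2` folds into a Float64 expression, `time()` maps to the special
/// time expression, and selector-based expressions return `None`. A comparison
/// carrying the `bool` modifier is cast to Float64, matching PromQL's 0/1 result.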
2328 fn try_build_literal_expr(expr: &PromExpr) -> Option<DfExpr> {
2331 match expr {
2332 PromExpr::NumberLiteral(NumberLiteral { val }) => {
2333 let scalar_value = ScalarValue::Float64(Some(*val));
2334 Some(DfExpr::Literal(scalar_value))
2335 }
2336 PromExpr::StringLiteral(StringLiteral { val }) => {
2337 let scalar_value = ScalarValue::Utf8(Some(val.to_string()));
2338 Some(DfExpr::Literal(scalar_value))
2339 }
2340 PromExpr::VectorSelector(_)
2341 | PromExpr::MatrixSelector(_)
2342 | PromExpr::Extension(_)
2343 | PromExpr::Aggregate(_)
2344 | PromExpr::Subquery(_) => None,
2345 PromExpr::Call(Call { func, .. }) => {
2346 if func.name == SPECIAL_TIME_FUNCTION {
2347 Some(build_special_time_expr(SPECIAL_TIME_FUNCTION))
2348 } else {
2349 None
2350 }
2351 }
2352 PromExpr::Paren(ParenExpr { expr }) => Self::try_build_literal_expr(expr),
2353 PromExpr::Unary(UnaryExpr { expr, .. }) => Self::try_build_literal_expr(expr),
2355 PromExpr::Binary(PromBinaryExpr {
2356 lhs,
2357 rhs,
2358 op,
2359 modifier,
2360 }) => {
2361 let lhs = Self::try_build_literal_expr(lhs)?;
2362 let rhs = Self::try_build_literal_expr(rhs)?;
2363 let is_comparison_op = Self::is_token_a_comparison_op(*op);
2364 let expr_builder = Self::prom_token_to_binary_expr_builder(*op).ok()?;
2365 let expr = expr_builder(lhs, rhs).ok()?;
2366
2367 let should_return_bool = if let Some(m) = modifier {
2368 m.return_bool
2369 } else {
2370 false
2371 };
2372 if is_comparison_op && should_return_bool {
2373 Some(DfExpr::Cast(Cast {
2374 expr: Box::new(expr),
2375 data_type: ArrowDataType::Float64,
2376 }))
2377 } else {
2378 Some(expr)
2379 }
2380 }
2381 }
2382 }
2383
2384 fn try_build_special_time_expr(expr: &PromExpr, time_index_col: &str) -> Option<DfExpr> {
2385 match expr {
2386 PromExpr::Call(Call { func, .. }) => {
2387 if func.name == SPECIAL_TIME_FUNCTION {
2388 Some(build_special_time_expr(time_index_col))
2389 } else {
2390 None
2391 }
2392 }
2393 _ => None,
2394 }
2395 }
2396
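/// Map a PromQL binary-operator token to a closure building the corresponding
/// DataFusion expression: arithmetic tokens map to native operators, `^` to
/// `power(lhs, rhs)`, `atan2` to `atan2(lhs, rhs)`, and comparison tokens to
/// boolean predicates (cast to Float64 elsewhere when `bool` is present).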
2397 #[allow(clippy::type_complexity)]
2400 fn prom_token_to_binary_expr_builder(
2401 token: TokenType,
2402 ) -> Result<Box<dyn Fn(DfExpr, DfExpr) -> Result<DfExpr>>> {
2403 match token.id() {
2404 token::T_ADD => Ok(Box::new(|lhs, rhs| Ok(lhs + rhs))),
2405 token::T_SUB => Ok(Box::new(|lhs, rhs| Ok(lhs - rhs))),
2406 token::T_MUL => Ok(Box::new(|lhs, rhs| Ok(lhs * rhs))),
2407 token::T_DIV => Ok(Box::new(|lhs, rhs| Ok(lhs / rhs))),
2408 token::T_MOD => Ok(Box::new(|lhs: DfExpr, rhs| Ok(lhs % rhs))),
2409 token::T_EQLC => Ok(Box::new(|lhs, rhs| Ok(lhs.eq(rhs)))),
2410 token::T_NEQ => Ok(Box::new(|lhs, rhs| Ok(lhs.not_eq(rhs)))),
2411 token::T_GTR => Ok(Box::new(|lhs, rhs| Ok(lhs.gt(rhs)))),
2412 token::T_LSS => Ok(Box::new(|lhs, rhs| Ok(lhs.lt(rhs)))),
2413 token::T_GTE => Ok(Box::new(|lhs, rhs| Ok(lhs.gt_eq(rhs)))),
2414 token::T_LTE => Ok(Box::new(|lhs, rhs| Ok(lhs.lt_eq(rhs)))),
2415 token::T_POW => Ok(Box::new(|lhs, rhs| {
2416 Ok(DfExpr::ScalarFunction(ScalarFunction {
2417 func: datafusion_functions::math::power(),
2418 args: vec![lhs, rhs],
2419 }))
2420 })),
2421 token::T_ATAN2 => Ok(Box::new(|lhs, rhs| {
2422 Ok(DfExpr::ScalarFunction(ScalarFunction {
2423 func: datafusion_functions::math::atan2(),
2424 args: vec![lhs, rhs],
2425 }))
2426 })),
2427 _ => UnexpectedTokenSnafu { token }.fail(),
2428 }
2429 }
2430
2431 fn is_token_a_comparison_op(token: TokenType) -> bool {
2433 matches!(
2434 token.id(),
2435 token::T_EQLC
2436 | token::T_NEQ
2437 | token::T_GTR
2438 | token::T_LSS
2439 | token::T_GTE
2440 | token::T_LTE
2441 )
2442 }
2443
2444 fn is_token_a_set_op(token: TokenType) -> bool {
2446 matches!(
2447 token.id(),
2448 token::T_LAND | token::T_LOR | token::T_LUNLESS)
2452 }
2453
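/// Join two plans on their tag columns, narrowed by `on(...)` or reduced by
/// `ignoring(...)` from the vector-matching modifier, plus the time index when
/// both sides have one. E.g. `a + on(host) b` becomes an inner join on
/// `(host, <time index>)`.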
2454 #[allow(clippy::too_many_arguments)]
2457 fn join_on_non_field_columns(
2458 &self,
2459 left: LogicalPlan,
2460 right: LogicalPlan,
2461 left_table_ref: TableReference,
2462 right_table_ref: TableReference,
2463 left_time_index_column: Option<String>,
2464 right_time_index_column: Option<String>,
2465 only_join_time_index: bool,
2466 modifier: &Option<BinModifier>,
2467 ) -> Result<LogicalPlan> {
2468 let mut left_tag_columns = if only_join_time_index {
2469 BTreeSet::new()
2470 } else {
2471 self.ctx
2472 .tag_columns
2473 .iter()
2474 .cloned()
2475 .collect::<BTreeSet<_>>()
2476 };
2477 let mut right_tag_columns = left_tag_columns.clone();
2478
2479 if let Some(modifier) = modifier {
2481 if let Some(matching) = &modifier.matching {
2483 match matching {
2484 LabelModifier::Include(on) => {
2486 let mask = on.labels.iter().cloned().collect::<BTreeSet<_>>();
2487 left_tag_columns = left_tag_columns.intersection(&mask).cloned().collect();
2488 right_tag_columns =
2489 right_tag_columns.intersection(&mask).cloned().collect();
2490 }
2491 LabelModifier::Exclude(ignoring) => {
2493 for label in &ignoring.labels {
2495 let _ = left_tag_columns.remove(label);
2496 let _ = right_tag_columns.remove(label);
2497 }
2498 }
2499 }
2500 }
2501 }
2502
2503 if let (Some(left_time_index_column), Some(right_time_index_column)) =
2505 (left_time_index_column, right_time_index_column)
2506 {
2507 left_tag_columns.insert(left_time_index_column);
2508 right_tag_columns.insert(right_time_index_column);
2509 }
2510
2511 let right = LogicalPlanBuilder::from(right)
2512 .alias(right_table_ref)
2513 .context(DataFusionPlanningSnafu)?
2514 .build()
2515 .context(DataFusionPlanningSnafu)?;
2516
2517 LogicalPlanBuilder::from(left)
2519 .alias(left_table_ref)
2520 .context(DataFusionPlanningSnafu)?
2521 .join_detailed(
2522 right,
2523 JoinType::Inner,
2524 (
2525 left_tag_columns
2526 .into_iter()
2527 .map(Column::from_name)
2528 .collect::<Vec<_>>(),
2529 right_tag_columns
2530 .into_iter()
2531 .map(Column::from_name)
2532 .collect::<Vec<_>>(),
2533 ),
2534 None,
2535 true,
2536 )
2537 .context(DataFusionPlanningSnafu)?
2538 .build()
2539 .context(DataFusionPlanningSnafu)
2540 }
2541
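/// Plan PromQL set operators: `and` becomes a `LeftSemi` join and `unless` a
/// `LeftAnti` join, both on the matched tag columns plus the time index with the
/// left side made distinct first; `or` takes the early-return path through
/// `Self::or_operator`.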
2542 fn set_op_on_non_field_columns(
2544 &mut self,
2545 left: LogicalPlan,
2546 mut right: LogicalPlan,
2547 left_context: PromPlannerContext,
2548 right_context: PromPlannerContext,
2549 op: TokenType,
2550 modifier: &Option<BinModifier>,
2551 ) -> Result<LogicalPlan> {
2552 let mut left_tag_col_set = left_context
2553 .tag_columns
2554 .iter()
2555 .cloned()
2556 .collect::<HashSet<_>>();
2557 let mut right_tag_col_set = right_context
2558 .tag_columns
2559 .iter()
2560 .cloned()
2561 .collect::<HashSet<_>>();
2562
2563 if matches!(op.id(), token::T_LOR) {
2564 return self.or_operator(
2565 left,
2566 right,
2567 left_tag_col_set,
2568 right_tag_col_set,
2569 left_context,
2570 right_context,
2571 modifier,
2572 );
2573 }
2574
2575 if let Some(modifier) = modifier {
2577 ensure!(
2579 matches!(
2580 modifier.card,
2581 VectorMatchCardinality::OneToOne | VectorMatchCardinality::ManyToMany
2582 ),
2583 UnsupportedVectorMatchSnafu {
2584 name: modifier.card.clone(),
2585 },
2586 );
2587 if let Some(matching) = &modifier.matching {
2589 match matching {
2590 LabelModifier::Include(on) => {
2592 let mask = on.labels.iter().cloned().collect::<HashSet<_>>();
2593 left_tag_col_set = left_tag_col_set.intersection(&mask).cloned().collect();
2594 right_tag_col_set =
2595 right_tag_col_set.intersection(&mask).cloned().collect();
2596 }
2597 LabelModifier::Exclude(ignoring) => {
2599 for label in &ignoring.labels {
2601 let _ = left_tag_col_set.remove(label);
2602 let _ = right_tag_col_set.remove(label);
2603 }
2604 }
2605 }
2606 }
2607 }
2608 if !matches!(op.id(), token::T_LOR) { // always true here: `or` returned early above
2610 ensure!(
2611 left_tag_col_set == right_tag_col_set,
2612 CombineTableColumnMismatchSnafu {
2613 left: left_tag_col_set.into_iter().collect::<Vec<_>>(),
2614 right: right_tag_col_set.into_iter().collect::<Vec<_>>(),
2615 }
2616 )
2617 };
2618 let left_time_index = left_context.time_index_column.clone().unwrap();
2619 let right_time_index = right_context.time_index_column.clone().unwrap();
2620 let join_keys = left_tag_col_set
2621 .iter()
2622 .cloned()
2623 .chain([left_time_index.clone()])
2624 .collect::<Vec<_>>();
2625 self.ctx.time_index_column = Some(left_time_index.clone());
2626
2627 if left_context.time_index_column != right_context.time_index_column {
2629 let right_project_exprs = right
2630 .schema()
2631 .fields()
2632 .iter()
2633 .map(|field| {
2634 if field.name() == &right_time_index {
2635 DfExpr::Column(Column::from_name(&right_time_index)).alias(&left_time_index)
2636 } else {
2637 DfExpr::Column(Column::from_name(field.name()))
2638 }
2639 })
2640 .collect::<Vec<_>>();
2641
2642 right = LogicalPlanBuilder::from(right)
2643 .project(right_project_exprs)
2644 .context(DataFusionPlanningSnafu)?
2645 .build()
2646 .context(DataFusionPlanningSnafu)?;
2647 }
2648
2649 ensure!(
2650 left_context.field_columns.len() == 1,
2651 MultiFieldsNotSupportedSnafu {
2652 operator: "AND operator"
2653 }
2654 );
2655 let left_field_col = left_context.field_columns.first().unwrap();
2658 self.ctx.field_columns = vec![left_field_col.clone()];
2659
2660 match op.id() {
2663 token::T_LAND => LogicalPlanBuilder::from(left)
2664 .distinct()
2665 .context(DataFusionPlanningSnafu)?
2666 .join_detailed(
2667 right,
2668 JoinType::LeftSemi,
2669 (join_keys.clone(), join_keys),
2670 None,
2671 true,
2672 )
2673 .context(DataFusionPlanningSnafu)?
2674 .build()
2675 .context(DataFusionPlanningSnafu),
2676 token::T_LUNLESS => LogicalPlanBuilder::from(left)
2677 .distinct()
2678 .context(DataFusionPlanningSnafu)?
2679 .join_detailed(
2680 right,
2681 JoinType::LeftAnti,
2682 (join_keys.clone(), join_keys),
2683 None,
2684 true,
2685 )
2686 .context(DataFusionPlanningSnafu)?
2687 .build()
2688 .context(DataFusionPlanningSnafu),
2689 token::T_LOR => {
2690 unreachable!() // `or` takes the early-return path through `self.or_operator` above
2693 }
2694 _ => UnexpectedTokenSnafu { token: op }.fail(),
2695 }
2696 }
2697
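/// Plan the `or` set operator. Both sides are projected to a common schema:
/// tags missing on one side are filled with null Utf8 literals, and the right
/// side's time index and value columns are renamed to match the left's. The
/// aligned plans are then combined by the `UnionDistinctOn` extension node,
/// which deduplicates on the match columns plus the timestamp so left-hand
/// rows win on conflict.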
2698 #[allow(clippy::too_many_arguments)]
2700 fn or_operator(
2701 &mut self,
2702 left: LogicalPlan,
2703 right: LogicalPlan,
2704 left_tag_cols_set: HashSet<String>,
2705 right_tag_cols_set: HashSet<String>,
2706 left_context: PromPlannerContext,
2707 right_context: PromPlannerContext,
2708 modifier: &Option<BinModifier>,
2709 ) -> Result<LogicalPlan> {
2710 ensure!(
2712 left_context.field_columns.len() == right_context.field_columns.len(),
2713 CombineTableColumnMismatchSnafu {
2714 left: left_context.field_columns.clone(),
2715 right: right_context.field_columns.clone()
2716 }
2717 );
2718 ensure!(
2719 left_context.field_columns.len() == 1,
2720 MultiFieldsNotSupportedSnafu {
2721 operator: "OR operator"
2722 }
2723 );
2724
2725 let all_tags = left_tag_cols_set
2727 .union(&right_tag_cols_set)
2728 .cloned()
2729 .collect::<HashSet<_>>();
2730 let tags_not_in_left = all_tags
2731 .difference(&left_tag_cols_set)
2732 .cloned()
2733 .collect::<Vec<_>>();
2734 let tags_not_in_right = all_tags
2735 .difference(&right_tag_cols_set)
2736 .cloned()
2737 .collect::<Vec<_>>();
2738 let left_qualifier = left.schema().qualified_field(0).0.cloned();
2739 let right_qualifier = right.schema().qualified_field(0).0.cloned();
2740 let left_qualifier_string = left_qualifier
2741 .as_ref()
2742 .map(|l| l.to_string())
2743 .unwrap_or_default();
2744 let right_qualifier_string = right_qualifier
2745 .as_ref()
2746 .map(|r| r.to_string())
2747 .unwrap_or_default();
2748 let left_time_index_column =
2749 left_context
2750 .time_index_column
2751 .clone()
2752 .with_context(|| TimeIndexNotFoundSnafu {
2753 table: left_qualifier_string.clone(),
2754 })?;
2755 let right_time_index_column =
2756 right_context
2757 .time_index_column
2758 .clone()
2759 .with_context(|| TimeIndexNotFoundSnafu {
2760 table: right_qualifier_string.clone(),
2761 })?;
2762 let left_field_col = left_context.field_columns.first().unwrap();
2764 let right_field_col = right_context.field_columns.first().unwrap();
2765
2766 let mut all_columns_set = left
2768 .schema()
2769 .fields()
2770 .iter()
2771 .chain(right.schema().fields().iter())
2772 .map(|field| field.name().clone())
2773 .collect::<HashSet<_>>();
2774 all_columns_set.remove(&left_time_index_column);
2776 all_columns_set.remove(&right_time_index_column);
2777 if left_field_col != right_field_col {
2779 all_columns_set.remove(right_field_col);
2780 }
2781 let mut all_columns = all_columns_set.into_iter().collect::<Vec<_>>();
2782 all_columns.sort_unstable();
2784 all_columns.insert(0, left_time_index_column.clone());
2786
2787 let left_proj_exprs = all_columns.iter().map(|col| {
2789 if tags_not_in_left.contains(col) {
2790 DfExpr::Literal(ScalarValue::Utf8(None)).alias(col.to_string())
2791 } else {
2792 DfExpr::Column(Column::new(None::<String>, col))
2793 }
2794 });
2795 let right_time_index_expr = DfExpr::Column(Column::new(
2796 right_qualifier.clone(),
2797 right_time_index_column,
2798 ))
2799 .alias(left_time_index_column.clone());
2800 let right_qualifier_for_field = right
2803 .schema()
2804 .iter()
2805 .find(|(_, f)| f.name() == right_field_col)
2806 .map(|(q, _)| q)
2807 .context(ColumnNotFoundSnafu {
2808 col: right_field_col.to_string(),
2809 })?
2810 .cloned();
2811
2812 let right_proj_exprs_without_time_index = all_columns.iter().skip(1).map(|col| {
2814 if col == left_field_col && left_field_col != right_field_col {
2816 DfExpr::Column(Column::new(
2818 right_qualifier_for_field.clone(),
2819 right_field_col,
2820 ))
2821 } else if tags_not_in_right.contains(col) {
2822 DfExpr::Literal(ScalarValue::Utf8(None)).alias(col.to_string())
2823 } else {
2824 DfExpr::Column(Column::new(None::<String>, col))
2825 }
2826 });
2827 let right_proj_exprs = [right_time_index_expr]
2828 .into_iter()
2829 .chain(right_proj_exprs_without_time_index);
2830
2831 let left_projected = LogicalPlanBuilder::from(left)
2832 .project(left_proj_exprs)
2833 .context(DataFusionPlanningSnafu)?
2834 .alias(left_qualifier_string.clone())
2835 .context(DataFusionPlanningSnafu)?
2836 .build()
2837 .context(DataFusionPlanningSnafu)?;
2838 let right_projected = LogicalPlanBuilder::from(right)
2839 .project(right_proj_exprs)
2840 .context(DataFusionPlanningSnafu)?
2841 .alias(right_qualifier_string.clone())
2842 .context(DataFusionPlanningSnafu)?
2843 .build()
2844 .context(DataFusionPlanningSnafu)?;
2845
2846 let mut match_columns = if let Some(modifier) = modifier
2848 && let Some(matching) = &modifier.matching
2849 {
2850 match matching {
2851 LabelModifier::Include(on) => on.labels.clone(),
2853 LabelModifier::Exclude(ignoring) => {
2855 let ignoring = ignoring.labels.iter().cloned().collect::<HashSet<_>>();
2856 all_tags.difference(&ignoring).cloned().collect()
2857 }
2858 }
2859 } else {
2860 all_tags.iter().cloned().collect()
2861 };
2862 match_columns.sort_unstable();
2864 let schema = left_projected.schema().clone();
2866 let union_distinct_on = UnionDistinctOn::new(
2867 left_projected,
2868 right_projected,
2869 match_columns,
2870 left_time_index_column.clone(),
2871 schema,
2872 );
2873 let result = LogicalPlan::Extension(Extension {
2874 node: Arc::new(union_distinct_on),
2875 });
2876
2877 self.ctx.time_index_column = Some(left_time_index_column);
2879 self.ctx.tag_columns = all_tags.into_iter().collect();
2880 self.ctx.field_columns = vec![left_field_col.to_string()];
2881
2882 Ok(result)
2883 }
2884
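/// Apply `name_to_expr` to every field column and project the results alongside
/// the tag and time-index columns, updating the context's field column names.
/// Used for instant functions, e.g. `abs(m)` projects `abs(field_0)` for each
/// field column.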
2885 fn projection_for_each_field_column<F>(
2893 &mut self,
2894 input: LogicalPlan,
2895 name_to_expr: F,
2896 ) -> Result<LogicalPlan>
2897 where
2898 F: FnMut(&String) -> Result<DfExpr>,
2899 {
2900 let non_field_columns_iter = self
2901 .ctx
2902 .tag_columns
2903 .iter()
2904 .chain(self.ctx.time_index_column.iter())
2905 .map(|col| {
2906 Ok(DfExpr::Column(Column::new(
2907 self.ctx.table_name.clone().map(TableReference::bare),
2908 col,
2909 )))
2910 });
2911
2912 let result_field_columns = self
2914 .ctx
2915 .field_columns
2916 .iter()
2917 .map(name_to_expr)
2918 .collect::<Result<Vec<_>>>()?;
2919
2920 self.ctx.field_columns = result_field_columns
2922 .iter()
2923 .map(|expr| expr.schema_name().to_string())
2924 .collect();
2925 let field_columns_iter = result_field_columns
2926 .into_iter()
2927 .zip(self.ctx.field_columns.iter())
2928 .map(|(expr, name)| Ok(DfExpr::Alias(Alias::new(expr, None::<String>, name))));
2929
2930 let project_fields = non_field_columns_iter
2932 .chain(field_columns_iter)
2933 .collect::<Result<Vec<_>>>()?;
2934
2935 LogicalPlanBuilder::from(input)
2936 .project(project_fields)
2937 .context(DataFusionPlanningSnafu)?
2938 .build()
2939 .context(DataFusionPlanningSnafu)
2940 }
2941
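/// Filter rows with a predicate built from the single field column, e.g. the
/// comparison `m < 1.2345` becomes `Filter: field_0 < Float64(1.2345)`.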
2942 fn filter_on_field_column<F>(
2945 &self,
2946 input: LogicalPlan,
2947 mut name_to_expr: F,
2948 ) -> Result<LogicalPlan>
2949 where
2950 F: FnMut(&String) -> Result<DfExpr>,
2951 {
2952 ensure!(
2953 self.ctx.field_columns.len() == 1,
2954 UnsupportedExprSnafu {
2955 name: "filter on multi-value input"
2956 }
2957 );
2958
2959 let field_column_filter = name_to_expr(&self.ctx.field_columns[0])?;
2960
2961 LogicalPlanBuilder::from(input)
2962 .filter(field_column_filter)
2963 .context(DataFusionPlanningSnafu)?
2964 .build()
2965 .context(DataFusionPlanningSnafu)
2966 }
2967
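/// Build a `date_part` call over the time index column, used by time-extraction
/// functions: e.g. `hour()` becomes `date_part('hour', <time index>)`.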
2968 fn date_part_on_time_index(&self, date_part: &str) -> Result<DfExpr> {
2971 let lit_expr = DfExpr::Literal(ScalarValue::Utf8(Some(date_part.to_string())));
2972 let input_expr = datafusion::logical_expr::col(
2973 self.ctx
2974 .time_index_column
2975 .as_ref()
2976 .with_context(|| TimeIndexNotFoundSnafu {
2978 table: "<doesn't matter>",
2979 })?,
2980 );
2981 let fn_expr = DfExpr::ScalarFunction(ScalarFunction {
2982 func: datafusion_functions::datetime::date_part(),
2983 args: vec![lit_expr, input_expr],
2984 });
2985 Ok(fn_expr)
2986 }
2987}
2988
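/// A PromQL function call's arguments, split into the (at most one) vector input
/// and the scalar arguments already lowered to DataFusion literal expressions.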
2989#[derive(Default, Debug)]
2990struct FunctionArgs {
2991 input: Option<PromExpr>,
2992 literals: Vec<DfExpr>,
2993}
2994
2995#[derive(Debug, Clone)]
2996enum ScalarFunc {
2997 DataFusionBuiltin(Arc<ScalarUdfDef>), // a built-in DataFusion scalar function
2998 DataFusionUdf(Arc<ScalarUdfDef>), // a scalar UDF defined on the DataFusion side
3000 Udf(Arc<ScalarUdfDef>), // a PromQL-specific scalar UDF defined in this crate
3001 ExtrapolateUdf(Arc<ScalarUdfDef>, i64), // like `Udf`, but also takes the range length in milliseconds
3005 GeneratedExpr, // no UDF call: the expression is generated directly, e.g. `time()`
3007}
3008
3009#[cfg(test)]
3010mod test {
3011 use std::time::{Duration, UNIX_EPOCH};
3012
3013 use catalog::memory::MemoryCatalogManager;
3014 use catalog::RegisterTableRequest;
3015 use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
3016 use common_query::test_util::DummyDecoder;
3017 use datafusion::execution::SessionStateBuilder;
3018 use datatypes::prelude::ConcreteDataType;
3019 use datatypes::schema::{ColumnSchema, Schema};
3020 use promql_parser::label::Labels;
3021 use promql_parser::parser;
3022 use session::context::QueryContext;
3023 use table::metadata::{TableInfoBuilder, TableMetaBuilder};
3024 use table::test_util::EmptyTable;
3025
3026 use super::*;
3027
3028 fn build_session_state() -> SessionState {
3029 SessionStateBuilder::new().with_default_features().build()
3030 }
3031
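/// Register empty test tables with the schema `tag_0..` (Utf8, primary key),
/// `timestamp` (millisecond time index), and `field_0..` (nullable Float64).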
3032 async fn build_test_table_provider(
3033 table_name_tuples: &[(String, String)],
3034 num_tag: usize,
3035 num_field: usize,
3036 ) -> DfTableSourceProvider {
3037 let catalog_list = MemoryCatalogManager::with_default_setup();
3038 for (schema_name, table_name) in table_name_tuples {
3039 let mut columns = vec![];
3040 for i in 0..num_tag {
3041 columns.push(ColumnSchema::new(
3042 format!("tag_{i}"),
3043 ConcreteDataType::string_datatype(),
3044 false,
3045 ));
3046 }
3047 columns.push(
3048 ColumnSchema::new(
3049 "timestamp".to_string(),
3050 ConcreteDataType::timestamp_millisecond_datatype(),
3051 false,
3052 )
3053 .with_time_index(true),
3054 );
3055 for i in 0..num_field {
3056 columns.push(ColumnSchema::new(
3057 format!("field_{i}"),
3058 ConcreteDataType::float64_datatype(),
3059 true,
3060 ));
3061 }
3062 let schema = Arc::new(Schema::new(columns));
3063 let table_meta = TableMetaBuilder::empty()
3064 .schema(schema)
3065 .primary_key_indices((0..num_tag).collect())
3066 .value_indices((num_tag + 1..num_tag + 1 + num_field).collect())
3067 .next_column_id(1024)
3068 .build()
3069 .unwrap();
3070 let table_info = TableInfoBuilder::default()
3071 .name(table_name.to_string())
3072 .meta(table_meta)
3073 .build()
3074 .unwrap();
3075 let table = EmptyTable::from_table_info(&table_info);
3076
3077 assert!(catalog_list
3078 .register_table_sync(RegisterTableRequest {
3079 catalog: DEFAULT_CATALOG_NAME.to_string(),
3080 schema: schema_name.to_string(),
3081 table_name: table_name.to_string(),
3082 table_id: 1024,
3083 table,
3084 })
3085 .is_ok());
3086 }
3087
3088 DfTableSourceProvider::new(
3089 catalog_list,
3090 false,
3091 QueryContext::arc(),
3092 DummyDecoder::arc(),
3093 false,
3094 )
3095 }
3096
3097 async fn build_test_table_provider_with_fields(
3098 table_name_tuples: &[(String, String)],
3099 tags: &[&str],
3100 ) -> DfTableSourceProvider {
3101 let catalog_list = MemoryCatalogManager::with_default_setup();
3102 for (schema_name, table_name) in table_name_tuples {
3103 let mut columns = vec![];
3104 let num_tag = tags.len();
3105 for tag in tags {
3106 columns.push(ColumnSchema::new(
3107 tag.to_string(),
3108 ConcreteDataType::string_datatype(),
3109 false,
3110 ));
3111 }
3112 columns.push(
3113 ColumnSchema::new(
3114 "greptime_timestamp".to_string(),
3115 ConcreteDataType::timestamp_millisecond_datatype(),
3116 false,
3117 )
3118 .with_time_index(true),
3119 );
3120 columns.push(ColumnSchema::new(
3121 "greptime_value".to_string(),
3122 ConcreteDataType::float64_datatype(),
3123 true,
3124 ));
3125 let schema = Arc::new(Schema::new(columns));
3126 let table_meta = TableMetaBuilder::empty()
3127 .schema(schema)
3128 .primary_key_indices((0..num_tag).collect())
3129 .next_column_id(1024)
3130 .build()
3131 .unwrap();
3132 let table_info = TableInfoBuilder::default()
3133 .name(table_name.to_string())
3134 .meta(table_meta)
3135 .build()
3136 .unwrap();
3137 let table = EmptyTable::from_table_info(&table_info);
3138
3139 assert!(catalog_list
3140 .register_table_sync(RegisterTableRequest {
3141 catalog: DEFAULT_CATALOG_NAME.to_string(),
3142 schema: schema_name.to_string(),
3143 table_name: table_name.to_string(),
3144 table_id: 1024,
3145 table,
3146 })
3147 .is_ok());
3148 }
3149
3150 DfTableSourceProvider::new(
3151 catalog_list,
3152 false,
3153 QueryContext::arc(),
3154 DummyDecoder::arc(),
3155 false,
3156 )
3157 }
3158
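// Plans `{fn_name}(some_metric{tag_0!="bar"})` and compares it against the
// expected plan with every `TEMPLATE` placeholder replaced by `plan_name`.
// Unsupported functions pass an empty `plan_name` and are marked `#[should_panic]`.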
3159 async fn do_single_instant_function_call(fn_name: &'static str, plan_name: &str) {
3175 let prom_expr =
3176 parser::parse(&format!("{fn_name}(some_metric{{tag_0!=\"bar\"}})")).unwrap();
3177 let eval_stmt = EvalStmt {
3178 expr: prom_expr,
3179 start: UNIX_EPOCH,
3180 end: UNIX_EPOCH
3181 .checked_add(Duration::from_secs(100_000))
3182 .unwrap(),
3183 interval: Duration::from_secs(5),
3184 lookback_delta: Duration::from_secs(1),
3185 };
3186
3187 let table_provider = build_test_table_provider(
3188 &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
3189 1,
3190 1,
3191 )
3192 .await;
3193 let plan = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
3194 .await
3195 .unwrap();
3196
3197 let expected = String::from(
3198 "Filter: TEMPLATE(field_0) IS NOT NULL [timestamp:Timestamp(Millisecond, None), TEMPLATE(field_0):Float64;N, tag_0:Utf8]\
3199 \n Projection: some_metric.timestamp, TEMPLATE(some_metric.field_0) AS TEMPLATE(field_0), some_metric.tag_0 [timestamp:Timestamp(Millisecond, None), TEMPLATE(field_0):Float64;N, tag_0:Utf8]\
3200 \n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3201 \n PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3202 \n Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3203 \n Filter: some_metric.tag_0 != Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3204 \n TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
3205 ).replace("TEMPLATE", plan_name);
3206
3207 assert_eq!(plan.display_indent_schema().to_string(), expected);
3208 }
3209
3210 #[tokio::test]
3211 async fn single_abs() {
3212 do_single_instant_function_call("abs", "abs").await;
3213 }
3214
3215 #[tokio::test]
3216 #[should_panic]
3217 async fn single_absent() {
3218 do_single_instant_function_call("absent", "").await;
3219 }
3220
3221 #[tokio::test]
3222 async fn single_ceil() {
3223 do_single_instant_function_call("ceil", "ceil").await;
3224 }
3225
3226 #[tokio::test]
3227 async fn single_exp() {
3228 do_single_instant_function_call("exp", "exp").await;
3229 }
3230
3231 #[tokio::test]
3232 async fn single_ln() {
3233 do_single_instant_function_call("ln", "ln").await;
3234 }
3235
3236 #[tokio::test]
3237 async fn single_log2() {
3238 do_single_instant_function_call("log2", "log2").await;
3239 }
3240
3241 #[tokio::test]
3242 async fn single_log10() {
3243 do_single_instant_function_call("log10", "log10").await;
3244 }
3245
3246 #[tokio::test]
3247 #[should_panic]
3248 async fn single_scalar() {
3249 do_single_instant_function_call("scalar", "").await;
3250 }
3251
3252 #[tokio::test]
3253 #[should_panic]
3254 async fn single_sgn() {
3255 do_single_instant_function_call("sgn", "").await;
3256 }
3257
3258 #[tokio::test]
3259 #[should_panic]
3260 async fn single_sort() {
3261 do_single_instant_function_call("sort", "").await;
3262 }
3263
3264 #[tokio::test]
3265 #[should_panic]
3266 async fn single_sort_desc() {
3267 do_single_instant_function_call("sort_desc", "").await;
3268 }
3269
3270 #[tokio::test]
3271 async fn single_sqrt() {
3272 do_single_instant_function_call("sqrt", "sqrt").await;
3273 }
3274
3275 #[tokio::test]
3276 #[should_panic]
3277 async fn single_timestamp() {
3278 do_single_instant_function_call("timestamp", "").await;
3279 }
3280
3281 #[tokio::test]
3282 async fn single_acos() {
3283 do_single_instant_function_call("acos", "acos").await;
3284 }
3285
3286 #[tokio::test]
3287 #[should_panic]
3288 async fn single_acosh() {
3289 do_single_instant_function_call("acosh", "").await;
3290 }
3291
3292 #[tokio::test]
3293 async fn single_asin() {
3294 do_single_instant_function_call("asin", "asin").await;
3295 }
3296
3297 #[tokio::test]
3298 #[should_panic]
3299 async fn single_asinh() {
3300 do_single_instant_function_call("asinh", "").await;
3301 }
3302
3303 #[tokio::test]
3304 async fn single_atan() {
3305 do_single_instant_function_call("atan", "atan").await;
3306 }
3307
3308 #[tokio::test]
3309 #[should_panic]
3310 async fn single_atanh() {
3311 do_single_instant_function_call("atanh", "").await;
3312 }
3313
3314 #[tokio::test]
3315 async fn single_cos() {
3316 do_single_instant_function_call("cos", "cos").await;
3317 }
3318
3319 #[tokio::test]
3320 #[should_panic]
3321 async fn single_cosh() {
3322 do_single_instant_function_call("cosh", "").await;
3323 }
3324
3325 #[tokio::test]
3326 async fn single_sin() {
3327 do_single_instant_function_call("sin", "sin").await;
3328 }
3329
3330 #[tokio::test]
3331 #[should_panic]
3332 async fn single_sinh() {
3333 do_single_instant_function_call("sinh", "").await;
3334 }
3335
3336 #[tokio::test]
3337 async fn single_tan() {
3338 do_single_instant_function_call("tan", "tan").await;
3339 }
3340
3341 #[tokio::test]
3342 #[should_panic]
3343 async fn single_tanh() {
3344 do_single_instant_function_call("tanh", "").await;
3345 }
3346
3347 #[tokio::test]
3348 #[should_panic]
3349 async fn single_deg() {
3350 do_single_instant_function_call("deg", "").await;
3351 }
3352
3353 #[tokio::test]
3354 #[should_panic]
3355 async fn single_rad() {
3356 do_single_instant_function_call("rad", "").await;
3357 }
3358
3359 async fn do_aggregate_expr_plan(fn_name: &str, plan_name: &str) {
3381 let prom_expr = parser::parse(&format!(
3382 "{fn_name} by (tag_1)(some_metric{{tag_0!=\"bar\"}})",
3383 ))
3384 .unwrap();
3385 let mut eval_stmt = EvalStmt {
3386 expr: prom_expr,
3387 start: UNIX_EPOCH,
3388 end: UNIX_EPOCH
3389 .checked_add(Duration::from_secs(100_000))
3390 .unwrap(),
3391 interval: Duration::from_secs(5),
3392 lookback_delta: Duration::from_secs(1),
3393 };
3394
3395 let table_provider = build_test_table_provider(
3397 &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
3398 2,
3399 2,
3400 )
3401 .await;
3402 let plan = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
3403 .await
3404 .unwrap();
3405 let expected_no_without = String::from(
3406 "Sort: some_metric.tag_1 ASC NULLS LAST, some_metric.timestamp ASC NULLS LAST [tag_1:Utf8, timestamp:Timestamp(Millisecond, None), TEMPLATE(some_metric.field_0):Float64;N, TEMPLATE(some_metric.field_1):Float64;N]\
3407 \n Aggregate: groupBy=[[some_metric.tag_1, some_metric.timestamp]], aggr=[[TEMPLATE(some_metric.field_0), TEMPLATE(some_metric.field_1)]] [tag_1:Utf8, timestamp:Timestamp(Millisecond, None), TEMPLATE(some_metric.field_0):Float64;N, TEMPLATE(some_metric.field_1):Float64;N]\
3408 \n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
3409 \n PromSeriesDivide: tags=[\"tag_0\", \"tag_1\"] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
3410 \n Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.tag_1 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
3411 \n Filter: some_metric.tag_0 != Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
3412 \n TableScan: some_metric [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]"
3413 ).replace("TEMPLATE", plan_name);
3414 assert_eq!(
3415 plan.display_indent_schema().to_string(),
3416 expected_no_without
3417 );
3418
3419 if let PromExpr::Aggregate(AggregateExpr { modifier, .. }) = &mut eval_stmt.expr {
3421 *modifier = Some(LabelModifier::Exclude(Labels {
3422 labels: vec![String::from("tag_1")].into_iter().collect(),
3423 }));
3424 }
3425 let table_provider = build_test_table_provider(
3426 &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
3427 2,
3428 2,
3429 )
3430 .await;
3431 let plan = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
3432 .await
3433 .unwrap();
3434 let expected_without = String::from(
3435 "Sort: some_metric.tag_0 ASC NULLS LAST, some_metric.timestamp ASC NULLS LAST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), TEMPLATE(some_metric.field_0):Float64;N, TEMPLATE(some_metric.field_1):Float64;N]\
3436 \n Aggregate: groupBy=[[some_metric.tag_0, some_metric.timestamp]], aggr=[[TEMPLATE(some_metric.field_0), TEMPLATE(some_metric.field_1)]] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), TEMPLATE(some_metric.field_0):Float64;N, TEMPLATE(some_metric.field_1):Float64;N]\
3437 \n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
3438 \n PromSeriesDivide: tags=[\"tag_0\", \"tag_1\"] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
3439 \n Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.tag_1 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
3440 \n Filter: some_metric.tag_0 != Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
3441 \n TableScan: some_metric [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]"
3442 ).replace("TEMPLATE", plan_name);
3443 assert_eq!(plan.display_indent_schema().to_string(), expected_without);
3444 }
3445
3446 #[tokio::test]
3447 async fn aggregate_sum() {
3448 do_aggregate_expr_plan("sum", "sum").await;
3449 }
3450
3451 #[tokio::test]
3452 async fn aggregate_avg() {
3453 do_aggregate_expr_plan("avg", "avg").await;
3454 }
3455
3456 #[tokio::test]
3457 #[should_panic]
3458 async fn aggregate_count() {
3459 do_aggregate_expr_plan("count", "count").await;
3460 }
3461
3462 #[tokio::test]
3463 async fn aggregate_min() {
3464 do_aggregate_expr_plan("min", "min").await;
3465 }
3466
3467 #[tokio::test]
3468 async fn aggregate_max() {
3469 do_aggregate_expr_plan("max", "max").await;
3470 }
3471
3472 #[tokio::test]
3473 #[should_panic]
3474 async fn aggregate_group() {
3475 do_aggregate_expr_plan("grouping", "GROUPING").await;
3476 }
3477
3478 #[tokio::test]
3479 async fn aggregate_stddev() {
3480 do_aggregate_expr_plan("stddev", "stddev_pop").await;
3481 }
3482
3483 #[tokio::test]
3484 async fn aggregate_stdvar() {
3485 do_aggregate_expr_plan("stdvar", "var_pop").await;
3486 }
3487
3488 #[tokio::test]
3512 async fn binary_op_column_column() {
3513 let prom_expr =
3514 parser::parse(r#"some_metric{tag_0="foo"} + some_metric{tag_0="bar"}"#).unwrap();
3515 let eval_stmt = EvalStmt {
3516 expr: prom_expr,
3517 start: UNIX_EPOCH,
3518 end: UNIX_EPOCH
3519 .checked_add(Duration::from_secs(100_000))
3520 .unwrap(),
3521 interval: Duration::from_secs(5),
3522 lookback_delta: Duration::from_secs(1),
3523 };
3524
3525 let table_provider = build_test_table_provider(
3526 &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
3527 1,
3528 1,
3529 )
3530 .await;
3531 let plan = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
3532 .await
3533 .unwrap();
3534
3535 let expected = String::from(
3536 "Projection: rhs.tag_0, rhs.timestamp, lhs.field_0 + rhs.field_0 AS lhs.field_0 + rhs.field_0 [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), lhs.field_0 + rhs.field_0:Float64;N]\
3537 \n Inner Join: lhs.tag_0 = rhs.tag_0, lhs.timestamp = rhs.timestamp [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3538 \n SubqueryAlias: lhs [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3539 \n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3540 \n PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3541 \n Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3542 \n Filter: some_metric.tag_0 = Utf8(\"foo\") AND some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3543 \n TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3544 \n SubqueryAlias: rhs [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3545 \n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3546 \n PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3547 \n Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3548 \n Filter: some_metric.tag_0 = Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3549 \n TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
3550 );
3551
3552 assert_eq!(plan.display_indent_schema().to_string(), expected);
3553 }
3554
3555 async fn indie_query_plan_compare(query: &str, expected: String) {
3556 let prom_expr = parser::parse(query).unwrap();
3557 let eval_stmt = EvalStmt {
3558 expr: prom_expr,
3559 start: UNIX_EPOCH,
3560 end: UNIX_EPOCH
3561 .checked_add(Duration::from_secs(100_000))
3562 .unwrap(),
3563 interval: Duration::from_secs(5),
3564 lookback_delta: Duration::from_secs(1),
3565 };
3566
3567 let table_provider = build_test_table_provider(
3568 &[
3569 (DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string()),
3570 (
3571 "greptime_private".to_string(),
3572 "some_alt_metric".to_string(),
3573 ),
3574 ],
3575 1,
3576 1,
3577 )
3578 .await;
3579 let plan = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
3580 .await
3581 .unwrap();
3582
3583 assert_eq!(plan.display_indent_schema().to_string(), expected);
3584 }
3585
3586 #[tokio::test]
3587 async fn binary_op_literal_column() {
3588 let query = r#"1 + some_metric{tag_0="bar"}"#;
3589 let expected = String::from(
3590 "Projection: some_metric.tag_0, some_metric.timestamp, Float64(1) + some_metric.field_0 AS Float64(1) + field_0 [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), Float64(1) + field_0:Float64;N]\
3591 \n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3592 \n PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3593 \n Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3594 \n Filter: some_metric.tag_0 = Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3595 \n TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
3596 );
3597
3598 indie_query_plan_compare(query, expected).await;
3599 }
3600
3601 #[tokio::test]
3602 async fn binary_op_literal_literal() {
3603 let query = r#"1 + 1"#;
3604 let expected = String::from("EmptyMetric: range=[0..100000000], interval=[5000] [time:Timestamp(Millisecond, None), value:Float64;N]");
3605
3606 indie_query_plan_compare(query, expected).await;
3607 }
3608
3609 #[tokio::test]
3610 async fn simple_bool_grammar() {
3611 let query = "some_metric != bool 1.2345";
3612 let expected = String::from(
3613 "Projection: some_metric.tag_0, some_metric.timestamp, CAST(some_metric.field_0 != Float64(1.2345) AS Float64) AS field_0 != Float64(1.2345) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0 != Float64(1.2345):Float64;N]\
3614 \n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3615 \n PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3616 \n Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3617 \n Filter: some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3618 \n TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
3619 );
3620
3621 indie_query_plan_compare(query, expected).await;
3622 }
3623
3624 #[tokio::test]
3625 async fn bool_with_additional_arithmetic() {
3626 let query = "some_metric + (1 == bool 2)";
3627 let expected = String::from(
3628 "Projection: some_metric.tag_0, some_metric.timestamp, some_metric.field_0 + CAST(Float64(1) = Float64(2) AS Float64) AS field_0 + Float64(1) = Float64(2) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0 + Float64(1) = Float64(2):Float64;N]\
3629 \n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3630 \n PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3631 \n Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3632 \n Filter: some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3633 \n TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
3634 );
3635
3636 indie_query_plan_compare(query, expected).await;
3637 }
3638
3639 #[tokio::test]
3640 async fn simple_unary() {
3641 let query = "-some_metric";
3642 let expected = String::from(
3643 "Projection: some_metric.tag_0, some_metric.timestamp, (- some_metric.field_0) AS (- field_0) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), (- field_0):Float64;N]\
3644 \n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3645 \n PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3646 \n Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3647 \n Filter: some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3648 \n TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
3649 );
3650
3651 indie_query_plan_compare(query, expected).await;
3652 }
3653
3654 #[tokio::test]
3655 async fn increase_aggr() {
3656 let query = "increase(some_metric[5m])";
3657 let expected = String::from(
3658 "Filter: prom_increase(timestamp_range,field_0,timestamp,Int64(300000)) IS NOT NULL [timestamp:Timestamp(Millisecond, None), prom_increase(timestamp_range,field_0,timestamp,Int64(300000)):Float64;N, tag_0:Utf8]\
3659 \n Projection: some_metric.timestamp, prom_increase(timestamp_range, field_0, some_metric.timestamp, Int64(300000)) AS prom_increase(timestamp_range,field_0,timestamp,Int64(300000)), some_metric.tag_0 [timestamp:Timestamp(Millisecond, None), prom_increase(timestamp_range,field_0,timestamp,Int64(300000)):Float64;N, tag_0:Utf8]\
3660 \n PromRangeManipulate: req range=[0..100000000], interval=[5000], eval range=[300000], time index=[timestamp], values=[\"field_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Dictionary(Int64, Float64);N, timestamp_range:Dictionary(Int64, Timestamp(Millisecond, None))]\
3661 \n PromSeriesNormalize: offset=[0], time index=[timestamp], filter NaN: [true] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3662 \n PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3663 \n Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3664 \n Filter: some_metric.timestamp >= TimestampMillisecond(-301000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3665 \n TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
3666 );
3667
3668 indie_query_plan_compare(query, expected).await;
3669 }
3670
3671 #[tokio::test]
3672 async fn less_filter_on_value() {
3673 let query = "some_metric < 1.2345";
3674 let expected = String::from(
3675 "Filter: some_metric.field_0 < Float64(1.2345) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3676 \n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3677 \n PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3678 \n Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3679 \n Filter: some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3680 \n TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
3681 );
3682
3683 indie_query_plan_compare(query, expected).await;
3684 }
3685
3686 #[tokio::test]
3687 async fn count_over_time() {
3688 let query = "count_over_time(some_metric[5m])";
3689 let expected = String::from(
3690 "Filter: prom_count_over_time(timestamp_range,field_0) IS NOT NULL [timestamp:Timestamp(Millisecond, None), prom_count_over_time(timestamp_range,field_0):Float64;N, tag_0:Utf8]\
3691 \n Projection: some_metric.timestamp, prom_count_over_time(timestamp_range, field_0) AS prom_count_over_time(timestamp_range,field_0), some_metric.tag_0 [timestamp:Timestamp(Millisecond, None), prom_count_over_time(timestamp_range,field_0):Float64;N, tag_0:Utf8]\
3692 \n PromRangeManipulate: req range=[0..100000000], interval=[5000], eval range=[300000], time index=[timestamp], values=[\"field_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Dictionary(Int64, Float64);N, timestamp_range:Dictionary(Int64, Timestamp(Millisecond, None))]\
3693 \n PromSeriesNormalize: offset=[0], time index=[timestamp], filter NaN: [true] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3694 \n PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3695 \n Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3696 \n Filter: some_metric.timestamp >= TimestampMillisecond(-301000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3697 \n TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
3698 );
3699
3700 indie_query_plan_compare(query, expected).await;
3701 }
3702
    #[tokio::test]
    async fn test_hash_join() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let case = r#"http_server_requests_seconds_sum{uri="/accounts/login"} / ignoring(kubernetes_pod_name,kubernetes_namespace) http_server_requests_seconds_count{uri="/accounts/login"}"#;

        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;
        let table_provider = build_test_table_provider_with_fields(
            &[
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "http_server_requests_seconds_sum".to_string(),
                ),
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "http_server_requests_seconds_count".to_string(),
                ),
            ],
            &["uri", "kubernetes_namespace", "kubernetes_pod_name"],
        )
        .await;
        let plan = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
            .await
            .unwrap();
        let expected = "Projection: http_server_requests_seconds_count.uri, http_server_requests_seconds_count.kubernetes_namespace, http_server_requests_seconds_count.kubernetes_pod_name, http_server_requests_seconds_count.greptime_timestamp, http_server_requests_seconds_sum.greptime_value / http_server_requests_seconds_count.greptime_value AS http_server_requests_seconds_sum.greptime_value / http_server_requests_seconds_count.greptime_value\
        \n  Inner Join: http_server_requests_seconds_sum.greptime_timestamp = http_server_requests_seconds_count.greptime_timestamp, http_server_requests_seconds_sum.uri = http_server_requests_seconds_count.uri\
        \n    SubqueryAlias: http_server_requests_seconds_sum\
        \n      PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp]\
        \n        PromSeriesDivide: tags=[\"uri\", \"kubernetes_namespace\", \"kubernetes_pod_name\"]\
        \n          Sort: http_server_requests_seconds_sum.uri ASC NULLS FIRST, http_server_requests_seconds_sum.kubernetes_namespace ASC NULLS FIRST, http_server_requests_seconds_sum.kubernetes_pod_name ASC NULLS FIRST, http_server_requests_seconds_sum.greptime_timestamp ASC NULLS FIRST\
        \n            Filter: http_server_requests_seconds_sum.uri = Utf8(\"/accounts/login\") AND http_server_requests_seconds_sum.greptime_timestamp >= TimestampMillisecond(-1000, None) AND http_server_requests_seconds_sum.greptime_timestamp <= TimestampMillisecond(100001000, None)\
        \n              TableScan: http_server_requests_seconds_sum\
        \n    SubqueryAlias: http_server_requests_seconds_count\
        \n      PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp]\
        \n        PromSeriesDivide: tags=[\"uri\", \"kubernetes_namespace\", \"kubernetes_pod_name\"]\
        \n          Sort: http_server_requests_seconds_count.uri ASC NULLS FIRST, http_server_requests_seconds_count.kubernetes_namespace ASC NULLS FIRST, http_server_requests_seconds_count.kubernetes_pod_name ASC NULLS FIRST, http_server_requests_seconds_count.greptime_timestamp ASC NULLS FIRST\
        \n            Filter: http_server_requests_seconds_count.uri = Utf8(\"/accounts/login\") AND http_server_requests_seconds_count.greptime_timestamp >= TimestampMillisecond(-1000, None) AND http_server_requests_seconds_count.greptime_timestamp <= TimestampMillisecond(100001000, None)\
        \n              TableScan: http_server_requests_seconds_count";
        assert_eq!(plan.to_string(), expected);
    }

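    // Smoke test: label_replace wrapped around histogram_quantile over a
    // sum-by(rate(...)) pipeline should plan without error (plan unchecked).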
    #[tokio::test]
    async fn test_nested_histogram_quantile() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let case = r#"label_replace(histogram_quantile(0.99, sum by(pod, le, path, code) (rate(greptime_servers_grpc_requests_elapsed_bucket{container="frontend"}[1m0s]))), "pod", "$1", "pod", "greptimedb-frontend-[0-9a-z]*-(.*)")"#;

        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;
        let table_provider = build_test_table_provider_with_fields(
            &[(
                DEFAULT_SCHEMA_NAME.to_string(),
                "greptime_servers_grpc_requests_elapsed_bucket".to_string(),
            )],
            &["pod", "le", "path", "code", "container"],
        )
        .await;
        let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
            .await
            .unwrap();
    }

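    // Smoke test for the `and`/`unless` set operators nested inside count()
    // with an `or vector(0)` fallback; only successful planning is asserted.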
    #[tokio::test]
    async fn test_parse_and_operator() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let cases = [
            r#"count (max by (persistentvolumeclaim,namespace) (kubelet_volume_stats_used_bytes{namespace=~".+"} ) and (max by (persistentvolumeclaim,namespace) (kubelet_volume_stats_used_bytes{namespace=~".+"} )) / (max by (persistentvolumeclaim,namespace) (kubelet_volume_stats_capacity_bytes{namespace=~".+"} )) >= (80 / 100)) or vector (0)"#,
            r#"count (max by (persistentvolumeclaim,namespace) (kubelet_volume_stats_used_bytes{namespace=~".+"} ) unless (max by (persistentvolumeclaim,namespace) (kubelet_volume_stats_used_bytes{namespace=~".+"} )) / (max by (persistentvolumeclaim,namespace) (kubelet_volume_stats_capacity_bytes{namespace=~".+"} )) >= (80 / 100)) or vector (0)"#,
        ];

        for case in cases {
            let prom_expr = parser::parse(case).unwrap();
            eval_stmt.expr = prom_expr;
            let table_provider = build_test_table_provider_with_fields(
                &[
                    (
                        DEFAULT_SCHEMA_NAME.to_string(),
                        "kubelet_volume_stats_used_bytes".to_string(),
                    ),
                    (
                        DEFAULT_SCHEMA_NAME.to_string(),
                        "kubelet_volume_stats_capacity_bytes".to_string(),
                    ),
                ],
                &["namespace", "persistentvolumeclaim"],
            )
            .await;
            let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
                .await
                .unwrap();
        }
    }

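    // Regression-style check: a binary op whose right-hand side is a
    // parenthesized `... or vector(0)` should still plan successfully.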
    #[tokio::test]
    async fn test_nested_binary_op() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let case = r#"sum(rate(nginx_ingress_controller_requests{job=~".*"}[2m])) -
        (
            sum(rate(nginx_ingress_controller_requests{namespace=~".*"}[2m]))
            or
            vector(0)
        )"#;

        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;
        let table_provider = build_test_table_provider_with_fields(
            &[(
                DEFAULT_SCHEMA_NAME.to_string(),
                "nginx_ingress_controller_requests".to_string(),
            )],
            &["namespace", "job"],
        )
        .await;
        let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
            .await
            .unwrap();
    }

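    // Various `or` combinations of aggregated selectors (with arithmetic and
    // comparison operands) should all plan successfully; plans unchecked.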
    #[tokio::test]
    async fn test_parse_or_operator() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let case = r#"
        sum(rate(sysstat{tenant_name=~"tenant1",cluster_name=~"cluster1"}[120s])) by (cluster_name,tenant_name) /
        (sum(sysstat{tenant_name=~"tenant1",cluster_name=~"cluster1"}) by (cluster_name,tenant_name) * 100)
        or
        200 * sum(sysstat{tenant_name=~"tenant1",cluster_name=~"cluster1"}) by (cluster_name,tenant_name) /
        sum(sysstat{tenant_name=~"tenant1",cluster_name=~"cluster1"}) by (cluster_name,tenant_name)"#;

        let table_provider = build_test_table_provider_with_fields(
            &[(DEFAULT_SCHEMA_NAME.to_string(), "sysstat".to_string())],
            &["tenant_name", "cluster_name"],
        )
        .await;
        eval_stmt.expr = parser::parse(case).unwrap();
        let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
            .await
            .unwrap();

        let case = r#"sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) /
        (sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) *1000) +
        sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) /
        (sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) *1000) >= 0
        or
        sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) /
        (sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) *1000) >= 0
        or
        sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) /
        (sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) *1000) >= 0"#;
        let table_provider = build_test_table_provider_with_fields(
            &[(DEFAULT_SCHEMA_NAME.to_string(), "sysstat".to_string())],
            &["tenant_name", "cluster_name"],
        )
        .await;
        eval_stmt.expr = parser::parse(case).unwrap();
        let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
            .await
            .unwrap();

        let case = r#"(sum(background_waitevent_cnt{tenant_name=~"sys",cluster_name=~"cluster1"}) by (cluster_name,tenant_name) +
        sum(foreground_waitevent_cnt{tenant_name=~"sys",cluster_name=~"cluster1"}) by (cluster_name,tenant_name)) or
        (sum(background_waitevent_cnt{tenant_name=~"sys",cluster_name=~"cluster1"}) by (cluster_name,tenant_name)) or
        (sum(foreground_waitevent_cnt{tenant_name=~"sys",cluster_name=~"cluster1"}) by (cluster_name,tenant_name))"#;
        let table_provider = build_test_table_provider_with_fields(
            &[
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "background_waitevent_cnt".to_string(),
                ),
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "foreground_waitevent_cnt".to_string(),
                ),
            ],
            &["tenant_name", "cluster_name"],
        )
        .await;
        eval_stmt.expr = parser::parse(case).unwrap();
        let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
            .await
            .unwrap();

        let case = r#"avg(node_load1{cluster_name=~"cluster1"}) by (cluster_name,host_name) or max(container_cpu_load_average_10s{cluster_name=~"cluster1"}) by (cluster_name,host_name) * 100 / max(container_spec_cpu_quota{cluster_name=~"cluster1"}) by (cluster_name,host_name)"#;
        let table_provider = build_test_table_provider_with_fields(
            &[
                (DEFAULT_SCHEMA_NAME.to_string(), "node_load1".to_string()),
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "container_cpu_load_average_10s".to_string(),
                ),
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "container_spec_cpu_quota".to_string(),
                ),
            ],
            &["cluster_name", "host_name"],
        )
        .await;
        eval_stmt.expr = parser::parse(case).unwrap();
        let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
            .await
            .unwrap();
    }

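    // The special `__field__` matcher selects value columns: each case lists
    // the field/tag columns expected in the output schema, and matchers that
    // resolve to no existing field must fail planning.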
    #[tokio::test]
    async fn value_matcher() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let cases = [
            (
                r#"some_metric{__field__="field_1"}"#,
                vec![
                    "some_metric.field_1",
                    "some_metric.tag_0",
                    "some_metric.tag_1",
                    "some_metric.tag_2",
                    "some_metric.timestamp",
                ],
            ),
            (
                r#"some_metric{__field__="field_1", __field__="field_0"}"#,
                vec![
                    "some_metric.field_0",
                    "some_metric.field_1",
                    "some_metric.tag_0",
                    "some_metric.tag_1",
                    "some_metric.tag_2",
                    "some_metric.timestamp",
                ],
            ),
            (
                r#"some_metric{__field__!="field_1"}"#,
                vec![
                    "some_metric.field_0",
                    "some_metric.field_2",
                    "some_metric.tag_0",
                    "some_metric.tag_1",
                    "some_metric.tag_2",
                    "some_metric.timestamp",
                ],
            ),
            (
                r#"some_metric{__field__!="field_1", __field__!="field_2"}"#,
                vec![
                    "some_metric.field_0",
                    "some_metric.tag_0",
                    "some_metric.tag_1",
                    "some_metric.tag_2",
                    "some_metric.timestamp",
                ],
            ),
            (
                r#"some_metric{__field__="field_1", __field__!="field_0"}"#,
                vec![
                    "some_metric.field_1",
                    "some_metric.tag_0",
                    "some_metric.tag_1",
                    "some_metric.tag_2",
                    "some_metric.timestamp",
                ],
            ),
            (
                r#"some_metric{__field__="field_2", __field__!="field_2"}"#,
                vec![
                    "some_metric.tag_0",
                    "some_metric.tag_1",
                    "some_metric.tag_2",
                    "some_metric.timestamp",
                ],
            ),
            (
                r#"some_metric{__field__=~"field_1|field_2"}"#,
                vec![
                    "some_metric.field_1",
                    "some_metric.field_2",
                    "some_metric.tag_0",
                    "some_metric.tag_1",
                    "some_metric.tag_2",
                    "some_metric.timestamp",
                ],
            ),
            (
                r#"some_metric{__field__!~"field_1|field_2"}"#,
                vec![
                    "some_metric.field_0",
                    "some_metric.tag_0",
                    "some_metric.tag_1",
                    "some_metric.tag_2",
                    "some_metric.timestamp",
                ],
            ),
        ];

        for case in cases {
            let prom_expr = parser::parse(case.0).unwrap();
            eval_stmt.expr = prom_expr;
            let table_provider = build_test_table_provider(
                &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
                3,
                3,
            )
            .await;
            let plan =
                PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
                    .await
                    .unwrap();
            let mut fields = plan.schema().field_names();
            let mut expected = case.1.into_iter().map(String::from).collect::<Vec<_>>();
            fields.sort();
            expected.sort();
            assert_eq!(fields, expected, "case: {:?}", case.0);
        }

        let bad_cases = [
            r#"some_metric{__field__="nonexistent"}"#,
            r#"some_metric{__field__!="nonexistent"}"#,
        ];

        for case in bad_cases {
            let prom_expr = parser::parse(case).unwrap();
            eval_stmt.expr = prom_expr;
            let table_provider = build_test_table_provider(
                &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
                3,
                3,
            )
            .await;
            let plan =
                PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state()).await;
            assert!(plan.is_err(), "case: {:?}", case);
        }
    }

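    // `__schema__` / `__database__` matchers route the selector to another
    // schema, which shows up as the qualified table name in the plan.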
    #[tokio::test]
    async fn custom_schema() {
        let query = "some_alt_metric{__schema__=\"greptime_private\"}";
        let expected = String::from(
            "PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n  PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n    Sort: greptime_private.some_alt_metric.tag_0 ASC NULLS FIRST, greptime_private.some_alt_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n      Filter: greptime_private.some_alt_metric.timestamp >= TimestampMillisecond(-1000, None) AND greptime_private.some_alt_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n        TableScan: greptime_private.some_alt_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
        );

        indie_query_plan_compare(query, expected).await;

        let query = "some_alt_metric{__database__=\"greptime_private\"}";
        let expected = String::from(
            "PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n  PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n    Sort: greptime_private.some_alt_metric.tag_0 ASC NULLS FIRST, greptime_private.some_alt_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n      Filter: greptime_private.some_alt_metric.timestamp >= TimestampMillisecond(-1000, None) AND greptime_private.some_alt_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n        TableScan: greptime_private.some_alt_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
        );

        indie_query_plan_compare(query, expected).await;

        let query = "some_alt_metric{__schema__=\"greptime_private\"} / some_metric";
        let expected = String::from("Projection: some_metric.tag_0, some_metric.timestamp, greptime_private.some_alt_metric.field_0 / some_metric.field_0 AS greptime_private.some_alt_metric.field_0 / some_metric.field_0 [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), greptime_private.some_alt_metric.field_0 / some_metric.field_0:Float64;N]\
        \n  Inner Join: greptime_private.some_alt_metric.tag_0 = some_metric.tag_0, greptime_private.some_alt_metric.timestamp = some_metric.timestamp [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n    SubqueryAlias: greptime_private.some_alt_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n      PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n        PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n          Sort: greptime_private.some_alt_metric.tag_0 ASC NULLS FIRST, greptime_private.some_alt_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n            Filter: greptime_private.some_alt_metric.timestamp >= TimestampMillisecond(-1000, None) AND greptime_private.some_alt_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n              TableScan: greptime_private.some_alt_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n    SubqueryAlias: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n      PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n        PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n          Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n            Filter: some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n              TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]");

        indie_query_plan_compare(query, expected).await;
    }

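    // Only `=` is supported for `__schema__`/`__database__`; `!=` and `=~`
    // must be rejected at planning time.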
    #[tokio::test]
    async fn only_equals_is_supported_for_special_matcher() {
        let queries = &[
            "some_alt_metric{__schema__!=\"greptime_private\"}",
            "some_alt_metric{__schema__=~\"lalala\"}",
            "some_alt_metric{__database__!=\"greptime_private\"}",
            "some_alt_metric{__database__=~\"lalala\"}",
        ];

        for query in queries {
            let prom_expr = parser::parse(query).unwrap();
            let eval_stmt = EvalStmt {
                expr: prom_expr,
                start: UNIX_EPOCH,
                end: UNIX_EPOCH
                    .checked_add(Duration::from_secs(100_000))
                    .unwrap(),
                interval: Duration::from_secs(5),
                lookback_delta: Duration::from_secs(1),
            };

            let table_provider = build_test_table_provider(
                &[
                    (DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string()),
                    (
                        "greptime_private".to_string(),
                        "some_alt_metric".to_string(),
                    ),
                ],
                1,
                1,
            )
            .await;

            let plan =
                PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state()).await;
            assert!(plan.is_err(), "query: {:?}", query);
        }
    }

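    // A nanosecond time index should be cast to millisecond precision via an
    // injected Projection before the Prom* plan nodes are applied.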
    #[tokio::test]
    async fn test_non_ms_precision() {
        let catalog_list = MemoryCatalogManager::with_default_setup();
        let columns = vec![
            ColumnSchema::new(
                "tag".to_string(),
                ConcreteDataType::string_datatype(),
                false,
            ),
            ColumnSchema::new(
                "timestamp".to_string(),
                ConcreteDataType::timestamp_nanosecond_datatype(),
                false,
            )
            .with_time_index(true),
            ColumnSchema::new(
                "field".to_string(),
                ConcreteDataType::float64_datatype(),
                true,
            ),
        ];
        let schema = Arc::new(Schema::new(columns));
        let table_meta = TableMetaBuilder::empty()
            .schema(schema)
            .primary_key_indices(vec![0])
            .value_indices(vec![2])
            .next_column_id(1024)
            .build()
            .unwrap();
        let table_info = TableInfoBuilder::default()
            .name("metrics".to_string())
            .meta(table_meta)
            .build()
            .unwrap();
        let table = EmptyTable::from_table_info(&table_info);
        assert!(catalog_list
            .register_table_sync(RegisterTableRequest {
                catalog: DEFAULT_CATALOG_NAME.to_string(),
                schema: DEFAULT_SCHEMA_NAME.to_string(),
                table_name: "metrics".to_string(),
                table_id: 1024,
                table,
            })
            .is_ok());

        let plan = PromPlanner::stmt_to_plan(
            DfTableSourceProvider::new(
                catalog_list.clone(),
                false,
                QueryContext::arc(),
                DummyDecoder::arc(),
                true,
            ),
            &EvalStmt {
                expr: parser::parse("metrics{tag = \"1\"}").unwrap(),
                start: UNIX_EPOCH,
                end: UNIX_EPOCH
                    .checked_add(Duration::from_secs(100_000))
                    .unwrap(),
                interval: Duration::from_secs(5),
                lookback_delta: Duration::from_secs(1),
            },
            &build_session_state(),
        )
        .await
        .unwrap();
        assert_eq!(plan.display_indent_schema().to_string(),
        "PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
        \n  PromSeriesDivide: tags=[\"tag\"] [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
        \n    Sort: metrics.tag ASC NULLS FIRST, metrics.timestamp ASC NULLS FIRST [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
        \n      Filter: metrics.tag = Utf8(\"1\") AND metrics.timestamp >= TimestampMillisecond(-1000, None) AND metrics.timestamp <= TimestampMillisecond(100001000, None) [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
        \n        Projection: metrics.field, metrics.tag, CAST(metrics.timestamp AS Timestamp(Millisecond, None)) AS timestamp [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
        \n          TableScan: metrics [tag:Utf8, timestamp:Timestamp(Nanosecond, None), field:Float64;N]"
        );
        let plan = PromPlanner::stmt_to_plan(
            DfTableSourceProvider::new(
                catalog_list.clone(),
                false,
                QueryContext::arc(),
                DummyDecoder::arc(),
                true,
            ),
            &EvalStmt {
                expr: parser::parse("avg_over_time(metrics{tag = \"1\"}[5s])").unwrap(),
                start: UNIX_EPOCH,
                end: UNIX_EPOCH
                    .checked_add(Duration::from_secs(100_000))
                    .unwrap(),
                interval: Duration::from_secs(5),
                lookback_delta: Duration::from_secs(1),
            },
            &build_session_state(),
        )
        .await
        .unwrap();
        assert_eq!(plan.display_indent_schema().to_string(),
        "Filter: prom_avg_over_time(timestamp_range,field) IS NOT NULL [timestamp:Timestamp(Millisecond, None), prom_avg_over_time(timestamp_range,field):Float64;N, tag:Utf8]\
        \n  Projection: metrics.timestamp, prom_avg_over_time(timestamp_range, field) AS prom_avg_over_time(timestamp_range,field), metrics.tag [timestamp:Timestamp(Millisecond, None), prom_avg_over_time(timestamp_range,field):Float64;N, tag:Utf8]\
        \n    PromRangeManipulate: req range=[0..100000000], interval=[5000], eval range=[5000], time index=[timestamp], values=[\"field\"] [field:Dictionary(Int64, Float64);N, tag:Utf8, timestamp:Timestamp(Millisecond, None), timestamp_range:Dictionary(Int64, Timestamp(Millisecond, None))]\
        \n      PromSeriesNormalize: offset=[0], time index=[timestamp], filter NaN: [true] [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
        \n        PromSeriesDivide: tags=[\"tag\"] [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
        \n          Sort: metrics.tag ASC NULLS FIRST, metrics.timestamp ASC NULLS FIRST [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
        \n            Filter: metrics.tag = Utf8(\"1\") AND metrics.timestamp >= TimestampMillisecond(-6000, None) AND metrics.timestamp <= TimestampMillisecond(100001000, None) [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
        \n              Projection: metrics.field, metrics.tag, CAST(metrics.timestamp AS Timestamp(Millisecond, None)) AS timestamp [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
        \n                TableScan: metrics [tag:Utf8, timestamp:Timestamp(Nanosecond, None), field:Float64;N]"
        );
    }

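    // Filtering on a label that does not exist in the table should still plan
    // successfully rather than erroring out.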
    #[tokio::test]
    async fn test_nonexistent_label() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let case = r#"some_metric{nonexistent="hi"}"#;
        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;
        let table_provider = build_test_table_provider(
            &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
            3,
            3,
        )
        .await;
        let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
            .await
            .unwrap();
    }

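    // label_join lowers to concat_ws over the source labels, exposed as the
    // new `foo` column.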
    #[tokio::test]
    async fn test_label_join() {
        let prom_expr = parser::parse(
            "label_join(up{tag_0='api-server'}, 'foo', ',', 'tag_1', 'tag_2', 'tag_3')",
        )
        .unwrap();
        let eval_stmt = EvalStmt {
            expr: prom_expr,
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let table_provider =
            build_test_table_provider(&[(DEFAULT_SCHEMA_NAME.to_string(), "up".to_string())], 4, 1)
                .await;
        let plan = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
            .await
            .unwrap();

        let expected = "Filter: field_0 IS NOT NULL AND foo IS NOT NULL [timestamp:Timestamp(Millisecond, None), field_0:Float64;N, foo:Utf8;N, tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8]\
        \n  Projection: up.timestamp, up.field_0 AS field_0, concat_ws(Utf8(\",\"), up.tag_1, up.tag_2, up.tag_3) AS foo AS foo, up.tag_0, up.tag_1, up.tag_2, up.tag_3 [timestamp:Timestamp(Millisecond, None), field_0:Float64;N, foo:Utf8;N, tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8]\
        \n    PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n      PromSeriesDivide: tags=[\"tag_0\", \"tag_1\", \"tag_2\", \"tag_3\"] [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n        Sort: up.tag_0 ASC NULLS FIRST, up.tag_1 ASC NULLS FIRST, up.tag_2 ASC NULLS FIRST, up.tag_3 ASC NULLS FIRST, up.timestamp ASC NULLS FIRST [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n          Filter: up.tag_0 = Utf8(\"api-server\") AND up.timestamp >= TimestampMillisecond(-1000, None) AND up.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n            TableScan: up [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]";

        assert_eq!(plan.display_indent_schema().to_string(), expected);
    }

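    // label_replace lowers to regexp_replace over the source label, exposed
    // as the new `foo` column.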
    #[tokio::test]
    async fn test_label_replace() {
        let prom_expr = parser::parse(
            "label_replace(up{tag_0=\"a:c\"}, \"foo\", \"$1\", \"tag_0\", \"(.*):.*\")",
        )
        .unwrap();
        let eval_stmt = EvalStmt {
            expr: prom_expr,
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let table_provider =
            build_test_table_provider(&[(DEFAULT_SCHEMA_NAME.to_string(), "up".to_string())], 1, 1)
                .await;
        let plan = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
            .await
            .unwrap();

        let expected = "Filter: field_0 IS NOT NULL AND foo IS NOT NULL [timestamp:Timestamp(Millisecond, None), field_0:Float64;N, foo:Utf8;N, tag_0:Utf8]\
        \n  Projection: up.timestamp, up.field_0 AS field_0, regexp_replace(up.tag_0, Utf8(\"(.*):.*\"), Utf8(\"$1\")) AS foo AS foo, up.tag_0 [timestamp:Timestamp(Millisecond, None), field_0:Float64;N, foo:Utf8;N, tag_0:Utf8]\
        \n    PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n      PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n        Sort: up.tag_0 ASC NULLS FIRST, up.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n          Filter: up.tag_0 = Utf8(\"a:c\") AND up.timestamp >= TimestampMillisecond(-1000, None) AND up.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n            TableScan: up [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]";

        assert_eq!(plan.display_indent_schema().to_string(), expected);
    }

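    // Regex matchers should compile to `~` expressions with the pattern
    // anchored as ^(?:...)$.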
    #[tokio::test]
    async fn test_matchers_to_expr() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };
        let case =
            r#"sum(prometheus_tsdb_head_series{tag_1=~"(10.0.160.237:8080|10.0.160.237:9090)"})"#;

        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;
        let table_provider = build_test_table_provider(
            &[(
                DEFAULT_SCHEMA_NAME.to_string(),
                "prometheus_tsdb_head_series".to_string(),
            )],
            3,
            3,
        )
        .await;
        let plan = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
            .await
            .unwrap();
        let expected = "Sort: prometheus_tsdb_head_series.timestamp ASC NULLS LAST [timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.field_0):Float64;N, sum(prometheus_tsdb_head_series.field_1):Float64;N, sum(prometheus_tsdb_head_series.field_2):Float64;N]\
        \n  Aggregate: groupBy=[[prometheus_tsdb_head_series.timestamp]], aggr=[[sum(prometheus_tsdb_head_series.field_0), sum(prometheus_tsdb_head_series.field_1), sum(prometheus_tsdb_head_series.field_2)]] [timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.field_0):Float64;N, sum(prometheus_tsdb_head_series.field_1):Float64;N, sum(prometheus_tsdb_head_series.field_2):Float64;N]\
        \n    PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N]\
        \n      PromSeriesDivide: tags=[\"tag_0\", \"tag_1\", \"tag_2\"] [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N]\
        \n        Sort: prometheus_tsdb_head_series.tag_0 ASC NULLS FIRST, prometheus_tsdb_head_series.tag_1 ASC NULLS FIRST, prometheus_tsdb_head_series.tag_2 ASC NULLS FIRST, prometheus_tsdb_head_series.timestamp ASC NULLS FIRST [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N]\
        \n          Filter: prometheus_tsdb_head_series.tag_1 ~ Utf8(\"^(?:(10.0.160.237:8080|10.0.160.237:9090))$\") AND prometheus_tsdb_head_series.timestamp >= TimestampMillisecond(-1000, None) AND prometheus_tsdb_head_series.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N]\
        \n            TableScan: prometheus_tsdb_head_series [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N]";
        assert_eq!(plan.display_indent_schema().to_string(), expected);
    }

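    // topk lowers to a row_number() window partitioned by timestamp, ordered
    // by value descending, filtered to the first k rows.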
    #[tokio::test]
    async fn test_topk_expr() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };
        let case = r#"topk(10, sum(prometheus_tsdb_head_series{ip=~"(10.0.160.237:8080|10.0.160.237:9090)"}) by (ip))"#;

        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;
        let table_provider = build_test_table_provider_with_fields(
            &[
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "prometheus_tsdb_head_series".to_string(),
                ),
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "http_server_requests_seconds_count".to_string(),
                ),
            ],
            &["ip"],
        )
        .await;

        let plan = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
            .await
            .unwrap();
        let expected = "Projection: sum(prometheus_tsdb_head_series.greptime_value), prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp [sum(prometheus_tsdb_head_series.greptime_value):Float64;N, ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None)]\
        \n  Sort: prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST, row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS FIRST, prometheus_tsdb_head_series.ip DESC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW ASC NULLS LAST [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N, row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS FIRST, prometheus_tsdb_head_series.ip DESC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:UInt64]\
        \n    Filter: row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS FIRST, prometheus_tsdb_head_series.ip DESC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW <= Float64(10) [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N, row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS FIRST, prometheus_tsdb_head_series.ip DESC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:UInt64]\
        \n      WindowAggr: windowExpr=[[row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS FIRST, prometheus_tsdb_head_series.ip DESC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N, row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS FIRST, prometheus_tsdb_head_series.ip DESC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:UInt64]\
        \n        Sort: prometheus_tsdb_head_series.ip ASC NULLS LAST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N]\
        \n          Aggregate: groupBy=[[prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp]], aggr=[[sum(prometheus_tsdb_head_series.greptime_value)]] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N]\
        \n            PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n              PromSeriesDivide: tags=[\"ip\"] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n                Sort: prometheus_tsdb_head_series.ip ASC NULLS FIRST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS FIRST [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n                  Filter: prometheus_tsdb_head_series.ip ~ Utf8(\"^(?:(10.0.160.237:8080|10.0.160.237:9090))$\") AND prometheus_tsdb_head_series.greptime_timestamp >= TimestampMillisecond(-1000, None) AND prometheus_tsdb_head_series.greptime_timestamp <= TimestampMillisecond(100001000, None) [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n                    TableScan: prometheus_tsdb_head_series [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]";

        assert_eq!(plan.display_indent_schema().to_string(), expected);
    }

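    // count_values groups by the value column itself and re-exposes it under
    // the user-provided label ("series").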
    #[tokio::test]
    async fn test_count_values_expr() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };
        let case = r#"count_values('series', prometheus_tsdb_head_series{ip=~"(10.0.160.237:8080|10.0.160.237:9090)"}) by (ip)"#;

        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;
        let table_provider = build_test_table_provider_with_fields(
            &[
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "prometheus_tsdb_head_series".to_string(),
                ),
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "http_server_requests_seconds_count".to_string(),
                ),
            ],
            &["ip"],
        )
        .await;

        let plan = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
            .await
            .unwrap();
        let expected = "Projection: count(prometheus_tsdb_head_series.greptime_value), prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp, series [count(prometheus_tsdb_head_series.greptime_value):Int64, ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), series:Float64;N]\
        \n  Sort: prometheus_tsdb_head_series.ip ASC NULLS LAST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST, prometheus_tsdb_head_series.greptime_value ASC NULLS LAST [count(prometheus_tsdb_head_series.greptime_value):Int64, ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), series:Float64;N, greptime_value:Float64;N]\
        \n    Projection: count(prometheus_tsdb_head_series.greptime_value), prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp, prometheus_tsdb_head_series.greptime_value AS series, prometheus_tsdb_head_series.greptime_value [count(prometheus_tsdb_head_series.greptime_value):Int64, ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), series:Float64;N, greptime_value:Float64;N]\
        \n      Aggregate: groupBy=[[prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp, prometheus_tsdb_head_series.greptime_value]], aggr=[[count(prometheus_tsdb_head_series.greptime_value)]] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N, count(prometheus_tsdb_head_series.greptime_value):Int64]\
        \n        PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n          PromSeriesDivide: tags=[\"ip\"] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n            Sort: prometheus_tsdb_head_series.ip ASC NULLS FIRST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS FIRST [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n              Filter: prometheus_tsdb_head_series.ip ~ Utf8(\"^(?:(10.0.160.237:8080|10.0.160.237:9090))$\") AND prometheus_tsdb_head_series.greptime_timestamp >= TimestampMillisecond(-1000, None) AND prometheus_tsdb_head_series.greptime_timestamp <= TimestampMillisecond(100001000, None) [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n                TableScan: prometheus_tsdb_head_series [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]";

        assert_eq!(plan.display_indent_schema().to_string(), expected);
    }

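    // quantile aggregation uses the quantile UDAF applied over the inner
    // sum-by result, grouped by timestamp.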
    #[tokio::test]
    async fn test_quantile_expr() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };
        let case = r#"quantile(0.3, sum(prometheus_tsdb_head_series{ip=~"(10.0.160.237:8080|10.0.160.237:9090)"}) by (ip))"#;

        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;
        let table_provider = build_test_table_provider_with_fields(
            &[
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "prometheus_tsdb_head_series".to_string(),
                ),
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "http_server_requests_seconds_count".to_string(),
                ),
            ],
            &["ip"],
        )
        .await;

        let plan = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
            .await
            .unwrap();
        let expected = "Sort: prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST [greptime_timestamp:Timestamp(Millisecond, None), quantile(Float64(0.3),sum(prometheus_tsdb_head_series.greptime_value)):Float64;N]\
        \n  Aggregate: groupBy=[[prometheus_tsdb_head_series.greptime_timestamp]], aggr=[[quantile(Float64(0.3), sum(prometheus_tsdb_head_series.greptime_value))]] [greptime_timestamp:Timestamp(Millisecond, None), quantile(Float64(0.3),sum(prometheus_tsdb_head_series.greptime_value)):Float64;N]\
        \n    Sort: prometheus_tsdb_head_series.ip ASC NULLS LAST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N]\
        \n      Aggregate: groupBy=[[prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp]], aggr=[[sum(prometheus_tsdb_head_series.greptime_value)]] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N]\
        \n        PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n          PromSeriesDivide: tags=[\"ip\"] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n            Sort: prometheus_tsdb_head_series.ip ASC NULLS FIRST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS FIRST [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n              Filter: prometheus_tsdb_head_series.ip ~ Utf8(\"^(?:(10.0.160.237:8080|10.0.160.237:9090))$\") AND prometheus_tsdb_head_series.greptime_timestamp >= TimestampMillisecond(-1000, None) AND prometheus_tsdb_head_series.greptime_timestamp <= TimestampMillisecond(100001000, None) [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n                TableScan: prometheus_tsdb_head_series [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]";

        assert_eq!(plan.display_indent_schema().to_string(), expected);
    }

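    // `or` where the right-hand metric does not exist: the missing side is
    // planned as an EmptyMetric and merged via UnionDistinctOn.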
    #[tokio::test]
    async fn test_or_not_exists_table_label() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };
        let case = r#"sum by (job, tag0, tag2) (metric_exists) or sum by (job, tag0, tag2) (metric_not_exists)"#;

        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;
        let table_provider = build_test_table_provider_with_fields(
            &[(DEFAULT_SCHEMA_NAME.to_string(), "metric_exists".to_string())],
            &["job"],
        )
        .await;

        let plan = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
            .await
            .unwrap();
        let expected = "UnionDistinctOn: on col=[[\"job\"]], ts_col=[greptime_timestamp] [greptime_timestamp:Timestamp(Millisecond, None), job:Utf8, sum(metric_exists.greptime_value):Float64;N]\
        \n  SubqueryAlias: metric_exists [greptime_timestamp:Timestamp(Millisecond, None), job:Utf8, sum(metric_exists.greptime_value):Float64;N]\
        \n    Projection: metric_exists.greptime_timestamp, metric_exists.job, sum(metric_exists.greptime_value) [greptime_timestamp:Timestamp(Millisecond, None), job:Utf8, sum(metric_exists.greptime_value):Float64;N]\
        \n      Sort: metric_exists.job ASC NULLS LAST, metric_exists.greptime_timestamp ASC NULLS LAST [job:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(metric_exists.greptime_value):Float64;N]\
        \n        Aggregate: groupBy=[[metric_exists.job, metric_exists.greptime_timestamp]], aggr=[[sum(metric_exists.greptime_value)]] [job:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(metric_exists.greptime_value):Float64;N]\
        \n          PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp] [job:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n            PromSeriesDivide: tags=[\"job\"] [job:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n              Sort: metric_exists.job ASC NULLS FIRST, metric_exists.greptime_timestamp ASC NULLS FIRST [job:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n                Filter: metric_exists.greptime_timestamp >= TimestampMillisecond(-1000, None) AND metric_exists.greptime_timestamp <= TimestampMillisecond(100001000, None) [job:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n                  TableScan: metric_exists [job:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n  SubqueryAlias: [greptime_timestamp:Timestamp(Millisecond, None), job:Utf8;N, sum(.value):Float64;N]\
        \n    Projection: .time AS greptime_timestamp, Utf8(NULL) AS job, sum(.value) [greptime_timestamp:Timestamp(Millisecond, None), job:Utf8;N, sum(.value):Float64;N]\
        \n      Sort: .time ASC NULLS LAST [time:Timestamp(Millisecond, None), sum(.value):Float64;N]\
        \n        Aggregate: groupBy=[[.time]], aggr=[[sum(.value)]] [time:Timestamp(Millisecond, None), sum(.value):Float64;N]\
        \n          EmptyMetric: range=[0..-1], interval=[5000] [time:Timestamp(Millisecond, None), value:Float64;N]";

        assert_eq!(plan.display_indent_schema().to_string(), expected);
    }
}