use std::collections::{BTreeSet, HashSet, VecDeque};
use std::sync::Arc;
use std::time::UNIX_EPOCH;

use arrow::datatypes::IntervalDayTime;
use async_recursion::async_recursion;
use catalog::table_source::DfTableSourceProvider;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_query::prelude::GREPTIME_VALUE;
use datafusion::common::DFSchemaRef;
use datafusion::datasource::DefaultTableSource;
use datafusion::execution::context::SessionState;
use datafusion::functions_aggregate::average::avg_udaf;
use datafusion::functions_aggregate::count::count_udaf;
use datafusion::functions_aggregate::grouping::grouping_udaf;
use datafusion::functions_aggregate::min_max::{max_udaf, min_udaf};
use datafusion::functions_aggregate::stddev::stddev_pop_udaf;
use datafusion::functions_aggregate::sum::sum_udaf;
use datafusion::functions_aggregate::variance::var_pop_udaf;
use datafusion::functions_window::row_number::RowNumber;
use datafusion::logical_expr::expr::{Alias, ScalarFunction, WindowFunction};
use datafusion::logical_expr::expr_rewriter::normalize_cols;
use datafusion::logical_expr::{
    BinaryExpr, Cast, Extension, LogicalPlan, LogicalPlanBuilder, Operator,
    ScalarUDF as ScalarUdfDef, WindowFrame, WindowFunctionDefinition,
};
use datafusion::prelude as df_prelude;
use datafusion::prelude::{Column, Expr as DfExpr, JoinType};
use datafusion::scalar::ScalarValue;
use datafusion::sql::TableReference;
use datafusion_common::DFSchema;
use datafusion_expr::utils::conjunction;
use datafusion_expr::{col, lit, ExprSchemable, SortExpr};
use datatypes::arrow::datatypes::{DataType as ArrowDataType, TimeUnit as ArrowTimeUnit};
use datatypes::data_type::ConcreteDataType;
use itertools::Itertools;
use promql::extension_plan::{
    build_special_time_expr, EmptyMetric, HistogramFold, InstantManipulate, Millisecond,
    RangeManipulate, ScalarCalculate, SeriesDivide, SeriesNormalize, UnionDistinctOn,
};
use promql::functions::{
    quantile_udaf, AbsentOverTime, AvgOverTime, Changes, CountOverTime, Delta, Deriv, HoltWinters,
    IDelta, Increase, LastOverTime, MaxOverTime, MinOverTime, PredictLinear, PresentOverTime,
    QuantileOverTime, Rate, Resets, Round, StddevOverTime, StdvarOverTime, SumOverTime,
};
use promql_parser::label::{MatchOp, Matcher, Matchers, METRIC_NAME};
use promql_parser::parser::token::TokenType;
use promql_parser::parser::{
    token, AggregateExpr, BinModifier, BinaryExpr as PromBinaryExpr, Call, EvalStmt,
    Expr as PromExpr, Function, FunctionArgs as PromFunctionArgs, LabelModifier, MatrixSelector,
    NumberLiteral, Offset, ParenExpr, StringLiteral, SubqueryExpr, UnaryExpr,
    VectorMatchCardinality, VectorSelector,
};
use snafu::{ensure, OptionExt, ResultExt};
use store_api::metric_engine_consts::{
    DATA_SCHEMA_TABLE_ID_COLUMN_NAME, DATA_SCHEMA_TSID_COLUMN_NAME,
};
use table::table::adapter::DfTableProviderAdapter;

use crate::promql::error::{
    CatalogSnafu, ColumnNotFoundSnafu, CombineTableColumnMismatchSnafu, DataFusionPlanningSnafu,
    ExpectRangeSelectorSnafu, FunctionInvalidArgumentSnafu, InvalidTimeRangeSnafu,
    MultiFieldsNotSupportedSnafu, MultipleMetricMatchersSnafu, MultipleVectorSnafu,
    NoMetricMatcherSnafu, PromqlPlanNodeSnafu, Result, SameLabelSetSnafu, TableNameNotFoundSnafu,
    TimeIndexNotFoundSnafu, UnexpectedPlanExprSnafu, UnexpectedTokenSnafu, UnknownTableSnafu,
    UnsupportedExprSnafu, UnsupportedMatcherOpSnafu, UnsupportedVectorMatchSnafu,
    ValueNotFoundSnafu, ZeroRangeSelectorSnafu,
};

/// Special function name for the `time()` expression.
const SPECIAL_TIME_FUNCTION: &str = "time";
/// Special function name for the `scalar()` expression.
const SCALAR_FUNCTION: &str = "scalar";
/// Special function name for `histogram_quantile()`.
const SPECIAL_HISTOGRAM_QUANTILE: &str = "histogram_quantile";
/// Special function name for `vector()`.
const SPECIAL_VECTOR_FUNCTION: &str = "vector";
/// The `le` column name of a histogram bucket table.
const LE_COLUMN_NAME: &str = "le";

const DEFAULT_TIME_INDEX_COLUMN: &str = "time";

/// Default value column name for an empty metric.
const DEFAULT_FIELD_COLUMN: &str = "value";

/// Special label for selecting value columns, e.g. `{__field__="field_0"}`.
const FIELD_COLUMN_MATCHER: &str = "__field__";

/// Special labels for selecting the schema (database), e.g. `{__schema__="public"}`.
const SCHEMA_COLUMN_MATCHER: &str = "__schema__";
const DB_COLUMN_MATCHER: &str = "__database__";

/// Threshold used by `build_time_index_filter` to decide between one coarse
/// time-range filter and many per-step filters.
const MAX_SCATTER_POINTS: i64 = 400;

/// Interval of one hour in milliseconds.
const INTERVAL_1H: i64 = 60 * 60 * 1000;

#[derive(Default, Debug, Clone)]
struct PromPlannerContext {
    // query parameters
    start: Millisecond,
    end: Millisecond,
    interval: Millisecond,
    lookback_delta: Millisecond,

    // planner states
    table_name: Option<String>,
    time_index_column: Option<String>,
    field_columns: Vec<String>,
    tag_columns: Vec<String>,
    field_column_matcher: Option<Vec<Matcher>>,
    schema_name: Option<String>,
    /// The range in milliseconds of a range selector. `None` if there is no range selector.
    range: Option<Millisecond>,
}

impl PromPlannerContext {
    fn from_eval_stmt(stmt: &EvalStmt) -> Self {
        Self {
            start: stmt.start.duration_since(UNIX_EPOCH).unwrap().as_millis() as _,
            end: stmt.end.duration_since(UNIX_EPOCH).unwrap().as_millis() as _,
            interval: stmt.interval.as_millis() as _,
            lookback_delta: stmt.lookback_delta.as_millis() as _,
            ..Default::default()
        }
    }

    /// Reset all planner states except the query parameters.
    fn reset(&mut self) {
        self.table_name = None;
        self.time_index_column = None;
        self.field_columns = vec![];
        self.tag_columns = vec![];
        self.field_column_matcher = None;
        self.schema_name = None;
        self.range = None;
    }

    /// Reset the table name to an empty string and clear the schema name.
    fn reset_table_name_and_schema(&mut self) {
        self.table_name = Some(String::new());
        self.schema_name = None;
    }

    /// Check if the `le` label is present in the tag columns.
    fn has_le_tag(&self) -> bool {
        self.tag_columns.iter().any(|c| c.eq(&LE_COLUMN_NAME))
    }
}

/// The planner that converts a PromQL [`EvalStmt`] into a DataFusion [`LogicalPlan`].
pub struct PromPlanner {
    table_provider: DfTableSourceProvider,
    ctx: PromPlannerContext,
}

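/// Unescape the value of a matcher, e.g. turning the two characters `\n` into
/// a real newline. Values that fail to unescape are passed through unchanged.
///
/// A minimal sketch of the behavior; the field names follow
/// `promql_parser::label::Matcher`, and the literal values are illustrative only:
///
/// ```ignore
/// let raw = Matcher {
///     op: MatchOp::Equal,
///     name: "path".to_string(),
///     value: "a\\nb".to_string(), // backslash + 'n', not a newline yet
/// };
/// assert_eq!(normalize_matcher(raw).value, "a\nb"); // a real newline now
/// ```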
pub fn normalize_matcher(mut matcher: Matcher) -> Matcher {
    if let Ok(unescaped_value) = unescaper::unescape(&matcher.value) {
        matcher.value = unescaped_value;
    }
    matcher
}

impl PromPlanner {
    pub async fn stmt_to_plan(
        table_provider: DfTableSourceProvider,
        stmt: &EvalStmt,
        session_state: &SessionState,
    ) -> Result<LogicalPlan> {
        let mut planner = Self {
            table_provider,
            ctx: PromPlannerContext::from_eval_stmt(stmt),
        };

        planner.prom_expr_to_plan(&stmt.expr, session_state).await
    }

    #[async_recursion]
    pub async fn prom_expr_to_plan(
        &mut self,
        prom_expr: &PromExpr,
        session_state: &SessionState,
    ) -> Result<LogicalPlan> {
        let res = match prom_expr {
            PromExpr::Aggregate(expr) => self.prom_aggr_expr_to_plan(session_state, expr).await?,
            PromExpr::Unary(expr) => self.prom_unary_expr_to_plan(session_state, expr).await?,
            PromExpr::Binary(expr) => self.prom_binary_expr_to_plan(session_state, expr).await?,
            PromExpr::Paren(ParenExpr { expr }) => {
                self.prom_expr_to_plan(expr, session_state).await?
            }
            PromExpr::Subquery(expr) => {
                self.prom_subquery_expr_to_plan(session_state, expr).await?
            }
            PromExpr::NumberLiteral(lit) => self.prom_number_lit_to_plan(lit)?,
            PromExpr::StringLiteral(lit) => self.prom_string_lit_to_plan(lit)?,
            PromExpr::VectorSelector(selector) => {
                self.prom_vector_selector_to_plan(selector).await?
            }
            PromExpr::MatrixSelector(selector) => {
                self.prom_matrix_selector_to_plan(selector).await?
            }
            PromExpr::Call(expr) => self.prom_call_expr_to_plan(session_state, expr).await?,
            PromExpr::Extension(expr) => self.prom_ext_expr_to_plan(session_state, expr).await?,
        };

        Ok(res)
    }

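    // Worked example of the subquery window arithmetic below (values are
    // hypothetical): for `expr[10m:1m]` evaluated with step = 1m, the inner
    // expression must start `range - interval` earlier than the outer start,
    //   start' = start - (600_000 - 60_000) = start - 540_000 ms,
    // so the first output point at `start` sees a full 10 minutes of samples.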
    async fn prom_subquery_expr_to_plan(
        &mut self,
        session_state: &SessionState,
        subquery_expr: &SubqueryExpr,
    ) -> Result<LogicalPlan> {
        let SubqueryExpr {
            expr, range, step, ..
        } = subquery_expr;

        let current_interval = self.ctx.interval;
        if let Some(step) = step {
            self.ctx.interval = step.as_millis() as _;
        }
        let current_start = self.ctx.start;
        self.ctx.start -= range.as_millis() as i64 - self.ctx.interval;
        let input = self.prom_expr_to_plan(expr, session_state).await?;
        self.ctx.interval = current_interval;
        self.ctx.start = current_start;

        ensure!(!range.is_zero(), ZeroRangeSelectorSnafu);
        let range_ms = range.as_millis() as _;
        self.ctx.range = Some(range_ms);

        let manipulate = RangeManipulate::new(
            self.ctx.start,
            self.ctx.end,
            self.ctx.interval,
            range_ms,
            self.ctx
                .time_index_column
                .clone()
                .expect("time index should be set in `setup_context`"),
            self.ctx.field_columns.clone(),
            input,
        )
        .context(DataFusionPlanningSnafu)?;

        Ok(LogicalPlan::Extension(Extension {
            node: Arc::new(manipulate),
        }))
    }

    async fn prom_aggr_expr_to_plan(
        &mut self,
        session_state: &SessionState,
        aggr_expr: &AggregateExpr,
    ) -> Result<LogicalPlan> {
        let AggregateExpr {
            op,
            expr,
            modifier,
            param,
        } = aggr_expr;

        let input = self.prom_expr_to_plan(expr, session_state).await?;

        match (*op).id() {
            token::T_TOPK | token::T_BOTTOMK => {
                self.prom_topk_bottomk_to_plan(aggr_expr, input).await
            }
            _ => {
                // calculate the columns to group by
                let mut group_exprs = self.agg_modifier_to_col(input.schema(), modifier, true)?;
                // convert the operator and value columns to aggregate exprs
                let (aggr_exprs, prev_field_exprs) =
                    self.create_aggregate_exprs(*op, param, &input)?;

                // create the plan
                let builder = LogicalPlanBuilder::from(input);
                let builder = if op.id() == token::T_COUNT_VALUES {
                    let label = Self::get_param_value_as_str(*op, param)?;
                    // `count_values` must additionally group by the value columns
                    // and re-project them under the given label.
                    group_exprs.extend(prev_field_exprs.clone());
                    let project_fields = self
                        .create_field_column_exprs()?
                        .into_iter()
                        .chain(self.create_tag_column_exprs()?)
                        .chain(Some(self.create_time_index_column_expr()?))
                        .chain(prev_field_exprs.into_iter().map(|expr| expr.alias(label)));

                    builder
                        .aggregate(group_exprs.clone(), aggr_exprs)
                        .context(DataFusionPlanningSnafu)?
                        .project(project_fields)
                        .context(DataFusionPlanningSnafu)?
                } else {
                    builder
                        .aggregate(group_exprs.clone(), aggr_exprs)
                        .context(DataFusionPlanningSnafu)?
                };

                let sort_expr = group_exprs.into_iter().map(|expr| expr.sort(true, false));

                builder
                    .sort(sort_expr)
                    .context(DataFusionPlanningSnafu)?
                    .build()
                    .context(DataFusionPlanningSnafu)
            }
        }
    }

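    // Sketch of the `topk`/`bottomk` strategy used below (query text is
    // hypothetical): `topk(3, metric)` becomes a window + filter pipeline,
    //   Window: one ranking expression per value column, partitioned by the
    //           group exprs derived from the `by`/`without` modifier,
    //   Filter: `rank_column <= 3` (OR-ed across the value columns),
    // followed by a sort and a projection back to the original columns.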
    async fn prom_topk_bottomk_to_plan(
        &mut self,
        aggr_expr: &AggregateExpr,
        input: LogicalPlan,
    ) -> Result<LogicalPlan> {
        let AggregateExpr {
            op,
            param,
            modifier,
            ..
        } = aggr_expr;

        let group_exprs = self.agg_modifier_to_col(input.schema(), modifier, false)?;

        let val = Self::get_param_as_literal_expr(param, Some(*op), Some(ArrowDataType::Float64))?;

        // convert the operator and value columns to window exprs
        let window_exprs = self.create_window_exprs(*op, group_exprs.clone(), &input)?;

        let rank_columns: Vec<_> = window_exprs
            .iter()
            .map(|expr| expr.schema_name().to_string())
            .collect();

        // Build a filter like `rank_1 <= k OR rank_2 <= k OR ...` so a row is
        // kept when it ranks within `k` on any value column.
        let filter: DfExpr = rank_columns
            .iter()
            .fold(None, |expr, rank| {
                let predicate = DfExpr::BinaryExpr(BinaryExpr {
                    left: Box::new(col(rank)),
                    op: Operator::LtEq,
                    right: Box::new(val.clone()),
                });

                match expr {
                    None => Some(predicate),
                    Some(expr) => Some(DfExpr::BinaryExpr(BinaryExpr {
                        left: Box::new(expr),
                        op: Operator::Or,
                        right: Box::new(predicate),
                    })),
                }
            })
            .unwrap();

        let rank_columns: Vec<_> = rank_columns.into_iter().map(col).collect();

        let mut new_group_exprs = group_exprs.clone();
        // add each rank column to the sort exprs
        new_group_exprs.extend(rank_columns);

        let group_sort_expr = new_group_exprs
            .into_iter()
            .map(|expr| expr.sort(true, false));

        let project_fields = self
            .create_field_column_exprs()?
            .into_iter()
            .chain(self.create_tag_column_exprs()?)
            .chain(Some(self.create_time_index_column_expr()?));

        LogicalPlanBuilder::from(input)
            .window(window_exprs)
            .context(DataFusionPlanningSnafu)?
            .filter(filter)
            .context(DataFusionPlanningSnafu)?
            .sort(group_sort_expr)
            .context(DataFusionPlanningSnafu)?
            .project(project_fields)
            .context(DataFusionPlanningSnafu)?
            .build()
            .context(DataFusionPlanningSnafu)
    }

    async fn prom_unary_expr_to_plan(
        &mut self,
        session_state: &SessionState,
        unary_expr: &UnaryExpr,
    ) -> Result<LogicalPlan> {
        let UnaryExpr { expr } = unary_expr;
        // negate every value column of the input
        let input = self.prom_expr_to_plan(expr, session_state).await?;
        self.projection_for_each_field_column(input, |col| {
            Ok(DfExpr::Negative(Box::new(DfExpr::Column(col.into()))))
        })
    }

    async fn prom_binary_expr_to_plan(
        &mut self,
        session_state: &SessionState,
        binary_expr: &PromBinaryExpr,
    ) -> Result<LogicalPlan> {
        let PromBinaryExpr {
            lhs,
            rhs,
            op,
            modifier,
        } = binary_expr;

        // if set to true, a comparison operator returns 0/1 (for false/true)
        // instead of filtering rows
        let should_return_bool = if let Some(m) = modifier {
            m.return_bool
        } else {
            false
        };
        let is_comparison_op = Self::is_token_a_comparison_op(*op);

        // Build a filter plan for comparison operators without `bool`;
        // otherwise build a projection plan.
        match (
            Self::try_build_literal_expr(lhs),
            Self::try_build_literal_expr(rhs),
        ) {
            // both sides are literals
            (Some(lhs), Some(rhs)) => {
                self.ctx.time_index_column = Some(DEFAULT_TIME_INDEX_COLUMN.to_string());
                self.ctx.field_columns = vec![DEFAULT_FIELD_COLUMN.to_string()];
                self.ctx.reset_table_name_and_schema();
                let field_expr_builder = Self::prom_token_to_binary_expr_builder(*op)?;
                let mut field_expr = field_expr_builder(lhs, rhs)?;

                if is_comparison_op && should_return_bool {
                    field_expr = DfExpr::Cast(Cast {
                        expr: Box::new(field_expr),
                        data_type: ArrowDataType::Float64,
                    });
                }

                Ok(LogicalPlan::Extension(Extension {
                    node: Arc::new(
                        EmptyMetric::new(
                            self.ctx.start,
                            self.ctx.end,
                            self.ctx.interval,
                            SPECIAL_TIME_FUNCTION.to_string(),
                            DEFAULT_FIELD_COLUMN.to_string(),
                            Some(field_expr),
                        )
                        .context(DataFusionPlanningSnafu)?,
                    ),
                }))
            }
            // lhs is a literal, rhs is a vector
            (Some(mut expr), None) => {
                let input = self.prom_expr_to_plan(rhs, session_state).await?;
                // check if the literal is a special time expr
                if let Some(time_expr) = Self::try_build_special_time_expr(
                    lhs,
                    self.ctx.time_index_column.as_ref().unwrap(),
                ) {
                    expr = time_expr
                }
                let bin_expr_builder = |col: &String| {
                    let binary_expr_builder = Self::prom_token_to_binary_expr_builder(*op)?;
                    let mut binary_expr =
                        binary_expr_builder(expr.clone(), DfExpr::Column(col.into()))?;

                    if is_comparison_op && should_return_bool {
                        binary_expr = DfExpr::Cast(Cast {
                            expr: Box::new(binary_expr),
                            data_type: ArrowDataType::Float64,
                        });
                    }
                    Ok(binary_expr)
                };
                if is_comparison_op && !should_return_bool {
                    self.filter_on_field_column(input, bin_expr_builder)
                } else {
                    self.projection_for_each_field_column(input, bin_expr_builder)
                }
            }
            // lhs is a vector, rhs is a literal
            (None, Some(mut expr)) => {
                let input = self.prom_expr_to_plan(lhs, session_state).await?;
                // check if the literal is a special time expr
                if let Some(time_expr) = Self::try_build_special_time_expr(
                    rhs,
                    self.ctx.time_index_column.as_ref().unwrap(),
                ) {
                    expr = time_expr
                }
                let bin_expr_builder = |col: &String| {
                    let binary_expr_builder = Self::prom_token_to_binary_expr_builder(*op)?;
                    let mut binary_expr =
                        binary_expr_builder(DfExpr::Column(col.into()), expr.clone())?;

                    if is_comparison_op && should_return_bool {
                        binary_expr = DfExpr::Cast(Cast {
                            expr: Box::new(binary_expr),
                            data_type: ArrowDataType::Float64,
                        });
                    }
                    Ok(binary_expr)
                };
                if is_comparison_op && !should_return_bool {
                    self.filter_on_field_column(input, bin_expr_builder)
                } else {
                    self.projection_for_each_field_column(input, bin_expr_builder)
                }
            }
            // both sides are vectors
            (None, None) => {
                let left_input = self.prom_expr_to_plan(lhs, session_state).await?;
                let left_field_columns = self.ctx.field_columns.clone();
                let left_time_index_column = self.ctx.time_index_column.clone();
                let mut left_table_ref = self
                    .table_ref()
                    .unwrap_or_else(|_| TableReference::bare(""));
                let left_context = self.ctx.clone();

                let right_input = self.prom_expr_to_plan(rhs, session_state).await?;
                let right_field_columns = self.ctx.field_columns.clone();
                let right_time_index_column = self.ctx.time_index_column.clone();
                let mut right_table_ref = self
                    .table_ref()
                    .unwrap_or_else(|_| TableReference::bare(""));
                let right_context = self.ctx.clone();

                // set operations (and/or/unless) are planned separately from
                // arithmetic and comparison operators
                if Self::is_token_a_set_op(*op) {
                    return self.set_op_on_non_field_columns(
                        left_input,
                        right_input,
                        left_context,
                        right_context,
                        *op,
                        modifier,
                    );
                }

                // normal join
                if left_table_ref == right_table_ref {
                    // rename the table references to avoid ambiguity
                    left_table_ref = TableReference::bare("lhs");
                    right_table_ref = TableReference::bare("rhs");
                    // `self.ctx` currently holds the right side's context;
                    // prefer whichever side actually carries tag columns
                    if self.ctx.tag_columns.is_empty() {
                        self.ctx = left_context.clone();
                        self.ctx.table_name = Some("lhs".to_string());
                    } else {
                        self.ctx.table_name = Some("rhs".to_string());
                    }
                }
                let mut field_columns = left_field_columns.iter().zip(right_field_columns.iter());

                let join_plan = self.join_on_non_field_columns(
                    left_input,
                    right_input,
                    left_table_ref.clone(),
                    right_table_ref.clone(),
                    left_time_index_column,
                    right_time_index_column,
                    // an empty tag set on either side means a case like
                    // `scalar(...) + host` or `host + scalar(...)`
                    left_context.tag_columns.is_empty() || right_context.tag_columns.is_empty(),
                    modifier,
                )?;
                let join_plan_schema = join_plan.schema().clone();

                let bin_expr_builder = |_: &String| {
                    let (left_col_name, right_col_name) = field_columns.next().unwrap();
                    let left_col = join_plan_schema
                        .qualified_field_with_name(Some(&left_table_ref), left_col_name)
                        .context(DataFusionPlanningSnafu)?
                        .into();
                    let right_col = join_plan_schema
                        .qualified_field_with_name(Some(&right_table_ref), right_col_name)
                        .context(DataFusionPlanningSnafu)?
                        .into();

                    let binary_expr_builder = Self::prom_token_to_binary_expr_builder(*op)?;
                    let mut binary_expr =
                        binary_expr_builder(DfExpr::Column(left_col), DfExpr::Column(right_col))?;
                    if is_comparison_op && should_return_bool {
                        binary_expr = DfExpr::Cast(Cast {
                            expr: Box::new(binary_expr),
                            data_type: ArrowDataType::Float64,
                        });
                    }
                    Ok(binary_expr)
                };
                if is_comparison_op && !should_return_bool {
                    self.filter_on_field_column(join_plan, bin_expr_builder)
                } else {
                    self.projection_for_each_field_column(join_plan, bin_expr_builder)
                }
            }
        }
    }

    fn prom_number_lit_to_plan(&mut self, number_literal: &NumberLiteral) -> Result<LogicalPlan> {
        let NumberLiteral { val } = number_literal;
        self.ctx.time_index_column = Some(DEFAULT_TIME_INDEX_COLUMN.to_string());
        self.ctx.field_columns = vec![DEFAULT_FIELD_COLUMN.to_string()];
        self.ctx.reset_table_name_and_schema();
        let literal_expr = df_prelude::lit(*val);

        let plan = LogicalPlan::Extension(Extension {
            node: Arc::new(
                EmptyMetric::new(
                    self.ctx.start,
                    self.ctx.end,
                    self.ctx.interval,
                    SPECIAL_TIME_FUNCTION.to_string(),
                    DEFAULT_FIELD_COLUMN.to_string(),
                    Some(literal_expr),
                )
                .context(DataFusionPlanningSnafu)?,
            ),
        });
        Ok(plan)
    }

    fn prom_string_lit_to_plan(&mut self, string_literal: &StringLiteral) -> Result<LogicalPlan> {
        let StringLiteral { val } = string_literal;
        self.ctx.time_index_column = Some(DEFAULT_TIME_INDEX_COLUMN.to_string());
        self.ctx.field_columns = vec![DEFAULT_FIELD_COLUMN.to_string()];
        self.ctx.reset_table_name_and_schema();
        let literal_expr = df_prelude::lit(val.to_string());

        let plan = LogicalPlan::Extension(Extension {
            node: Arc::new(
                EmptyMetric::new(
                    self.ctx.start,
                    self.ctx.end,
                    self.ctx.interval,
                    SPECIAL_TIME_FUNCTION.to_string(),
                    DEFAULT_FIELD_COLUMN.to_string(),
                    Some(literal_expr),
                )
                .context(DataFusionPlanningSnafu)?,
            ),
        });
        Ok(plan)
    }

    async fn prom_vector_selector_to_plan(
        &mut self,
        vector_selector: &VectorSelector,
    ) -> Result<LogicalPlan> {
        let VectorSelector {
            name,
            offset,
            matchers,
            at: _,
        } = vector_selector;
        let matchers = self.preprocess_label_matchers(matchers, name)?;
        // a missing metric table yields an empty-metric plan instead of an error
        if let Some(empty_plan) = self.setup_context().await? {
            return Ok(empty_plan);
        }
        let normalize = self
            .selector_to_series_normalize_plan(offset, matchers, false)
            .await?;
        let manipulate = InstantManipulate::new(
            self.ctx.start,
            self.ctx.end,
            self.ctx.lookback_delta,
            self.ctx.interval,
            self.ctx
                .time_index_column
                .clone()
                .expect("time index should be set in `setup_context`"),
            self.ctx.field_columns.first().cloned(),
            normalize,
        );
        Ok(LogicalPlan::Extension(Extension {
            node: Arc::new(manipulate),
        }))
    }

    async fn prom_matrix_selector_to_plan(
        &mut self,
        matrix_selector: &MatrixSelector,
    ) -> Result<LogicalPlan> {
        let MatrixSelector { vs, range } = matrix_selector;
        let VectorSelector {
            name,
            offset,
            matchers,
            ..
        } = vs;
        let matchers = self.preprocess_label_matchers(matchers, name)?;
        ensure!(!range.is_zero(), ZeroRangeSelectorSnafu);
        let range_ms = range.as_millis() as _;
        self.ctx.range = Some(range_ms);

        // if the metric table doesn't exist, use the empty-metric plan as the
        // range input instead of failing
        let normalize = match self.setup_context().await? {
            Some(empty_plan) => empty_plan,
            None => {
                self.selector_to_series_normalize_plan(offset, matchers, true)
                    .await?
            }
        };
        let manipulate = RangeManipulate::new(
            self.ctx.start,
            self.ctx.end,
            self.ctx.interval,
            range_ms,
            self.ctx
                .time_index_column
                .clone()
                .expect("time index should be set in `setup_context`"),
            self.ctx.field_columns.clone(),
            normalize,
        )
        .context(DataFusionPlanningSnafu)?;

        Ok(LogicalPlan::Extension(Extension {
            node: Arc::new(manipulate),
        }))
    }

    async fn prom_call_expr_to_plan(
        &mut self,
        session_state: &SessionState,
        call_expr: &Call,
    ) -> Result<LogicalPlan> {
        let Call { func, args } = call_expr;
        // some special functions are planned as dedicated nodes instead of scalar exprs
        match func.name {
            SPECIAL_HISTOGRAM_QUANTILE => {
                return self.create_histogram_plan(args, session_state).await
            }
            SPECIAL_VECTOR_FUNCTION => return self.create_vector_plan(args).await,
            SCALAR_FUNCTION => return self.create_scalar_plan(args, session_state).await,
            _ => {}
        }

        // transform function arguments
        let args = self.create_function_args(&args.args)?;
        let input = if let Some(prom_expr) = &args.input {
            self.prom_expr_to_plan(prom_expr, session_state).await?
        } else {
            self.ctx.time_index_column = Some(SPECIAL_TIME_FUNCTION.to_string());
            self.ctx.reset_table_name_and_schema();
            self.ctx.tag_columns = vec![];
            self.ctx.field_columns = vec![DEFAULT_FIELD_COLUMN.to_string()];
            LogicalPlan::Extension(Extension {
                node: Arc::new(
                    EmptyMetric::new(
                        self.ctx.start,
                        self.ctx.end,
                        self.ctx.interval,
                        SPECIAL_TIME_FUNCTION.to_string(),
                        DEFAULT_FIELD_COLUMN.to_string(),
                        None,
                    )
                    .context(DataFusionPlanningSnafu)?,
                ),
            })
        };
        let (mut func_exprs, new_tags) =
            self.create_function_expr(func, args.literals.clone(), session_state)?;
        func_exprs.insert(0, self.create_time_index_column_expr()?);
        func_exprs.extend_from_slice(&self.create_tag_column_exprs()?);

        let builder = LogicalPlanBuilder::from(input)
            .project(func_exprs)
            .context(DataFusionPlanningSnafu)?
            .filter(self.create_empty_values_filter_expr()?)
            .context(DataFusionPlanningSnafu)?;

        let builder = match func.name {
            "sort" => builder
                .sort(self.create_field_columns_sort_exprs(true))
                .context(DataFusionPlanningSnafu)?,
            "sort_desc" => builder
                .sort(self.create_field_columns_sort_exprs(false))
                .context(DataFusionPlanningSnafu)?,
            "sort_by_label" => builder
                .sort(Self::create_sort_exprs_by_tags(
                    func.name,
                    args.literals,
                    true,
                )?)
                .context(DataFusionPlanningSnafu)?,
            "sort_by_label_desc" => builder
                .sort(Self::create_sort_exprs_by_tags(
                    func.name,
                    args.literals,
                    false,
                )?)
                .context(DataFusionPlanningSnafu)?,

            _ => builder,
        };

        // add new tags generated by functions like `label_join` to the context
        for tag in new_tags {
            self.ctx.tag_columns.push(tag);
        }

        builder.build().context(DataFusionPlanningSnafu)
    }

    async fn prom_ext_expr_to_plan(
        &mut self,
        session_state: &SessionState,
        ext_expr: &promql_parser::parser::ast::Extension,
    ) -> Result<LogicalPlan> {
        let expr = &ext_expr.expr;
        let children = expr.children();
        let plan = self.prom_expr_to_plan(&children[0], session_state).await?;
        // only EXPLAIN/ANALYZE extensions are recognized; anything else
        // falls back to an empty plan
        match expr.name() {
            "ANALYZE" => LogicalPlanBuilder::from(plan)
                .explain(false, true)
                .unwrap()
                .build()
                .context(DataFusionPlanningSnafu),
            "ANALYZE VERBOSE" => LogicalPlanBuilder::from(plan)
                .explain(true, true)
                .unwrap()
                .build()
                .context(DataFusionPlanningSnafu),
            "EXPLAIN" => LogicalPlanBuilder::from(plan)
                .explain(false, false)
                .unwrap()
                .build()
                .context(DataFusionPlanningSnafu),
            "EXPLAIN VERBOSE" => LogicalPlanBuilder::from(plan)
                .explain(true, false)
                .unwrap()
                .build()
                .context(DataFusionPlanningSnafu),
            _ => LogicalPlanBuilder::empty(true)
                .build()
                .context(DataFusionPlanningSnafu),
        }
    }

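    // Sketch of the matcher preprocessing below (query text is hypothetical):
    // `some_metric{__schema__="public", __field__="field_a", host!="h1"}`
    //   - `__schema__` / `__database__` pick the schema and must use `=`,
    //   - `__field__` restricts which value columns are selected,
    //   - `__name__` must resolve to exactly one metric (table) name,
    //   - everything else (`host!="h1"`) stays as an ordinary matcher.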
    #[allow(clippy::mutable_key_type)]
    fn preprocess_label_matchers(
        &mut self,
        label_matchers: &Matchers,
        name: &Option<String>,
    ) -> Result<Matchers> {
        self.ctx.reset();

        let metric_name;
        if let Some(name) = name.clone() {
            metric_name = Some(name);
            ensure!(
                label_matchers.find_matchers(METRIC_NAME).is_empty(),
                MultipleMetricMatchersSnafu
            );
        } else {
            let mut matches = label_matchers.find_matchers(METRIC_NAME);
            ensure!(!matches.is_empty(), NoMetricMatcherSnafu);
            ensure!(matches.len() == 1, MultipleMetricMatchersSnafu);
            ensure!(
                matches[0].op == MatchOp::Equal,
                UnsupportedMatcherOpSnafu {
                    matcher_op: matches[0].op.to_string(),
                    matcher: METRIC_NAME
                }
            );
            metric_name = matches.pop().map(|m| m.value);
        }

        self.ctx.table_name = metric_name;

        let mut matchers = HashSet::new();
        for matcher in &label_matchers.matchers {
            if matcher.name == FIELD_COLUMN_MATCHER {
                self.ctx
                    .field_column_matcher
                    .get_or_insert_default()
                    .push(matcher.clone());
            } else if matcher.name == SCHEMA_COLUMN_MATCHER || matcher.name == DB_COLUMN_MATCHER {
                ensure!(
                    matcher.op == MatchOp::Equal,
                    UnsupportedMatcherOpSnafu {
                        matcher: matcher.name.to_string(),
                        matcher_op: matcher.op.to_string(),
                    }
                );
                self.ctx.schema_name = Some(matcher.value.clone());
            } else if matcher.name != METRIC_NAME {
                let _ = matchers.insert(matcher.clone());
            }
        }

        Ok(Matchers::new(
            matchers.into_iter().map(normalize_matcher).collect(),
        ))
    }

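    // Sketch of the plan produced below for a selector such as
    // `some_metric{host="h1"} offset 5m` (names hypothetical):
    //   TableScan -> Filter(matchers AND time range) -> [Projection for
    //   `__field__`] -> Sort(tags, time index) -> SeriesDivide -> [SeriesNormalize]
    // SeriesNormalize is only appended for range selectors or non-zero offsets.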
    async fn selector_to_series_normalize_plan(
        &mut self,
        offset: &Option<Offset>,
        label_matchers: Matchers,
        is_range_selector: bool,
    ) -> Result<LogicalPlan> {
        // make the table scan plan
        let table_ref = self.table_ref()?;
        let mut table_scan = self.create_table_scan_plan(table_ref.clone()).await?;
        let table_schema = table_scan.schema();

        // make the filter exprs
        let offset_duration = match offset {
            Some(Offset::Pos(duration)) => duration.as_millis() as Millisecond,
            Some(Offset::Neg(duration)) => -(duration.as_millis() as Millisecond),
            None => 0,
        };
        let mut scan_filters = Self::matchers_to_expr(label_matchers.clone(), table_schema)?;
        if let Some(time_index_filter) = self.build_time_index_filter(offset_duration)? {
            scan_filters.push(time_index_filter);
        }
        table_scan = LogicalPlanBuilder::from(table_scan)
            // Safety: `conjunction` returns `None` only when `scan_filters` is empty.
            .filter(conjunction(scan_filters).unwrap())
            .context(DataFusionPlanningSnafu)?
            .build()
            .context(DataFusionPlanningSnafu)?;

        // make a projection if there is any `__field__` matcher
        if let Some(field_matchers) = &self.ctx.field_column_matcher {
            let col_set = self.ctx.field_columns.iter().collect::<HashSet<_>>();
            // opt-in set
            let mut result_set = HashSet::new();
            // opt-out set
            let mut reverse_set = HashSet::new();
            for matcher in field_matchers {
                match &matcher.op {
                    MatchOp::Equal => {
                        if col_set.contains(&matcher.value) {
                            let _ = result_set.insert(matcher.value.clone());
                        } else {
                            return Err(ColumnNotFoundSnafu {
                                col: matcher.value.clone(),
                            }
                            .build());
                        }
                    }
                    MatchOp::NotEqual => {
                        if col_set.contains(&matcher.value) {
                            let _ = reverse_set.insert(matcher.value.clone());
                        } else {
                            return Err(ColumnNotFoundSnafu {
                                col: matcher.value.clone(),
                            }
                            .build());
                        }
                    }
                    MatchOp::Re(regex) => {
                        for col in &self.ctx.field_columns {
                            if regex.is_match(col) {
                                let _ = result_set.insert(col.clone());
                            }
                        }
                    }
                    MatchOp::NotRe(regex) => {
                        for col in &self.ctx.field_columns {
                            if regex.is_match(col) {
                                let _ = reverse_set.insert(col.clone());
                            }
                        }
                    }
                }
            }
            // if the opt-in set is empty, start from all value columns
            if result_set.is_empty() {
                result_set = col_set.into_iter().cloned().collect();
            }
            // remove the opt-out columns
            for col in reverse_set {
                let _ = result_set.remove(&col);
            }

            // mask the field columns in the context with the computed result set
            self.ctx.field_columns = self
                .ctx
                .field_columns
                .drain(..)
                .filter(|col| result_set.contains(col))
                .collect();

            let exprs = result_set
                .into_iter()
                .map(|col| DfExpr::Column(Column::new_unqualified(col)))
                .chain(self.create_tag_column_exprs()?)
                .chain(Some(self.create_time_index_column_expr()?))
                .collect::<Vec<_>>();

            table_scan = LogicalPlanBuilder::from(table_scan)
                .project(exprs)
                .context(DataFusionPlanningSnafu)?
                .build()
                .context(DataFusionPlanningSnafu)?;
        }

        // make the sort plan, ordering by tag columns then the time index
        let sort_plan = LogicalPlanBuilder::from(table_scan)
            .sort(self.create_tag_and_time_index_column_sort_exprs()?)
            .context(DataFusionPlanningSnafu)?
            .build()
            .context(DataFusionPlanningSnafu)?;

        // make the series-divide plan
        let time_index_column =
            self.ctx
                .time_index_column
                .clone()
                .with_context(|| TimeIndexNotFoundSnafu {
                    table: table_ref.to_string(),
                })?;
        let divide_plan = LogicalPlan::Extension(Extension {
            node: Arc::new(SeriesDivide::new(
                self.ctx.tag_columns.clone(),
                time_index_column,
                sort_plan,
            )),
        });

        // make the series-normalize plan, unless nothing needs normalizing
        if !is_range_selector && offset_duration == 0 {
            return Ok(divide_plan);
        }
        let series_normalize = SeriesNormalize::new(
            offset_duration,
            self.ctx
                .time_index_column
                .clone()
                .with_context(|| TimeIndexNotFoundSnafu {
                    table: table_ref.to_quoted_string(),
                })?,
            is_range_selector,
            self.ctx.tag_columns.clone(),
            divide_plan,
        );
        let logical_plan = LogicalPlan::Extension(Extension {
            node: Arc::new(series_normalize),
        });

        Ok(logical_plan)
    }

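    // Sketch of the `by`/`without` handling below (query text hypothetical):
    //   sum by (host) (metric)      -> group exprs [host, time_index]
    //   sum without (host) (metric) -> group exprs: every input column except
    //                                  `host`, the time index and the value
    //                                  columns, plus the time index re-added.
    // The time index is always grouped on so aggregation stays per-timestamp.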
    /// Convert the `by`/`without` modifier of an aggregate expression into
    /// group-by expressions, always including the time index column.
    fn agg_modifier_to_col(
        &mut self,
        input_schema: &DFSchemaRef,
        modifier: &Option<LabelModifier>,
        update_ctx: bool,
    ) -> Result<Vec<DfExpr>> {
        match modifier {
            None => {
                if update_ctx {
                    self.ctx.tag_columns.clear();
                }
                Ok(vec![self.create_time_index_column_expr()?])
            }
            Some(LabelModifier::Include(labels)) => {
                if update_ctx {
                    self.ctx.tag_columns.clear();
                }
                let mut exprs = Vec::with_capacity(labels.labels.len());
                for label in &labels.labels {
                    // nonexistent labels are ignored
                    if let Ok(field) = input_schema.field_with_unqualified_name(label) {
                        exprs.push(DfExpr::Column(Column::from(field.name())));

                        if update_ctx {
                            // update the tag columns in the context
                            self.ctx.tag_columns.push(label.clone());
                        }
                    }
                }
                // add the time index column into the group exprs
                exprs.push(self.create_time_index_column_expr()?);

                Ok(exprs)
            }
            Some(LabelModifier::Exclude(labels)) => {
                let mut all_fields = input_schema
                    .fields()
                    .iter()
                    .map(|f| f.name())
                    .collect::<BTreeSet<_>>();

                // remove the "without"-ed fields
                for label in &labels.labels {
                    let _ = all_fields.remove(label);
                }

                // remove the time index and value fields
                if let Some(time_index) = &self.ctx.time_index_column {
                    let _ = all_fields.remove(time_index);
                }
                for value in &self.ctx.field_columns {
                    let _ = all_fields.remove(value);
                }

                if update_ctx {
                    // change the tag columns in the context
                    self.ctx.tag_columns = all_fields.iter().map(|col| (*col).clone()).collect();
                }

                // convert the remaining fields to column exprs
                let mut exprs = all_fields
                    .into_iter()
                    .map(|c| DfExpr::Column(Column::from(c)))
                    .collect::<Vec<_>>();

                // add the time index column into the group exprs
                exprs.push(self.create_time_index_column_expr()?);

                Ok(exprs)
            }
        }
    }

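    // Sketch of the matcher lowering below (label names hypothetical):
    //   host="h1"   -> host = 'h1'
    //   host!="h1"  -> host != 'h1'
    //   host=~"h.*" -> host ~ 'h.*'  (a bare `.*` pattern is dropped entirely)
    //   host!~"h.*" -> host !~ 'h.*'
    // A matcher on a label absent from the schema compares against '' so that
    // e.g. `missing=""` can still match.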
    pub fn matchers_to_expr(
        label_matchers: Matchers,
        table_schema: &DFSchemaRef,
    ) -> Result<Vec<DfExpr>> {
        let mut exprs = Vec::with_capacity(label_matchers.matchers.len());
        for matcher in label_matchers.matchers {
            let col = if table_schema
                .field_with_unqualified_name(&matcher.name)
                .is_err()
            {
                // nonexistent labels compare against an empty string
                DfExpr::Literal(ScalarValue::Utf8(Some(String::new()))).alias(matcher.name)
            } else {
                DfExpr::Column(Column::from_name(matcher.name))
            };
            let lit = DfExpr::Literal(ScalarValue::Utf8(Some(matcher.value)));
            let expr = match matcher.op {
                MatchOp::Equal => col.eq(lit),
                MatchOp::NotEqual => col.not_eq(lit),
                MatchOp::Re(re) => {
                    // a `.*` regex matches everything, so the filter is dropped
                    if re.as_str() == ".*" {
                        continue;
                    }
                    DfExpr::BinaryExpr(BinaryExpr {
                        left: Box::new(col),
                        op: Operator::RegexMatch,
                        right: Box::new(DfExpr::Literal(ScalarValue::Utf8(Some(
                            re.as_str().to_string(),
                        )))),
                    })
                }
                MatchOp::NotRe(re) => DfExpr::BinaryExpr(BinaryExpr {
                    left: Box::new(col),
                    op: Operator::RegexNotMatch,
                    right: Box::new(DfExpr::Literal(ScalarValue::Utf8(Some(
                        re.as_str().to_string(),
                    )))),
                }),
            };
            exprs.push(expr);
        }

        Ok(exprs)
    }

    fn table_ref(&self) -> Result<TableReference> {
        let table_name = self
            .ctx
            .table_name
            .clone()
            .context(TableNameNotFoundSnafu)?;

        // include the schema name if a `__schema__` matcher was given
        let table_ref = if let Some(schema_name) = &self.ctx.schema_name {
            TableReference::partial(schema_name.as_str(), table_name.as_str())
        } else {
            TableReference::bare(table_name.as_str())
        };

        Ok(table_ref)
    }

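    // Worked example for the two filter strategies below (numbers are
    // hypothetical): with end - start = 24h and interval = 5m there are 288
    // calculation points; 288 <= MAX_SCATTER_POINTS but 5m <= INTERVAL_1H, so
    // one coarse range is emitted:
    //   [start - offset - lookback - range, end - offset + lookback].
    // Only when the points are few AND the interval exceeds one hour does the
    // planner emit a small window per point and OR them together.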
    fn build_time_index_filter(&self, offset_duration: i64) -> Result<Option<DfExpr>> {
        let start = self.ctx.start;
        let end = self.ctx.end;
        if end < start {
            return InvalidTimeRangeSnafu { start, end }.fail();
        }
        let lookback_delta = self.ctx.lookback_delta;
        let range = self.ctx.range.unwrap_or_default();
        let interval = self.ctx.interval;
        let time_index_expr = self.create_time_index_column_expr()?;
        let num_points = (end - start) / interval;

        // Scenario 1: a single coarse filter when there are too many points
        // or the interval is too small to benefit from scattering.
        if num_points > MAX_SCATTER_POINTS || interval <= INTERVAL_1H {
            let single_time_range = time_index_expr
                .clone()
                .gt_eq(DfExpr::Literal(ScalarValue::TimestampMillisecond(
                    Some(self.ctx.start - offset_duration - self.ctx.lookback_delta - range),
                    None,
                )))
                .and(
                    time_index_expr.lt_eq(DfExpr::Literal(ScalarValue::TimestampMillisecond(
                        Some(self.ctx.end - offset_duration + self.ctx.lookback_delta),
                        None,
                    ))),
                );
            return Ok(Some(single_time_range));
        }

        // Scenario 2: one small window per calculation point, OR-ed together.
        let mut filters = Vec::with_capacity(num_points as usize);
        for timestamp in (start..end).step_by(interval as usize) {
            filters.push(
                time_index_expr
                    .clone()
                    .gt_eq(DfExpr::Literal(ScalarValue::TimestampMillisecond(
                        Some(timestamp - offset_duration - lookback_delta - range),
                        None,
                    )))
                    .and(time_index_expr.clone().lt_eq(DfExpr::Literal(
                        ScalarValue::TimestampMillisecond(
                            Some(timestamp - offset_duration + lookback_delta),
                            None,
                        ),
                    ))),
            )
        }

        Ok(filters.into_iter().reduce(DfExpr::or))
    }

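    // Sketch: the scan below always yields a millisecond time index. When the
    // table's timestamp column has a different precision, a projection casts
    // it to Timestamp(Millisecond) under the same column name so downstream
    // PromQL plan nodes can assume millisecond timestamps.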
    async fn create_table_scan_plan(&mut self, table_ref: TableReference) -> Result<LogicalPlan> {
        let provider = self
            .table_provider
            .resolve_table(table_ref.clone())
            .await
            .context(CatalogSnafu)?;

        let is_time_index_ms = provider
            .as_any()
            .downcast_ref::<DefaultTableSource>()
            .context(UnknownTableSnafu)?
            .table_provider
            .as_any()
            .downcast_ref::<DfTableProviderAdapter>()
            .context(UnknownTableSnafu)?
            .table()
            .schema()
            .timestamp_column()
            .with_context(|| TimeIndexNotFoundSnafu {
                table: table_ref.to_quoted_string(),
            })?
            .data_type
            == ConcreteDataType::timestamp_millisecond_datatype();

        let mut scan_plan = LogicalPlanBuilder::scan(table_ref.clone(), provider, None)
            .context(DataFusionPlanningSnafu)?
            .build()
            .context(DataFusionPlanningSnafu)?;

        if !is_time_index_ms {
            // cast the time index column to millisecond under the same name
            let expr: Vec<_> = self
                .ctx
                .field_columns
                .iter()
                .map(|col| DfExpr::Column(Column::new(Some(table_ref.clone()), col.clone())))
                .chain(self.create_tag_column_exprs()?)
                .chain(Some(DfExpr::Alias(Alias {
                    expr: Box::new(DfExpr::Cast(Cast {
                        expr: Box::new(self.create_time_index_column_expr()?),
                        data_type: ArrowDataType::Timestamp(ArrowTimeUnit::Millisecond, None),
                    })),
                    relation: Some(table_ref.clone()),
                    name: self
                        .ctx
                        .time_index_column
                        .as_ref()
                        .with_context(|| TimeIndexNotFoundSnafu {
                            table: table_ref.to_quoted_string(),
                        })?
                        .clone(),
                })))
                .collect::<Vec<_>>();
            scan_plan = LogicalPlanBuilder::from(scan_plan)
                .project(expr)
                .context(DataFusionPlanningSnafu)?
                .build()
                .context(DataFusionPlanningSnafu)?;
        }

        let result = LogicalPlanBuilder::from(scan_plan)
            .build()
            .context(DataFusionPlanningSnafu)?;
        Ok(result)
    }

    /// Setup the planner states (table name, time index, field and tag columns)
    /// from the resolved table. Returns an empty-metric plan when the table
    /// doesn't exist.
    async fn setup_context(&mut self) -> Result<Option<LogicalPlan>> {
        let table_ref = self.table_ref()?;
        let table = match self.table_provider.resolve_table(table_ref.clone()).await {
            Err(e) if e.status_code() == StatusCode::TableNotFound => {
                let plan = self.setup_context_for_empty_metric()?;
                return Ok(Some(plan));
            }
            res => res.context(CatalogSnafu)?,
        };
        let table = table
            .as_any()
            .downcast_ref::<DefaultTableSource>()
            .context(UnknownTableSnafu)?
            .table_provider
            .as_any()
            .downcast_ref::<DfTableProviderAdapter>()
            .context(UnknownTableSnafu)?
            .table();

        // set the time index column name
        let time_index = table
            .schema()
            .timestamp_column()
            .with_context(|| TimeIndexNotFoundSnafu {
                table: table_ref.to_quoted_string(),
            })?
            .name
            .clone();
        self.ctx.time_index_column = Some(time_index);

        // set the value columns
        let values = table
            .table_info()
            .meta
            .field_column_names()
            .cloned()
            .collect();
        self.ctx.field_columns = values;

        // set the primary key (tag) columns
        let tags = table
            .table_info()
            .meta
            .row_key_column_names()
            .filter(|col| {
                // remove the metric engine's internal columns
                col != &DATA_SCHEMA_TABLE_ID_COLUMN_NAME && col != &DATA_SCHEMA_TSID_COLUMN_NAME
            })
            .cloned()
            .collect();
        self.ctx.tag_columns = tags;

        Ok(None)
    }

    /// Setup the planner states for a nonexistent table and build an
    /// `EmptyMetric` plan whose reversed time range produces no rows.
    fn setup_context_for_empty_metric(&mut self) -> Result<LogicalPlan> {
        self.ctx.time_index_column = Some(SPECIAL_TIME_FUNCTION.to_string());
        self.ctx.reset_table_name_and_schema();
        self.ctx.tag_columns = vec![];
        self.ctx.field_columns = vec![DEFAULT_FIELD_COLUMN.to_string()];

        // the time range (0, -1) is deliberately empty
        let plan = LogicalPlan::Extension(Extension {
            node: Arc::new(
                EmptyMetric::new(
                    0,
                    -1,
                    self.ctx.interval,
                    SPECIAL_TIME_FUNCTION.to_string(),
                    DEFAULT_FIELD_COLUMN.to_string(),
                    Some(DfExpr::Literal(ScalarValue::Float64(Some(0.0)))),
                )
                .context(DataFusionPlanningSnafu)?,
            ),
        });
        Ok(plan)
    }

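    // Sketch of the argument split below (query text hypothetical): for
    // `clamp_max(metric, 100)` the vector argument `metric` becomes
    // `FunctionArgs::input` (planned recursively later), while `100` is kept
    // in `FunctionArgs::literals` as a DataFusion literal expression.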
    fn create_function_args(&self, args: &[Box<PromExpr>]) -> Result<FunctionArgs> {
        let mut result = FunctionArgs::default();

        for arg in args {
            match *arg.clone() {
                PromExpr::Subquery(_)
                | PromExpr::VectorSelector(_)
                | PromExpr::MatrixSelector(_)
                | PromExpr::Extension(_)
                | PromExpr::Aggregate(_)
                | PromExpr::Paren(_)
                | PromExpr::Call(_) => {
                    if result.input.replace(*arg.clone()).is_some() {
                        MultipleVectorSnafu { expr: *arg.clone() }.fail()?;
                    }
                }

                _ => {
                    let expr =
                        Self::get_param_as_literal_expr(&Some(Box::new(*arg.clone())), None, None)?;
                    result.literals.push(expr);
                }
            }
        }

        Ok(result)
    }

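    // Sketch of the dispatch below: each PromQL function is lowered to one of
    //   - ScalarFunc::Udf: over-time functions taking (ts_range, value) inputs,
    //   - ScalarFunc::ExtrapolateUdf: rate/increase/delta, which also receive
    //     the time index and the range length for extrapolation,
    //   - ScalarFunc::DataFusionBuiltin / DataFusionUdf: plain scalar functions,
    //   - ScalarFunc::GeneratedExpr: expressions assembled inline (e.g. `time`).
    // One projection expression is then produced per value column.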
    /// Create function expressions for projection, returning the expressions
    /// and any new tags generated by functions like `label_join`.
    fn create_function_expr(
        &mut self,
        func: &Function,
        other_input_exprs: Vec<DfExpr>,
        session_state: &SessionState,
    ) -> Result<(Vec<DfExpr>, Vec<String>)> {
        let mut other_input_exprs: VecDeque<DfExpr> = other_input_exprs.into();

        // the position of the value column in the argument list; currently always 0
        let field_column_pos = 0;
        let mut exprs = Vec::with_capacity(self.ctx.field_columns.len());
        // new tags inserted by the function, e.g. `label_join`
        let mut new_tags = vec![];
        let scalar_func = match func.name {
            "increase" => ScalarFunc::ExtrapolateUdf(
                Arc::new(Increase::scalar_udf()),
                self.ctx.range.context(ExpectRangeSelectorSnafu)?,
            ),
            "rate" => ScalarFunc::ExtrapolateUdf(
                Arc::new(Rate::scalar_udf()),
                self.ctx.range.context(ExpectRangeSelectorSnafu)?,
            ),
            "delta" => ScalarFunc::ExtrapolateUdf(
                Arc::new(Delta::scalar_udf()),
                self.ctx.range.context(ExpectRangeSelectorSnafu)?,
            ),
            "idelta" => ScalarFunc::Udf(Arc::new(IDelta::<false>::scalar_udf())),
            "irate" => ScalarFunc::Udf(Arc::new(IDelta::<true>::scalar_udf())),
            "resets" => ScalarFunc::Udf(Arc::new(Resets::scalar_udf())),
            "changes" => ScalarFunc::Udf(Arc::new(Changes::scalar_udf())),
            "deriv" => ScalarFunc::Udf(Arc::new(Deriv::scalar_udf())),
            "avg_over_time" => ScalarFunc::Udf(Arc::new(AvgOverTime::scalar_udf())),
            "min_over_time" => ScalarFunc::Udf(Arc::new(MinOverTime::scalar_udf())),
            "max_over_time" => ScalarFunc::Udf(Arc::new(MaxOverTime::scalar_udf())),
            "sum_over_time" => ScalarFunc::Udf(Arc::new(SumOverTime::scalar_udf())),
            "count_over_time" => ScalarFunc::Udf(Arc::new(CountOverTime::scalar_udf())),
            "last_over_time" => ScalarFunc::Udf(Arc::new(LastOverTime::scalar_udf())),
            "absent_over_time" => ScalarFunc::Udf(Arc::new(AbsentOverTime::scalar_udf())),
            "present_over_time" => ScalarFunc::Udf(Arc::new(PresentOverTime::scalar_udf())),
            "stddev_over_time" => ScalarFunc::Udf(Arc::new(StddevOverTime::scalar_udf())),
            "stdvar_over_time" => ScalarFunc::Udf(Arc::new(StdvarOverTime::scalar_udf())),
            "quantile_over_time" => ScalarFunc::Udf(Arc::new(QuantileOverTime::scalar_udf())),
            "predict_linear" => {
                // the first literal argument is cast to i64
                other_input_exprs[0] = DfExpr::Cast(Cast {
                    expr: Box::new(other_input_exprs[0].clone()),
                    data_type: ArrowDataType::Int64,
                });
                ScalarFunc::Udf(Arc::new(PredictLinear::scalar_udf()))
            }
            "holt_winters" => ScalarFunc::Udf(Arc::new(HoltWinters::scalar_udf())),
            "time" => {
                exprs.push(build_special_time_expr(
                    self.ctx.time_index_column.as_ref().unwrap(),
                ));
                ScalarFunc::GeneratedExpr
            }
            "minute" => {
                // date_part('minute', time_index)
                let expr = self.date_part_on_time_index("minute")?;
                exprs.push(expr);
                ScalarFunc::GeneratedExpr
            }
            "hour" => {
                // date_part('hour', time_index)
                let expr = self.date_part_on_time_index("hour")?;
                exprs.push(expr);
                ScalarFunc::GeneratedExpr
            }
            "month" => {
                // date_part('month', time_index)
                let expr = self.date_part_on_time_index("month")?;
                exprs.push(expr);
                ScalarFunc::GeneratedExpr
            }
            "year" => {
                // date_part('year', time_index)
                let expr = self.date_part_on_time_index("year")?;
                exprs.push(expr);
                ScalarFunc::GeneratedExpr
            }
            "day_of_month" => {
                // date_part('day', time_index)
                let expr = self.date_part_on_time_index("day")?;
                exprs.push(expr);
                ScalarFunc::GeneratedExpr
            }
            "day_of_week" => {
                // date_part('dow', time_index)
                let expr = self.date_part_on_time_index("dow")?;
                exprs.push(expr);
                ScalarFunc::GeneratedExpr
            }
            "day_of_year" => {
                // date_part('doy', time_index)
                let expr = self.date_part_on_time_index("doy")?;
                exprs.push(expr);
                ScalarFunc::GeneratedExpr
            }
            "days_in_month" => {
                // date_part('day', date_trunc('month', time_index) + interval '1 month - 1 day')
                let day_lit_expr = DfExpr::Literal(ScalarValue::Utf8(Some("day".to_string())));
                let month_lit_expr = DfExpr::Literal(ScalarValue::Utf8(Some("month".to_string())));
                let interval_1month_lit_expr =
                    DfExpr::Literal(ScalarValue::IntervalYearMonth(Some(1)));
                let interval_1day_lit_expr = DfExpr::Literal(ScalarValue::IntervalDayTime(Some(
                    IntervalDayTime::new(1, 0),
                )));
                let the_1month_minus_1day_expr = DfExpr::BinaryExpr(BinaryExpr {
                    left: Box::new(interval_1month_lit_expr),
                    op: Operator::Minus,
                    right: Box::new(interval_1day_lit_expr),
                });
                let date_trunc_expr = DfExpr::ScalarFunction(ScalarFunction {
                    func: datafusion_functions::datetime::date_trunc(),
                    args: vec![month_lit_expr, self.create_time_index_column_expr()?],
                });
                let date_trunc_plus_interval_expr = DfExpr::BinaryExpr(BinaryExpr {
                    left: Box::new(date_trunc_expr),
                    op: Operator::Plus,
                    right: Box::new(the_1month_minus_1day_expr),
                });
                let date_part_expr = DfExpr::ScalarFunction(ScalarFunction {
                    func: datafusion_functions::datetime::date_part(),
                    args: vec![day_lit_expr, date_trunc_plus_interval_expr],
                });

                exprs.push(date_part_expr);
                ScalarFunc::GeneratedExpr
            }

            "label_join" => {
                let (concat_expr, dst_label) =
                    Self::build_concat_labels_expr(&mut other_input_exprs, session_state)?;

                // preserve the current field columns except the `dst_label`
                for value in &self.ctx.field_columns {
                    if *value != dst_label {
                        let expr = DfExpr::Column(Column::from_name(value));
                        exprs.push(expr);
                    }
                }

                // remove `dst_label` from the tag columns if present, to avoid
                // duplicated column names
                self.ctx.tag_columns.retain(|tag| *tag != dst_label);
                new_tags.push(dst_label);
                // add the new label expr to evaluate
                exprs.push(concat_expr);

                ScalarFunc::GeneratedExpr
            }
            "label_replace" => {
                if let Some((replace_expr, dst_label)) =
                    self.build_regexp_replace_label_expr(&mut other_input_exprs, session_state)?
                {
                    // preserve the current field columns except the `dst_label`
                    for value in &self.ctx.field_columns {
                        if *value != dst_label {
                            let expr = DfExpr::Column(Column::from_name(value));
                            exprs.push(expr);
                        }
                    }

                    ensure!(
                        !self.ctx.tag_columns.contains(&dst_label),
                        SameLabelSetSnafu
                    );
                    new_tags.push(dst_label);
                    // add the new label expr to evaluate
                    exprs.push(replace_expr);
                } else {
                    // the replacement is a no-op; keep the current field columns
                    for value in &self.ctx.field_columns {
                        let expr = DfExpr::Column(Column::from_name(value));
                        exprs.push(expr);
                    }
                }

                ScalarFunc::GeneratedExpr
            }
            "sort" | "sort_desc" | "sort_by_label" | "sort_by_label_desc" => {
                // sorting itself is handled by the caller; pass the field columns through
                for value in &self.ctx.field_columns {
                    let expr = DfExpr::Column(Column::from_name(value));
                    exprs.push(expr);
                }

                ScalarFunc::GeneratedExpr
            }
            "round" => {
                if other_input_exprs.is_empty() {
                    // a single-argument round() gets a default second argument of 0.0
                    other_input_exprs.push_front(DfExpr::Literal(ScalarValue::Float64(Some(0.0))));
                }
                ScalarFunc::DataFusionUdf(Arc::new(Round::scalar_udf()))
            }

            _ => {
                if let Some(f) = session_state.scalar_functions().get(func.name) {
                    ScalarFunc::DataFusionBuiltin(f.clone())
                } else if let Some(f) = datafusion_functions::math::functions()
                    .iter()
                    .find(|f| f.name() == func.name)
                {
                    ScalarFunc::DataFusionUdf(f.clone())
                } else {
                    return UnsupportedExprSnafu {
                        name: func.name.to_string(),
                    }
                    .fail();
                }
            }
        };

        // build the argument list and expression for each value column
        for value in &self.ctx.field_columns {
            let col_expr = DfExpr::Column(Column::from_name(value));

            match scalar_func.clone() {
                ScalarFunc::DataFusionBuiltin(func) => {
                    other_input_exprs.insert(field_column_pos, col_expr);
                    let fn_expr = DfExpr::ScalarFunction(ScalarFunction {
                        func,
                        args: other_input_exprs.clone().into(),
                    });
                    exprs.push(fn_expr);
                    let _ = other_input_exprs.remove(field_column_pos);
                }
                ScalarFunc::DataFusionUdf(func) => {
                    let args = itertools::chain!(
                        other_input_exprs.iter().take(field_column_pos).cloned(),
                        std::iter::once(col_expr),
                        other_input_exprs.iter().skip(field_column_pos).cloned()
                    )
                    .collect_vec();
                    exprs.push(DfExpr::ScalarFunction(ScalarFunction { func, args }))
                }
                ScalarFunc::Udf(func) => {
                    let ts_range_expr = DfExpr::Column(Column::from_name(
                        RangeManipulate::build_timestamp_range_name(
                            self.ctx.time_index_column.as_ref().unwrap(),
                        ),
                    ));
                    other_input_exprs.insert(field_column_pos, ts_range_expr);
                    other_input_exprs.insert(field_column_pos + 1, col_expr);
                    let fn_expr = DfExpr::ScalarFunction(ScalarFunction {
                        func,
                        args: other_input_exprs.clone().into(),
                    });
                    exprs.push(fn_expr);
                    let _ = other_input_exprs.remove(field_column_pos + 1);
                    let _ = other_input_exprs.remove(field_column_pos);
                }
                ScalarFunc::ExtrapolateUdf(func, range_length) => {
                    let ts_range_expr = DfExpr::Column(Column::from_name(
                        RangeManipulate::build_timestamp_range_name(
                            self.ctx.time_index_column.as_ref().unwrap(),
                        ),
                    ));
                    other_input_exprs.insert(field_column_pos, ts_range_expr);
                    other_input_exprs.insert(field_column_pos + 1, col_expr);
                    other_input_exprs
                        .insert(field_column_pos + 2, self.create_time_index_column_expr()?);
                    other_input_exprs.push_back(lit(range_length));
                    let fn_expr = DfExpr::ScalarFunction(ScalarFunction {
                        func,
                        args: other_input_exprs.clone().into(),
                    });
                    exprs.push(fn_expr);
                    let _ = other_input_exprs.pop_back();
                    let _ = other_input_exprs.remove(field_column_pos + 2);
                    let _ = other_input_exprs.remove(field_column_pos + 1);
                    let _ = other_input_exprs.remove(field_column_pos);
                }
                ScalarFunc::GeneratedExpr => {}
            }
        }

        // update the value columns' names, aliasing to remove qualifiers
        if !matches!(func.name, "label_join" | "label_replace") {
            let mut new_field_columns = Vec::with_capacity(exprs.len());

            exprs = exprs
                .into_iter()
                .map(|expr| {
                    let display_name = expr.schema_name().to_string();
                    new_field_columns.push(display_name.clone());
                    Ok(expr.alias(display_name))
                })
                .collect::<std::result::Result<Vec<_>, _>>()
                .context(DataFusionPlanningSnafu)?;

            self.ctx.field_columns = new_field_columns;
        }

        Ok((exprs, new_tags))
    }

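    // Sketch of the `label_replace` lowering below (argument values are
    // hypothetical): `label_replace(up, "host_short", "$1", "host", "(.*):.*")`
    // pops dst_label, replacement, src_label and regex off the literal args
    // and yields `regexp_replace(host, '^(?s:(.*):.*)$', '$1') AS host_short`;
    // the pattern is anchored to match Prometheus' full-string semantics.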
    /// Build the expr for the `label_replace` function. Returns `None` when the
    /// replacement is a no-op.
    fn build_regexp_replace_label_expr(
        &self,
        other_input_exprs: &mut VecDeque<DfExpr>,
        session_state: &SessionState,
    ) -> Result<Option<(DfExpr, String)>> {
        // label_replace(vector, dst_label, replacement, src_label, regex)
        let dst_label = match other_input_exprs.pop_front() {
            Some(DfExpr::Literal(ScalarValue::Utf8(Some(d)))) => d,
            other => UnexpectedPlanExprSnafu {
                desc: format!("expected dst_label string literal, but found {:?}", other),
            }
            .fail()?,
        };
        let replacement = match other_input_exprs.pop_front() {
            Some(DfExpr::Literal(ScalarValue::Utf8(Some(r)))) => r,
            other => UnexpectedPlanExprSnafu {
                desc: format!("expected replacement string literal, but found {:?}", other),
            }
            .fail()?,
        };
        let src_label = match other_input_exprs.pop_front() {
            Some(DfExpr::Literal(ScalarValue::Utf8(Some(s)))) => s,
            other => UnexpectedPlanExprSnafu {
                desc: format!("expected src_label string literal, but found {:?}", other),
            }
            .fail()?,
        };

        let regex = match other_input_exprs.pop_front() {
            Some(DfExpr::Literal(ScalarValue::Utf8(Some(r)))) => r,
            other => UnexpectedPlanExprSnafu {
                desc: format!("expected regex string literal, but found {:?}", other),
            }
            .fail()?,
        };

        // if the src_label exists and the regex is empty, keep everything unchanged
        if self.ctx.tag_columns.contains(&src_label) && regex.is_empty() {
            return Ok(None);
        }

        // if the src_label doesn't exist, an empty replacement is a no-op;
        // otherwise the dst_label becomes a constant column
        if !self.ctx.tag_columns.contains(&src_label) {
            if replacement.is_empty() {
                return Ok(None);
            } else {
                return Ok(Some((
                    DfExpr::Literal(ScalarValue::Utf8(Some(replacement))).alias(&dst_label),
                    dst_label,
                )));
            }
        }

        // anchor the regex to match the full string, as Prometheus does
        let regex = format!("^(?s:{regex})$");

        let func = session_state
            .scalar_functions()
            .get("regexp_replace")
            .context(UnsupportedExprSnafu {
                name: "regexp_replace",
            })?;

        // regexp_replace(src_label, regex, replacement)
        let args = vec![
            if src_label.is_empty() {
                DfExpr::Literal(ScalarValue::Utf8(Some(String::new())))
            } else {
                DfExpr::Column(Column::from_name(src_label))
            },
            DfExpr::Literal(ScalarValue::Utf8(Some(regex))),
            DfExpr::Literal(ScalarValue::Utf8(Some(replacement))),
        ];

        Ok(Some((
            DfExpr::ScalarFunction(ScalarFunction {
                func: func.clone(),
                args,
            })
            .alias(&dst_label),
            dst_label,
        )))
    }

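    // Sketch of the `label_join` lowering below (argument values are
    // hypothetical): `label_join(up, "addr", "-", "host", "port")` yields
    // `concat_ws('-', host, port) AS addr`. Empty source labels are passed as
    // NULL literals, which `concat_ws` skips.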
    /// Build the expr for the `label_join` function.
    fn build_concat_labels_expr(
        other_input_exprs: &mut VecDeque<DfExpr>,
        session_state: &SessionState,
    ) -> Result<(DfExpr, String)> {
        // label_join(vector, dst_label, separator, src_label_1, ..., src_label_n)
        let dst_label = match other_input_exprs.pop_front() {
            Some(DfExpr::Literal(ScalarValue::Utf8(Some(d)))) => d,
            other => UnexpectedPlanExprSnafu {
                desc: format!("expected dst_label string literal, but found {:?}", other),
            }
            .fail()?,
        };
        let separator = match other_input_exprs.pop_front() {
            Some(DfExpr::Literal(ScalarValue::Utf8(Some(d)))) => d,
            other => UnexpectedPlanExprSnafu {
                desc: format!("expected separator string literal, but found {:?}", other),
            }
            .fail()?,
        };
        let src_labels = other_input_exprs
            .clone()
            .into_iter()
            .map(|expr| {
                // turn each source label into a column, or NULL when empty
                match expr {
                    DfExpr::Literal(ScalarValue::Utf8(Some(label))) => {
                        if label.is_empty() {
                            Ok(DfExpr::Literal(ScalarValue::Null))
                        } else {
                            Ok(DfExpr::Column(Column::from_name(label)))
                        }
                    }
                    other => UnexpectedPlanExprSnafu {
                        desc: format!(
                            "expected source label string literal, but found {:?}",
                            other
                        ),
                    }
                    .fail(),
                }
            })
            .collect::<Result<Vec<_>>>()?;
        ensure!(
            !src_labels.is_empty(),
            FunctionInvalidArgumentSnafu {
                fn_name: "label_join"
            }
        );

        let func = session_state
            .scalar_functions()
            .get("concat_ws")
            .context(UnsupportedExprSnafu { name: "concat_ws" })?;

        // concat_ws(separator, src_label_1, ..., src_label_n) AS dst_label
        let mut args = Vec::with_capacity(1 + src_labels.len());
        args.push(DfExpr::Literal(ScalarValue::Utf8(Some(separator))));
        args.extend(src_labels);

        Ok((
            DfExpr::ScalarFunction(ScalarFunction {
                func: func.clone(),
                args,
            })
            .alias(&dst_label),
            dst_label,
        ))
    }

1936 fn create_time_index_column_expr(&self) -> Result<DfExpr> {
1937 Ok(DfExpr::Column(Column::from_name(
1938 self.ctx
1939 .time_index_column
1940 .clone()
1941 .with_context(|| TimeIndexNotFoundSnafu { table: "unknown" })?,
1942 )))
1943 }
1944
1945 fn create_tag_column_exprs(&self) -> Result<Vec<DfExpr>> {
1946 let mut result = Vec::with_capacity(self.ctx.tag_columns.len());
1947 for tag in &self.ctx.tag_columns {
1948 let expr = DfExpr::Column(Column::from_name(tag));
1949 result.push(expr);
1950 }
1951 Ok(result)
1952 }
1953
1954 fn create_field_column_exprs(&self) -> Result<Vec<DfExpr>> {
1955 let mut result = Vec::with_capacity(self.ctx.field_columns.len());
1956 for field in &self.ctx.field_columns {
1957 let expr = DfExpr::Column(Column::from_name(field));
1958 result.push(expr);
1959 }
1960 Ok(result)
1961 }
1962
1963 fn create_tag_and_time_index_column_sort_exprs(&self) -> Result<Vec<SortExpr>> {
1964 let mut result = self
1965 .ctx
1966 .tag_columns
1967 .iter()
1968 .map(|col| DfExpr::Column(Column::from_name(col)).sort(true, true))
1969 .collect::<Vec<_>>();
1970 result.push(self.create_time_index_column_expr()?.sort(true, true));
1971 Ok(result)
1972 }
1973
1974 fn create_field_columns_sort_exprs(&self, asc: bool) -> Vec<SortExpr> {
1975 self.ctx
1976 .field_columns
1977 .iter()
1978 .map(|col| DfExpr::Column(Column::from_name(col)).sort(asc, true))
1979 .collect::<Vec<_>>()
1980 }
1981
1982 fn create_sort_exprs_by_tags(
1983 func: &str,
1984 tags: Vec<DfExpr>,
1985 asc: bool,
1986 ) -> Result<Vec<SortExpr>> {
1987 ensure!(
1988 !tags.is_empty(),
1989 FunctionInvalidArgumentSnafu { fn_name: func }
1990 );
1991
1992 tags.iter()
1993 .map(|col| match col {
1994 DfExpr::Literal(ScalarValue::Utf8(Some(label))) => {
1995 Ok(DfExpr::Column(Column::from_name(label)).sort(asc, false))
1996 }
1997 other => UnexpectedPlanExprSnafu {
1998 desc: format!("expected label string literal, but found {:?}", other),
1999 }
2000 .fail(),
2001 })
2002 .collect::<Result<Vec<_>>>()
2003 }
2004
2005 fn create_empty_values_filter_expr(&self) -> Result<DfExpr> {
2006 let mut exprs = Vec::with_capacity(self.ctx.field_columns.len());
2007 for value in &self.ctx.field_columns {
2008 let expr = DfExpr::Column(Column::from_name(value)).is_not_null();
2009 exprs.push(expr);
2010 }
2011
2012 conjunction(exprs).context(ValueNotFoundSnafu {
2013 table: self.table_ref()?.to_quoted_string(),
2014 })
2015 }
2016
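    /// Create one aggregate expression per field column, mapping the PromQL
    /// aggregate token to the corresponding DataFusion UDAF (e.g. `sum` to
    /// `sum_udaf`; `quantile` to `quantile_udaf` with its parameter prepended
    /// as the first argument). `topk`/`bottomk` are rejected here, as they are
    /// planned as window expressions instead.
    ///
    /// Side effect: updates the context's field columns to the names of the
    /// new aggregate expressions. For `count_values` the original field
    /// expressions are returned as well, for the caller to use as extra group
    /// keys.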
2017 fn create_aggregate_exprs(
2033 &mut self,
2034 op: TokenType,
2035 param: &Option<Box<PromExpr>>,
2036 input_plan: &LogicalPlan,
2037 ) -> Result<(Vec<DfExpr>, Vec<DfExpr>)> {
2038 let mut non_col_args = Vec::new();
2039 let aggr = match op.id() {
2040 token::T_SUM => sum_udaf(),
2041 token::T_QUANTILE => {
2042 let q =
2043 Self::get_param_as_literal_expr(param, Some(op), Some(ArrowDataType::Float64))?;
2044 non_col_args.push(q);
2045 quantile_udaf()
2046 }
2047 token::T_AVG => avg_udaf(),
2048 token::T_COUNT_VALUES | token::T_COUNT => count_udaf(),
2049 token::T_MIN => min_udaf(),
2050 token::T_MAX => max_udaf(),
2051 token::T_GROUP => grouping_udaf(),
2052 token::T_STDDEV => stddev_pop_udaf(),
2053 token::T_STDVAR => var_pop_udaf(),
2054 token::T_TOPK | token::T_BOTTOMK => UnsupportedExprSnafu {
2055 name: format!("{op:?}"),
2056 }
2057 .fail()?,
2058 _ => UnexpectedTokenSnafu { token: op }.fail()?,
2059 };
2060
2061 let exprs: Vec<DfExpr> = self
2063 .ctx
2064 .field_columns
2065 .iter()
2066 .map(|col| {
2067 non_col_args.push(DfExpr::Column(Column::from_name(col)));
2068 let expr = aggr.call(non_col_args.clone());
2069 non_col_args.pop();
2070 expr
2071 })
2072 .collect::<Vec<_>>();
2073
        // `count_values` also needs the original field column expressions
        let prev_field_exprs = if op.id() == token::T_COUNT_VALUES {
2076 let prev_field_exprs: Vec<_> = self
2077 .ctx
2078 .field_columns
2079 .iter()
2080 .map(|col| DfExpr::Column(Column::from_name(col)))
2081 .collect();
2082
2083 ensure!(
2084 self.ctx.field_columns.len() == 1,
2085 UnsupportedExprSnafu {
2086 name: "count_values on multi-value input"
2087 }
2088 );
2089
2090 prev_field_exprs
2091 } else {
2092 vec![]
2093 };
2094
        // rename the context's field columns to the output names of the
        // aggregate expressions
        let mut new_field_columns = Vec::with_capacity(self.ctx.field_columns.len());

        let normalized_exprs =
            normalize_cols(exprs.iter().cloned(), input_plan).context(DataFusionPlanningSnafu)?;
        for expr in normalized_exprs {
            new_field_columns.push(expr.schema_name().to_string());
        }
        self.ctx.field_columns = new_field_columns;
2104
2105 Ok((exprs, prev_field_exprs))
2106 }
2107
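    /// Extract the aggregate parameter as a string, erroring if it is absent
    /// or not a string literal.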
2108 fn get_param_value_as_str(op: TokenType, param: &Option<Box<PromExpr>>) -> Result<&str> {
2109 let param = param
2110 .as_deref()
2111 .with_context(|| FunctionInvalidArgumentSnafu {
2112 fn_name: op.to_string(),
2113 })?;
2114 let PromExpr::StringLiteral(StringLiteral { val }) = param else {
2115 return FunctionInvalidArgumentSnafu {
2116 fn_name: op.to_string(),
2117 }
2118 .fail();
2119 };
2120
2121 Ok(val)
2122 }
2123
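    /// Convert the aggregate parameter into a literal [`DfExpr`], optionally
    /// checking that the literal has the expected Arrow data type.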
2124 fn get_param_as_literal_expr(
2125 param: &Option<Box<PromExpr>>,
2126 op: Option<TokenType>,
2127 expected_type: Option<ArrowDataType>,
2128 ) -> Result<DfExpr> {
2129 let prom_param = param.as_deref().with_context(|| {
2130 if let Some(op) = op {
2131 FunctionInvalidArgumentSnafu {
2132 fn_name: op.to_string(),
2133 }
2134 } else {
2135 FunctionInvalidArgumentSnafu {
2136 fn_name: "unknown".to_string(),
2137 }
2138 }
2139 })?;
2140
2141 let expr = Self::try_build_literal_expr(prom_param).with_context(|| {
2142 if let Some(op) = op {
2143 FunctionInvalidArgumentSnafu {
2144 fn_name: op.to_string(),
2145 }
2146 } else {
2147 FunctionInvalidArgumentSnafu {
2148 fn_name: "unknown".to_string(),
2149 }
2150 }
2151 })?;
2152
2153 if let Some(expected_type) = expected_type {
2155 let expr_type = expr
2157 .get_type(&DFSchema::empty())
2158 .context(DataFusionPlanningSnafu)?;
2159 if expected_type != expr_type {
2160 return FunctionInvalidArgumentSnafu {
2161 fn_name: format!("expected {expected_type:?}, but found {expr_type:?}"),
2162 }
2163 .fail();
2164 }
2165 }
2166
2167 Ok(expr)
2168 }
2169
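    /// Create a `row_number` window expression per field column for
    /// `topk`/`bottomk`: partition by the group exprs and order by the field
    /// value (descending for `topk`, ascending for `bottomk`), with the tag
    /// columns as tie-breakers.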
2170 fn create_window_exprs(
2173 &mut self,
2174 op: TokenType,
2175 group_exprs: Vec<DfExpr>,
2176 input_plan: &LogicalPlan,
2177 ) -> Result<Vec<DfExpr>> {
2178 ensure!(
2179 self.ctx.field_columns.len() == 1,
2180 UnsupportedExprSnafu {
2181 name: "topk or bottomk on multi-value input"
2182 }
2183 );
2184
2185 assert!(matches!(op.id(), token::T_TOPK | token::T_BOTTOMK));
2186
2187 let asc = matches!(op.id(), token::T_BOTTOMK);
2188
2189 let tag_sort_exprs = self
2190 .create_tag_column_exprs()?
2191 .into_iter()
2192 .map(|expr| expr.sort(asc, true));
2193
2194 let exprs: Vec<DfExpr> = self
2196 .ctx
2197 .field_columns
2198 .iter()
2199 .map(|col| {
2200 let mut sort_exprs = Vec::with_capacity(self.ctx.tag_columns.len() + 1);
2201 sort_exprs.push(DfExpr::Column(Column::from(col)).sort(asc, true));
2203 sort_exprs.extend(tag_sort_exprs.clone());
2206
2207 DfExpr::WindowFunction(WindowFunction {
2208 fun: WindowFunctionDefinition::WindowUDF(Arc::new(RowNumber::new().into())),
2209 args: vec![],
2210 partition_by: group_exprs.clone(),
2211 order_by: sort_exprs,
2212 window_frame: WindowFrame::new(Some(true)),
2213 null_treatment: None,
2214 })
2215 })
2216 .collect();
2217
2218 let normalized_exprs =
2219 normalize_cols(exprs.iter().cloned(), input_plan).context(DataFusionPlanningSnafu)?;
2220 Ok(normalized_exprs)
2221 }
2222
2223 #[deprecated(
2225 note = "use `Self::get_param_as_literal_expr` instead. This is only for `create_histogram_plan`"
2226 )]
2227 fn try_build_float_literal(expr: &PromExpr) -> Option<f64> {
2228 match expr {
2229 PromExpr::NumberLiteral(NumberLiteral { val }) => Some(*val),
2230 PromExpr::Paren(ParenExpr { expr }) => Self::try_build_float_literal(expr),
2231 PromExpr::Unary(UnaryExpr { expr, .. }) => {
2232 Self::try_build_float_literal(expr).map(|f| -f)
2233 }
2234 PromExpr::StringLiteral(_)
2235 | PromExpr::Binary(_)
2236 | PromExpr::VectorSelector(_)
2237 | PromExpr::MatrixSelector(_)
2238 | PromExpr::Call(_)
2239 | PromExpr::Extension(_)
2240 | PromExpr::Aggregate(_)
2241 | PromExpr::Subquery(_) => None,
2242 }
2243 }
2244
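    /// Plan `histogram_quantile(phi, input)`: requires exactly two arguments,
    /// a float literal `phi` and a vector whose series carry an `le` tag.
    /// Wraps the input in a [`HistogramFold`] node and drops `le` from the
    /// context's tag columns, since the fold consumes it.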
2245 async fn create_histogram_plan(
2247 &mut self,
2248 args: &PromFunctionArgs,
2249 session_state: &SessionState,
2250 ) -> Result<LogicalPlan> {
2251 if args.args.len() != 2 {
2252 return FunctionInvalidArgumentSnafu {
2253 fn_name: SPECIAL_HISTOGRAM_QUANTILE.to_string(),
2254 }
2255 .fail();
2256 }
2257 #[allow(deprecated)]
2258 let phi = Self::try_build_float_literal(&args.args[0]).with_context(|| {
2259 FunctionInvalidArgumentSnafu {
2260 fn_name: SPECIAL_HISTOGRAM_QUANTILE.to_string(),
2261 }
2262 })?;
2263
2264 let input = args.args[1].as_ref().clone();
2265 let input_plan = self.prom_expr_to_plan(&input, session_state).await?;
2266
2267 if !self.ctx.has_le_tag() {
2268 return ColumnNotFoundSnafu {
2269 col: LE_COLUMN_NAME.to_string(),
2270 }
2271 .fail();
2272 }
2273 let time_index_column =
2274 self.ctx
2275 .time_index_column
2276 .clone()
2277 .with_context(|| TimeIndexNotFoundSnafu {
2278 table: self.ctx.table_name.clone().unwrap_or_default(),
2279 })?;
2280 let field_column = self
2282 .ctx
2283 .field_columns
2284 .first()
2285 .with_context(|| FunctionInvalidArgumentSnafu {
2286 fn_name: SPECIAL_HISTOGRAM_QUANTILE.to_string(),
2287 })?
2288 .clone();
        // remove the `le` tag column: it is consumed by the histogram fold
        self.ctx.tag_columns.retain(|col| col != LE_COLUMN_NAME);
2291
2292 Ok(LogicalPlan::Extension(Extension {
2293 node: Arc::new(
2294 HistogramFold::new(
2295 LE_COLUMN_NAME.to_string(),
2296 field_column,
2297 time_index_column,
2298 phi,
2299 input_plan,
2300 )
2301 .context(DataFusionPlanningSnafu)?,
2302 ),
2303 }))
2304 }
2305
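    /// Plan `vector(literal)`: produce an [`EmptyMetric`] plan that generates
    /// the literal value over the query's time range, resetting the context to
    /// the special time and value columns.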
2306 async fn create_vector_plan(&mut self, args: &PromFunctionArgs) -> Result<LogicalPlan> {
2308 if args.args.len() != 1 {
2309 return FunctionInvalidArgumentSnafu {
2310 fn_name: SPECIAL_VECTOR_FUNCTION.to_string(),
2311 }
2312 .fail();
2313 }
2314 let lit = Self::get_param_as_literal_expr(&Some(args.args[0].clone()), None, None)?;
2315
2316 self.ctx.time_index_column = Some(SPECIAL_TIME_FUNCTION.to_string());
2318 self.ctx.reset_table_name_and_schema();
2319 self.ctx.tag_columns = vec![];
2320 self.ctx.field_columns = vec![GREPTIME_VALUE.to_string()];
2321 Ok(LogicalPlan::Extension(Extension {
2322 node: Arc::new(
2323 EmptyMetric::new(
2324 self.ctx.start,
2325 self.ctx.end,
2326 self.ctx.interval,
2327 SPECIAL_TIME_FUNCTION.to_string(),
2328 GREPTIME_VALUE.to_string(),
2329 Some(lit),
2330 )
2331 .context(DataFusionPlanningSnafu)?,
2332 ),
2333 }))
2334 }
2335
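    /// Plan `scalar(input)`: wrap a single-field input in a [`ScalarCalculate`]
    /// node. Per PromQL semantics, `scalar()` yields the sample value only when
    /// the input has exactly one series, and NaN otherwise.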
2336 async fn create_scalar_plan(
2338 &mut self,
2339 args: &PromFunctionArgs,
2340 session_state: &SessionState,
2341 ) -> Result<LogicalPlan> {
2342 ensure!(
2343 args.len() == 1,
2344 FunctionInvalidArgumentSnafu {
2345 fn_name: SCALAR_FUNCTION
2346 }
2347 );
2348 let input = self.prom_expr_to_plan(&args.args[0], session_state).await?;
2349 ensure!(
2350 self.ctx.field_columns.len() == 1,
2351 MultiFieldsNotSupportedSnafu {
2352 operator: SCALAR_FUNCTION
2353 },
2354 );
2355 let scalar_plan = LogicalPlan::Extension(Extension {
2356 node: Arc::new(
2357 ScalarCalculate::new(
2358 self.ctx.start,
2359 self.ctx.end,
2360 self.ctx.interval,
2361 input,
2362 self.ctx.time_index_column.as_ref().unwrap(),
2363 &self.ctx.tag_columns,
2364 &self.ctx.field_columns[0],
2365 self.ctx.table_name.as_deref(),
2366 )
2367 .context(PromqlPlanNodeSnafu)?,
2368 ),
2369 });
2370 self.ctx.tag_columns.clear();
2372 self.ctx.field_columns.clear();
2373 self.ctx
2374 .field_columns
2375 .push(scalar_plan.schema().field(1).name().clone());
2376 Ok(scalar_plan)
2377 }
2378
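    /// Try to fold a PromQL expression into a literal [`DfExpr`]. Returns
    /// `None` for selectors and other non-literal expressions. A comparison
    /// between literals with the `bool` modifier is cast to `Float64`.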
2379 fn try_build_literal_expr(expr: &PromExpr) -> Option<DfExpr> {
2382 match expr {
2383 PromExpr::NumberLiteral(NumberLiteral { val }) => {
2384 let scalar_value = ScalarValue::Float64(Some(*val));
2385 Some(DfExpr::Literal(scalar_value))
2386 }
2387 PromExpr::StringLiteral(StringLiteral { val }) => {
2388 let scalar_value = ScalarValue::Utf8(Some(val.to_string()));
2389 Some(DfExpr::Literal(scalar_value))
2390 }
2391 PromExpr::VectorSelector(_)
2392 | PromExpr::MatrixSelector(_)
2393 | PromExpr::Extension(_)
2394 | PromExpr::Aggregate(_)
2395 | PromExpr::Subquery(_) => None,
2396 PromExpr::Call(Call { func, .. }) => {
2397 if func.name == SPECIAL_TIME_FUNCTION {
2398 Some(build_special_time_expr(SPECIAL_TIME_FUNCTION))
2399 } else {
2400 None
2401 }
2402 }
2403 PromExpr::Paren(ParenExpr { expr }) => Self::try_build_literal_expr(expr),
2404 PromExpr::Unary(UnaryExpr { expr, .. }) => Self::try_build_literal_expr(expr),
2406 PromExpr::Binary(PromBinaryExpr {
2407 lhs,
2408 rhs,
2409 op,
2410 modifier,
2411 }) => {
2412 let lhs = Self::try_build_literal_expr(lhs)?;
2413 let rhs = Self::try_build_literal_expr(rhs)?;
2414 let is_comparison_op = Self::is_token_a_comparison_op(*op);
2415 let expr_builder = Self::prom_token_to_binary_expr_builder(*op).ok()?;
2416 let expr = expr_builder(lhs, rhs).ok()?;
2417
2418 let should_return_bool = if let Some(m) = modifier {
2419 m.return_bool
2420 } else {
2421 false
2422 };
2423 if is_comparison_op && should_return_bool {
2424 Some(DfExpr::Cast(Cast {
2425 expr: Box::new(expr),
2426 data_type: ArrowDataType::Float64,
2427 }))
2428 } else {
2429 Some(expr)
2430 }
2431 }
2432 }
2433 }
2434
2435 fn try_build_special_time_expr(expr: &PromExpr, time_index_col: &str) -> Option<DfExpr> {
2436 match expr {
2437 PromExpr::Call(Call { func, .. }) => {
2438 if func.name == SPECIAL_TIME_FUNCTION {
2439 Some(build_special_time_expr(time_index_col))
2440 } else {
2441 None
2442 }
2443 }
2444 _ => None,
2445 }
2446 }
2447
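    /// Map a PromQL binary operator token to a closure that builds the
    /// corresponding DataFusion expression, e.g. `^` to the `power` function
    /// and `atan2` to the `atan2` function.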
2448 #[allow(clippy::type_complexity)]
2451 fn prom_token_to_binary_expr_builder(
2452 token: TokenType,
2453 ) -> Result<Box<dyn Fn(DfExpr, DfExpr) -> Result<DfExpr>>> {
2454 match token.id() {
2455 token::T_ADD => Ok(Box::new(|lhs, rhs| Ok(lhs + rhs))),
2456 token::T_SUB => Ok(Box::new(|lhs, rhs| Ok(lhs - rhs))),
2457 token::T_MUL => Ok(Box::new(|lhs, rhs| Ok(lhs * rhs))),
2458 token::T_DIV => Ok(Box::new(|lhs, rhs| Ok(lhs / rhs))),
2459 token::T_MOD => Ok(Box::new(|lhs: DfExpr, rhs| Ok(lhs % rhs))),
2460 token::T_EQLC => Ok(Box::new(|lhs, rhs| Ok(lhs.eq(rhs)))),
2461 token::T_NEQ => Ok(Box::new(|lhs, rhs| Ok(lhs.not_eq(rhs)))),
2462 token::T_GTR => Ok(Box::new(|lhs, rhs| Ok(lhs.gt(rhs)))),
2463 token::T_LSS => Ok(Box::new(|lhs, rhs| Ok(lhs.lt(rhs)))),
2464 token::T_GTE => Ok(Box::new(|lhs, rhs| Ok(lhs.gt_eq(rhs)))),
2465 token::T_LTE => Ok(Box::new(|lhs, rhs| Ok(lhs.lt_eq(rhs)))),
2466 token::T_POW => Ok(Box::new(|lhs, rhs| {
2467 Ok(DfExpr::ScalarFunction(ScalarFunction {
2468 func: datafusion_functions::math::power(),
2469 args: vec![lhs, rhs],
2470 }))
2471 })),
2472 token::T_ATAN2 => Ok(Box::new(|lhs, rhs| {
2473 Ok(DfExpr::ScalarFunction(ScalarFunction {
2474 func: datafusion_functions::math::atan2(),
2475 args: vec![lhs, rhs],
2476 }))
2477 })),
2478 _ => UnexpectedTokenSnafu { token }.fail(),
2479 }
2480 }
2481
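    /// Check if the given token is a PromQL comparison operator
    /// (`==`, `!=`, `>`, `<`, `>=`, `<=`).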
2482 fn is_token_a_comparison_op(token: TokenType) -> bool {
2484 matches!(
2485 token.id(),
2486 token::T_EQLC
2487 | token::T_NEQ
2488 | token::T_GTR
2489 | token::T_LSS
2490 | token::T_GTE
2491 | token::T_LTE
2492 )
2493 }
2494
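    /// Check if the given token is a PromQL set operator
    /// (`and`, `or`, `unless`).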
2495 fn is_token_a_set_op(token: TokenType) -> bool {
2497 matches!(
2498 token.id(),
            token::T_LAND // and
                | token::T_LOR // or
                | token::T_LUNLESS // unless
        )
    }
2504
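    /// Build an inner join between two vectors on their tag columns and, when
    /// present, their time index columns. The `on`/`ignoring` matching
    /// modifier narrows the set of join keys accordingly.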
2505 #[allow(clippy::too_many_arguments)]
2508 fn join_on_non_field_columns(
2509 &self,
2510 left: LogicalPlan,
2511 right: LogicalPlan,
2512 left_table_ref: TableReference,
2513 right_table_ref: TableReference,
2514 left_time_index_column: Option<String>,
2515 right_time_index_column: Option<String>,
2516 only_join_time_index: bool,
2517 modifier: &Option<BinModifier>,
2518 ) -> Result<LogicalPlan> {
2519 let mut left_tag_columns = if only_join_time_index {
2520 BTreeSet::new()
2521 } else {
2522 self.ctx
2523 .tag_columns
2524 .iter()
2525 .cloned()
2526 .collect::<BTreeSet<_>>()
2527 };
2528 let mut right_tag_columns = left_tag_columns.clone();
2529
2530 if let Some(modifier) = modifier {
2532 if let Some(matching) = &modifier.matching {
2534 match matching {
2535 LabelModifier::Include(on) => {
2537 let mask = on.labels.iter().cloned().collect::<BTreeSet<_>>();
2538 left_tag_columns = left_tag_columns.intersection(&mask).cloned().collect();
2539 right_tag_columns =
2540 right_tag_columns.intersection(&mask).cloned().collect();
2541 }
2542 LabelModifier::Exclude(ignoring) => {
2544 for label in &ignoring.labels {
2546 let _ = left_tag_columns.remove(label);
2547 let _ = right_tag_columns.remove(label);
2548 }
2549 }
2550 }
2551 }
2552 }
2553
2554 if let (Some(left_time_index_column), Some(right_time_index_column)) =
2556 (left_time_index_column, right_time_index_column)
2557 {
2558 left_tag_columns.insert(left_time_index_column);
2559 right_tag_columns.insert(right_time_index_column);
2560 }
2561
2562 let right = LogicalPlanBuilder::from(right)
2563 .alias(right_table_ref)
2564 .context(DataFusionPlanningSnafu)?
2565 .build()
2566 .context(DataFusionPlanningSnafu)?;
2567
2568 LogicalPlanBuilder::from(left)
2570 .alias(left_table_ref)
2571 .context(DataFusionPlanningSnafu)?
2572 .join_detailed(
2573 right,
2574 JoinType::Inner,
2575 (
2576 left_tag_columns
2577 .into_iter()
2578 .map(Column::from_name)
2579 .collect::<Vec<_>>(),
2580 right_tag_columns
2581 .into_iter()
2582 .map(Column::from_name)
2583 .collect::<Vec<_>>(),
2584 ),
2585 None,
2586 true,
2587 )
2588 .context(DataFusionPlanningSnafu)?
2589 .build()
2590 .context(DataFusionPlanningSnafu)
2591 }
2592
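    /// Plan the PromQL set operators: `and` becomes a left-semi join, `unless`
    /// a left-anti join, and `or` is delegated to [`Self::or_operator`]. For
    /// `and`/`unless`, both sides must expose exactly the same label set.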
2593 fn set_op_on_non_field_columns(
2595 &mut self,
2596 left: LogicalPlan,
2597 mut right: LogicalPlan,
2598 left_context: PromPlannerContext,
2599 right_context: PromPlannerContext,
2600 op: TokenType,
2601 modifier: &Option<BinModifier>,
2602 ) -> Result<LogicalPlan> {
2603 let mut left_tag_col_set = left_context
2604 .tag_columns
2605 .iter()
2606 .cloned()
2607 .collect::<HashSet<_>>();
2608 let mut right_tag_col_set = right_context
2609 .tag_columns
2610 .iter()
2611 .cloned()
2612 .collect::<HashSet<_>>();
2613
2614 if matches!(op.id(), token::T_LOR) {
2615 return self.or_operator(
2616 left,
2617 right,
2618 left_tag_col_set,
2619 right_tag_col_set,
2620 left_context,
2621 right_context,
2622 modifier,
2623 );
2624 }
2625
2626 if let Some(modifier) = modifier {
2628 ensure!(
2630 matches!(
2631 modifier.card,
2632 VectorMatchCardinality::OneToOne | VectorMatchCardinality::ManyToMany
2633 ),
2634 UnsupportedVectorMatchSnafu {
2635 name: modifier.card.clone(),
2636 },
2637 );
2638 if let Some(matching) = &modifier.matching {
2640 match matching {
2641 LabelModifier::Include(on) => {
2643 let mask = on.labels.iter().cloned().collect::<HashSet<_>>();
2644 left_tag_col_set = left_tag_col_set.intersection(&mask).cloned().collect();
2645 right_tag_col_set =
2646 right_tag_col_set.intersection(&mask).cloned().collect();
2647 }
2648 LabelModifier::Exclude(ignoring) => {
2650 for label in &ignoring.labels {
2652 let _ = left_tag_col_set.remove(label);
2653 let _ = right_tag_col_set.remove(label);
2654 }
2655 }
2656 }
2657 }
2658 }
        // `or` was already handled above by `or_operator()`; `and`/`unless`
        // require both sides to expose exactly the same label set
        if !matches!(op.id(), token::T_LOR) {
            ensure!(
                left_tag_col_set == right_tag_col_set,
                CombineTableColumnMismatchSnafu {
                    left: left_tag_col_set.into_iter().collect::<Vec<_>>(),
                    right: right_tag_col_set.into_iter().collect::<Vec<_>>(),
                }
            )
        };
2669 let left_time_index = left_context.time_index_column.clone().unwrap();
2670 let right_time_index = right_context.time_index_column.clone().unwrap();
2671 let join_keys = left_tag_col_set
2672 .iter()
2673 .cloned()
2674 .chain([left_time_index.clone()])
2675 .collect::<Vec<_>>();
2676 self.ctx.time_index_column = Some(left_time_index.clone());
2677
2678 if left_context.time_index_column != right_context.time_index_column {
2680 let right_project_exprs = right
2681 .schema()
2682 .fields()
2683 .iter()
2684 .map(|field| {
2685 if field.name() == &right_time_index {
2686 DfExpr::Column(Column::from_name(&right_time_index)).alias(&left_time_index)
2687 } else {
2688 DfExpr::Column(Column::from_name(field.name()))
2689 }
2690 })
2691 .collect::<Vec<_>>();
2692
2693 right = LogicalPlanBuilder::from(right)
2694 .project(right_project_exprs)
2695 .context(DataFusionPlanningSnafu)?
2696 .build()
2697 .context(DataFusionPlanningSnafu)?;
2698 }
2699
2700 ensure!(
2701 left_context.field_columns.len() == 1,
2702 MultiFieldsNotSupportedSnafu {
2703 operator: "AND operator"
2704 }
2705 );
2706 let left_field_col = left_context.field_columns.first().unwrap();
2709 self.ctx.field_columns = vec![left_field_col.clone()];
2710
2711 match op.id() {
2714 token::T_LAND => LogicalPlanBuilder::from(left)
2715 .distinct()
2716 .context(DataFusionPlanningSnafu)?
2717 .join_detailed(
2718 right,
2719 JoinType::LeftSemi,
2720 (join_keys.clone(), join_keys),
2721 None,
2722 true,
2723 )
2724 .context(DataFusionPlanningSnafu)?
2725 .build()
2726 .context(DataFusionPlanningSnafu),
2727 token::T_LUNLESS => LogicalPlanBuilder::from(left)
2728 .distinct()
2729 .context(DataFusionPlanningSnafu)?
2730 .join_detailed(
2731 right,
2732 JoinType::LeftAnti,
2733 (join_keys.clone(), join_keys),
2734 None,
2735 true,
2736 )
2737 .context(DataFusionPlanningSnafu)?
2738 .build()
2739 .context(DataFusionPlanningSnafu),
            token::T_LOR => {
                // `or` is handled by `or_operator()` at the beginning of this
                // function, so this arm is unreachable
                unreachable!()
            }
2745 _ => UnexpectedTokenSnafu { token: op }.fail(),
2746 }
2747 }
2748
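    /// Plan the `or` set operator using the [`UnionDistinctOn`] node: both
    /// sides are projected to a common schema (tags missing on one side become
    /// NULL literals), then merged with duplicates resolved on the match
    /// columns plus the time index.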
2749 #[allow(clippy::too_many_arguments)]
2751 fn or_operator(
2752 &mut self,
2753 left: LogicalPlan,
2754 right: LogicalPlan,
2755 left_tag_cols_set: HashSet<String>,
2756 right_tag_cols_set: HashSet<String>,
2757 left_context: PromPlannerContext,
2758 right_context: PromPlannerContext,
2759 modifier: &Option<BinModifier>,
2760 ) -> Result<LogicalPlan> {
2761 ensure!(
2763 left_context.field_columns.len() == right_context.field_columns.len(),
2764 CombineTableColumnMismatchSnafu {
2765 left: left_context.field_columns.clone(),
2766 right: right_context.field_columns.clone()
2767 }
2768 );
2769 ensure!(
2770 left_context.field_columns.len() == 1,
2771 MultiFieldsNotSupportedSnafu {
2772 operator: "OR operator"
2773 }
2774 );
2775
2776 let all_tags = left_tag_cols_set
2778 .union(&right_tag_cols_set)
2779 .cloned()
2780 .collect::<HashSet<_>>();
2781 let tags_not_in_left = all_tags
2782 .difference(&left_tag_cols_set)
2783 .cloned()
2784 .collect::<Vec<_>>();
2785 let tags_not_in_right = all_tags
2786 .difference(&right_tag_cols_set)
2787 .cloned()
2788 .collect::<Vec<_>>();
2789 let left_qualifier = left.schema().qualified_field(0).0.cloned();
2790 let right_qualifier = right.schema().qualified_field(0).0.cloned();
2791 let left_qualifier_string = left_qualifier
2792 .as_ref()
2793 .map(|l| l.to_string())
2794 .unwrap_or_default();
2795 let right_qualifier_string = right_qualifier
2796 .as_ref()
2797 .map(|r| r.to_string())
2798 .unwrap_or_default();
2799 let left_time_index_column =
2800 left_context
2801 .time_index_column
2802 .clone()
2803 .with_context(|| TimeIndexNotFoundSnafu {
2804 table: left_qualifier_string.clone(),
2805 })?;
2806 let right_time_index_column =
2807 right_context
2808 .time_index_column
2809 .clone()
2810 .with_context(|| TimeIndexNotFoundSnafu {
2811 table: right_qualifier_string.clone(),
2812 })?;
2813 let left_field_col = left_context.field_columns.first().unwrap();
2815 let right_field_col = right_context.field_columns.first().unwrap();
2816
2817 let mut all_columns_set = left
2819 .schema()
2820 .fields()
2821 .iter()
2822 .chain(right.schema().fields().iter())
2823 .map(|field| field.name().clone())
2824 .collect::<HashSet<_>>();
2825 all_columns_set.remove(&left_time_index_column);
2827 all_columns_set.remove(&right_time_index_column);
2828 if left_field_col != right_field_col {
2830 all_columns_set.remove(right_field_col);
2831 }
        let mut all_columns = all_columns_set.into_iter().collect::<Vec<_>>();
        // sort to make the column order deterministic
        all_columns.sort_unstable();
        // make sure the time index column is always the first column
        all_columns.insert(0, left_time_index_column.clone());
2837
2838 let left_proj_exprs = all_columns.iter().map(|col| {
2840 if tags_not_in_left.contains(col) {
2841 DfExpr::Literal(ScalarValue::Utf8(None)).alias(col.to_string())
2842 } else {
2843 DfExpr::Column(Column::new(None::<String>, col))
2844 }
2845 });
2846 let right_time_index_expr = DfExpr::Column(Column::new(
2847 right_qualifier.clone(),
2848 right_time_index_column,
2849 ))
2850 .alias(left_time_index_column.clone());
2851 let right_qualifier_for_field = right
2854 .schema()
2855 .iter()
2856 .find(|(_, f)| f.name() == right_field_col)
2857 .map(|(q, _)| q)
2858 .context(ColumnNotFoundSnafu {
2859 col: right_field_col.to_string(),
2860 })?
2861 .cloned();
2862
2863 let right_proj_exprs_without_time_index = all_columns.iter().skip(1).map(|col| {
2865 if col == left_field_col && left_field_col != right_field_col {
2867 DfExpr::Column(Column::new(
2869 right_qualifier_for_field.clone(),
2870 right_field_col,
2871 ))
2872 } else if tags_not_in_right.contains(col) {
2873 DfExpr::Literal(ScalarValue::Utf8(None)).alias(col.to_string())
2874 } else {
2875 DfExpr::Column(Column::new(None::<String>, col))
2876 }
2877 });
2878 let right_proj_exprs = [right_time_index_expr]
2879 .into_iter()
2880 .chain(right_proj_exprs_without_time_index);
2881
2882 let left_projected = LogicalPlanBuilder::from(left)
2883 .project(left_proj_exprs)
2884 .context(DataFusionPlanningSnafu)?
2885 .alias(left_qualifier_string.clone())
2886 .context(DataFusionPlanningSnafu)?
2887 .build()
2888 .context(DataFusionPlanningSnafu)?;
2889 let right_projected = LogicalPlanBuilder::from(right)
2890 .project(right_proj_exprs)
2891 .context(DataFusionPlanningSnafu)?
2892 .alias(right_qualifier_string.clone())
2893 .context(DataFusionPlanningSnafu)?
2894 .build()
2895 .context(DataFusionPlanningSnafu)?;
2896
2897 let mut match_columns = if let Some(modifier) = modifier
2899 && let Some(matching) = &modifier.matching
2900 {
2901 match matching {
2902 LabelModifier::Include(on) => on.labels.clone(),
2904 LabelModifier::Exclude(ignoring) => {
2906 let ignoring = ignoring.labels.iter().cloned().collect::<HashSet<_>>();
2907 all_tags.difference(&ignoring).cloned().collect()
2908 }
2909 }
2910 } else {
2911 all_tags.iter().cloned().collect()
2912 };
2913 match_columns.sort_unstable();
2915 let schema = left_projected.schema().clone();
2917 let union_distinct_on = UnionDistinctOn::new(
2918 left_projected,
2919 right_projected,
2920 match_columns,
2921 left_time_index_column.clone(),
2922 schema,
2923 );
2924 let result = LogicalPlan::Extension(Extension {
2925 node: Arc::new(union_distinct_on),
2926 });
2927
2928 self.ctx.time_index_column = Some(left_time_index_column);
2930 self.ctx.tag_columns = all_tags.into_iter().collect();
2931 self.ctx.field_columns = vec![left_field_col.to_string()];
2932
2933 Ok(result)
2934 }
2935
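    /// Apply `name_to_expr` to every field column and project the results
    /// alongside the tag and time index columns, updating the context's field
    /// columns to the new expressions' names.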
2936 fn projection_for_each_field_column<F>(
2944 &mut self,
2945 input: LogicalPlan,
2946 name_to_expr: F,
2947 ) -> Result<LogicalPlan>
2948 where
2949 F: FnMut(&String) -> Result<DfExpr>,
2950 {
2951 let non_field_columns_iter = self
2952 .ctx
2953 .tag_columns
2954 .iter()
2955 .chain(self.ctx.time_index_column.iter())
2956 .map(|col| {
2957 Ok(DfExpr::Column(Column::new(
2958 self.ctx.table_name.clone().map(TableReference::bare),
2959 col,
2960 )))
2961 });
2962
2963 let result_field_columns = self
2965 .ctx
2966 .field_columns
2967 .iter()
2968 .map(name_to_expr)
2969 .collect::<Result<Vec<_>>>()?;
2970
2971 self.ctx.field_columns = result_field_columns
2973 .iter()
2974 .map(|expr| expr.schema_name().to_string())
2975 .collect();
2976 let field_columns_iter = result_field_columns
2977 .into_iter()
2978 .zip(self.ctx.field_columns.iter())
2979 .map(|(expr, name)| Ok(DfExpr::Alias(Alias::new(expr, None::<String>, name))));
2980
2981 let project_fields = non_field_columns_iter
2983 .chain(field_columns_iter)
2984 .collect::<Result<Vec<_>>>()?;
2985
2986 LogicalPlanBuilder::from(input)
2987 .project(project_fields)
2988 .context(DataFusionPlanningSnafu)?
2989 .build()
2990 .context(DataFusionPlanningSnafu)
2991 }
2992
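    /// Apply a filter built from the single field column. Errors on
    /// multi-field inputs.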
2993 fn filter_on_field_column<F>(
2996 &self,
2997 input: LogicalPlan,
2998 mut name_to_expr: F,
2999 ) -> Result<LogicalPlan>
3000 where
3001 F: FnMut(&String) -> Result<DfExpr>,
3002 {
3003 ensure!(
3004 self.ctx.field_columns.len() == 1,
3005 UnsupportedExprSnafu {
3006 name: "filter on multi-value input"
3007 }
3008 );
3009
3010 let field_column_filter = name_to_expr(&self.ctx.field_columns[0])?;
3011
3012 LogicalPlanBuilder::from(input)
3013 .filter(field_column_filter)
3014 .context(DataFusionPlanningSnafu)?
3015 .build()
3016 .context(DataFusionPlanningSnafu)
3017 }
3018
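    /// Build a `date_part(<part>, <time index>)` expression, e.g. for PromQL
    /// time functions such as `hour()` that extract a date component from the
    /// timestamp.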
3019 fn date_part_on_time_index(&self, date_part: &str) -> Result<DfExpr> {
3022 let lit_expr = DfExpr::Literal(ScalarValue::Utf8(Some(date_part.to_string())));
3023 let input_expr = datafusion::logical_expr::col(
3024 self.ctx
3025 .time_index_column
3026 .as_ref()
3027 .with_context(|| TimeIndexNotFoundSnafu {
3029 table: "<doesn't matter>",
3030 })?,
3031 );
3032 let fn_expr = DfExpr::ScalarFunction(ScalarFunction {
3033 func: datafusion_functions::datetime::date_part(),
3034 args: vec![lit_expr, input_expr],
3035 });
3036 Ok(fn_expr)
3037 }
3038}
3039
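/// Argument list of a PromQL function call, split into the single vector
/// input and the remaining literal arguments.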
3040#[derive(Default, Debug)]
3041struct FunctionArgs {
3042 input: Option<PromExpr>,
3043 literals: Vec<DfExpr>,
3044}
3045
#[derive(Debug, Clone)]
enum ScalarFunc {
    /// Scalar function shipped with DataFusion itself.
    DataFusionBuiltin(Arc<ScalarUdfDef>),
    /// UDF registered in the DataFusion session state.
    DataFusionUdf(Arc<ScalarUdfDef>),
    /// PromQL-specific UDF implemented in the `promql` crate.
    Udf(Arc<ScalarUdfDef>),
    /// UDF that needs the evaluation range length (the `i64`) to extrapolate,
    /// as `rate`, `increase` and `delta` do.
    ExtrapolateUdf(Arc<ScalarUdfDef>, i64),
    /// No UDF call needed; the expression is generated directly by the planner.
    GeneratedExpr,
}
3059
3060#[cfg(test)]
3061mod test {
3062 use std::time::{Duration, UNIX_EPOCH};
3063
3064 use catalog::memory::MemoryCatalogManager;
3065 use catalog::RegisterTableRequest;
3066 use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
3067 use common_query::test_util::DummyDecoder;
3068 use datafusion::execution::SessionStateBuilder;
3069 use datatypes::prelude::ConcreteDataType;
3070 use datatypes::schema::{ColumnSchema, Schema};
3071 use promql_parser::label::Labels;
3072 use promql_parser::parser;
3073 use session::context::QueryContext;
3074 use table::metadata::{TableInfoBuilder, TableMetaBuilder};
3075 use table::test_util::EmptyTable;
3076
3077 use super::*;
3078
3079 fn build_session_state() -> SessionState {
3080 SessionStateBuilder::new().with_default_features().build()
3081 }
3082
3083 async fn build_test_table_provider(
3084 table_name_tuples: &[(String, String)],
3085 num_tag: usize,
3086 num_field: usize,
3087 ) -> DfTableSourceProvider {
3088 let catalog_list = MemoryCatalogManager::with_default_setup();
3089 for (schema_name, table_name) in table_name_tuples {
3090 let mut columns = vec![];
3091 for i in 0..num_tag {
3092 columns.push(ColumnSchema::new(
3093 format!("tag_{i}"),
3094 ConcreteDataType::string_datatype(),
3095 false,
3096 ));
3097 }
3098 columns.push(
3099 ColumnSchema::new(
3100 "timestamp".to_string(),
3101 ConcreteDataType::timestamp_millisecond_datatype(),
3102 false,
3103 )
3104 .with_time_index(true),
3105 );
3106 for i in 0..num_field {
3107 columns.push(ColumnSchema::new(
3108 format!("field_{i}"),
3109 ConcreteDataType::float64_datatype(),
3110 true,
3111 ));
3112 }
3113 let schema = Arc::new(Schema::new(columns));
3114 let table_meta = TableMetaBuilder::empty()
3115 .schema(schema)
3116 .primary_key_indices((0..num_tag).collect())
3117 .value_indices((num_tag + 1..num_tag + 1 + num_field).collect())
3118 .next_column_id(1024)
3119 .build()
3120 .unwrap();
3121 let table_info = TableInfoBuilder::default()
3122 .name(table_name.to_string())
3123 .meta(table_meta)
3124 .build()
3125 .unwrap();
3126 let table = EmptyTable::from_table_info(&table_info);
3127
3128 assert!(catalog_list
3129 .register_table_sync(RegisterTableRequest {
3130 catalog: DEFAULT_CATALOG_NAME.to_string(),
3131 schema: schema_name.to_string(),
3132 table_name: table_name.to_string(),
3133 table_id: 1024,
3134 table,
3135 })
3136 .is_ok());
3137 }
3138
3139 DfTableSourceProvider::new(
3140 catalog_list,
3141 false,
3142 QueryContext::arc(),
3143 DummyDecoder::arc(),
3144 false,
3145 )
3146 }
3147
3148 async fn build_test_table_provider_with_fields(
3149 table_name_tuples: &[(String, String)],
3150 tags: &[&str],
3151 ) -> DfTableSourceProvider {
3152 let catalog_list = MemoryCatalogManager::with_default_setup();
3153 for (schema_name, table_name) in table_name_tuples {
3154 let mut columns = vec![];
3155 let num_tag = tags.len();
3156 for tag in tags {
3157 columns.push(ColumnSchema::new(
3158 tag.to_string(),
3159 ConcreteDataType::string_datatype(),
3160 false,
3161 ));
3162 }
3163 columns.push(
3164 ColumnSchema::new(
3165 "greptime_timestamp".to_string(),
3166 ConcreteDataType::timestamp_millisecond_datatype(),
3167 false,
3168 )
3169 .with_time_index(true),
3170 );
3171 columns.push(ColumnSchema::new(
3172 "greptime_value".to_string(),
3173 ConcreteDataType::float64_datatype(),
3174 true,
3175 ));
3176 let schema = Arc::new(Schema::new(columns));
3177 let table_meta = TableMetaBuilder::empty()
3178 .schema(schema)
3179 .primary_key_indices((0..num_tag).collect())
3180 .next_column_id(1024)
3181 .build()
3182 .unwrap();
3183 let table_info = TableInfoBuilder::default()
3184 .name(table_name.to_string())
3185 .meta(table_meta)
3186 .build()
3187 .unwrap();
3188 let table = EmptyTable::from_table_info(&table_info);
3189
3190 assert!(catalog_list
3191 .register_table_sync(RegisterTableRequest {
3192 catalog: DEFAULT_CATALOG_NAME.to_string(),
3193 schema: schema_name.to_string(),
3194 table_name: table_name.to_string(),
3195 table_id: 1024,
3196 table,
3197 })
3198 .is_ok());
3199 }
3200
3201 DfTableSourceProvider::new(
3202 catalog_list,
3203 false,
3204 QueryContext::arc(),
3205 DummyDecoder::arc(),
3206 false,
3207 )
3208 }
3209
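    /// Plan a single instant-function call (e.g. `abs(some_metric{...})`) and
    /// assert that the rendered logical plan matches an expected template with
    /// the function's plan name substituted for `TEMPLATE`.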
3210 async fn do_single_instant_function_call(fn_name: &'static str, plan_name: &str) {
3226 let prom_expr =
3227 parser::parse(&format!("{fn_name}(some_metric{{tag_0!=\"bar\"}})")).unwrap();
3228 let eval_stmt = EvalStmt {
3229 expr: prom_expr,
3230 start: UNIX_EPOCH,
3231 end: UNIX_EPOCH
3232 .checked_add(Duration::from_secs(100_000))
3233 .unwrap(),
3234 interval: Duration::from_secs(5),
3235 lookback_delta: Duration::from_secs(1),
3236 };
3237
3238 let table_provider = build_test_table_provider(
3239 &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
3240 1,
3241 1,
3242 )
3243 .await;
3244 let plan = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
3245 .await
3246 .unwrap();
3247
3248 let expected = String::from(
3249 "Filter: TEMPLATE(field_0) IS NOT NULL [timestamp:Timestamp(Millisecond, None), TEMPLATE(field_0):Float64;N, tag_0:Utf8]\
3250 \n Projection: some_metric.timestamp, TEMPLATE(some_metric.field_0) AS TEMPLATE(field_0), some_metric.tag_0 [timestamp:Timestamp(Millisecond, None), TEMPLATE(field_0):Float64;N, tag_0:Utf8]\
3251 \n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3252 \n PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3253 \n Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3254 \n Filter: some_metric.tag_0 != Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3255 \n TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
3256 ).replace("TEMPLATE", plan_name);
3257
3258 assert_eq!(plan.display_indent_schema().to_string(), expected);
3259 }
3260
3261 #[tokio::test]
3262 async fn single_abs() {
3263 do_single_instant_function_call("abs", "abs").await;
3264 }
3265
3266 #[tokio::test]
3267 #[should_panic]
3268 async fn single_absent() {
3269 do_single_instant_function_call("absent", "").await;
3270 }
3271
3272 #[tokio::test]
3273 async fn single_ceil() {
3274 do_single_instant_function_call("ceil", "ceil").await;
3275 }
3276
3277 #[tokio::test]
3278 async fn single_exp() {
3279 do_single_instant_function_call("exp", "exp").await;
3280 }
3281
3282 #[tokio::test]
3283 async fn single_ln() {
3284 do_single_instant_function_call("ln", "ln").await;
3285 }
3286
3287 #[tokio::test]
3288 async fn single_log2() {
3289 do_single_instant_function_call("log2", "log2").await;
3290 }
3291
3292 #[tokio::test]
3293 async fn single_log10() {
3294 do_single_instant_function_call("log10", "log10").await;
3295 }
3296
3297 #[tokio::test]
3298 #[should_panic]
3299 async fn single_scalar() {
3300 do_single_instant_function_call("scalar", "").await;
3301 }
3302
3303 #[tokio::test]
3304 #[should_panic]
3305 async fn single_sgn() {
3306 do_single_instant_function_call("sgn", "").await;
3307 }
3308
3309 #[tokio::test]
3310 #[should_panic]
3311 async fn single_sort() {
3312 do_single_instant_function_call("sort", "").await;
3313 }
3314
3315 #[tokio::test]
3316 #[should_panic]
3317 async fn single_sort_desc() {
3318 do_single_instant_function_call("sort_desc", "").await;
3319 }
3320
3321 #[tokio::test]
3322 async fn single_sqrt() {
3323 do_single_instant_function_call("sqrt", "sqrt").await;
3324 }
3325
3326 #[tokio::test]
3327 #[should_panic]
3328 async fn single_timestamp() {
3329 do_single_instant_function_call("timestamp", "").await;
3330 }
3331
3332 #[tokio::test]
3333 async fn single_acos() {
3334 do_single_instant_function_call("acos", "acos").await;
3335 }
3336
3337 #[tokio::test]
3338 #[should_panic]
3339 async fn single_acosh() {
3340 do_single_instant_function_call("acosh", "").await;
3341 }
3342
3343 #[tokio::test]
3344 async fn single_asin() {
3345 do_single_instant_function_call("asin", "asin").await;
3346 }
3347
3348 #[tokio::test]
3349 #[should_panic]
3350 async fn single_asinh() {
3351 do_single_instant_function_call("asinh", "").await;
3352 }
3353
3354 #[tokio::test]
3355 async fn single_atan() {
3356 do_single_instant_function_call("atan", "atan").await;
3357 }
3358
3359 #[tokio::test]
3360 #[should_panic]
3361 async fn single_atanh() {
3362 do_single_instant_function_call("atanh", "").await;
3363 }
3364
3365 #[tokio::test]
3366 async fn single_cos() {
3367 do_single_instant_function_call("cos", "cos").await;
3368 }
3369
3370 #[tokio::test]
3371 #[should_panic]
3372 async fn single_cosh() {
3373 do_single_instant_function_call("cosh", "").await;
3374 }
3375
3376 #[tokio::test]
3377 async fn single_sin() {
3378 do_single_instant_function_call("sin", "sin").await;
3379 }
3380
3381 #[tokio::test]
3382 #[should_panic]
3383 async fn single_sinh() {
3384 do_single_instant_function_call("sinh", "").await;
3385 }
3386
3387 #[tokio::test]
3388 async fn single_tan() {
3389 do_single_instant_function_call("tan", "tan").await;
3390 }
3391
3392 #[tokio::test]
3393 #[should_panic]
3394 async fn single_tanh() {
3395 do_single_instant_function_call("tanh", "").await;
3396 }
3397
3398 #[tokio::test]
3399 #[should_panic]
3400 async fn single_deg() {
3401 do_single_instant_function_call("deg", "").await;
3402 }
3403
3404 #[tokio::test]
3405 #[should_panic]
3406 async fn single_rad() {
3407 do_single_instant_function_call("rad", "").await;
3408 }
3409
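    /// Plan an aggregate expression with both `by (tag_1)` and
    /// `without (tag_1)` modifiers and assert that the rendered plans match
    /// the expected templates with the aggregate's plan name substituted for
    /// `TEMPLATE`.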
3410 async fn do_aggregate_expr_plan(fn_name: &str, plan_name: &str) {
3432 let prom_expr = parser::parse(&format!(
3433 "{fn_name} by (tag_1)(some_metric{{tag_0!=\"bar\"}})",
3434 ))
3435 .unwrap();
3436 let mut eval_stmt = EvalStmt {
3437 expr: prom_expr,
3438 start: UNIX_EPOCH,
3439 end: UNIX_EPOCH
3440 .checked_add(Duration::from_secs(100_000))
3441 .unwrap(),
3442 interval: Duration::from_secs(5),
3443 lookback_delta: Duration::from_secs(1),
3444 };
3445
3446 let table_provider = build_test_table_provider(
3448 &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
3449 2,
3450 2,
3451 )
3452 .await;
3453 let plan = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
3454 .await
3455 .unwrap();
3456 let expected_no_without = String::from(
3457 "Sort: some_metric.tag_1 ASC NULLS LAST, some_metric.timestamp ASC NULLS LAST [tag_1:Utf8, timestamp:Timestamp(Millisecond, None), TEMPLATE(some_metric.field_0):Float64;N, TEMPLATE(some_metric.field_1):Float64;N]\
3458 \n Aggregate: groupBy=[[some_metric.tag_1, some_metric.timestamp]], aggr=[[TEMPLATE(some_metric.field_0), TEMPLATE(some_metric.field_1)]] [tag_1:Utf8, timestamp:Timestamp(Millisecond, None), TEMPLATE(some_metric.field_0):Float64;N, TEMPLATE(some_metric.field_1):Float64;N]\
3459 \n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
3460 \n PromSeriesDivide: tags=[\"tag_0\", \"tag_1\"] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
3461 \n Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.tag_1 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
3462 \n Filter: some_metric.tag_0 != Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
3463 \n TableScan: some_metric [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]"
3464 ).replace("TEMPLATE", plan_name);
3465 assert_eq!(
3466 plan.display_indent_schema().to_string(),
3467 expected_no_without
3468 );
3469
        // then test the `without` modifier
        if let PromExpr::Aggregate(AggregateExpr { modifier, .. }) = &mut eval_stmt.expr {
            // replace the `by (tag_1)` modifier with `without (tag_1)`
            *modifier = Some(LabelModifier::Exclude(Labels {
                labels: vec![String::from("tag_1")].into_iter().collect(),
            }));
        }
3476 let table_provider = build_test_table_provider(
3477 &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
3478 2,
3479 2,
3480 )
3481 .await;
3482 let plan = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
3483 .await
3484 .unwrap();
3485 let expected_without = String::from(
3486 "Sort: some_metric.tag_0 ASC NULLS LAST, some_metric.timestamp ASC NULLS LAST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), TEMPLATE(some_metric.field_0):Float64;N, TEMPLATE(some_metric.field_1):Float64;N]\
3487 \n Aggregate: groupBy=[[some_metric.tag_0, some_metric.timestamp]], aggr=[[TEMPLATE(some_metric.field_0), TEMPLATE(some_metric.field_1)]] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), TEMPLATE(some_metric.field_0):Float64;N, TEMPLATE(some_metric.field_1):Float64;N]\
3488 \n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
3489 \n PromSeriesDivide: tags=[\"tag_0\", \"tag_1\"] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
3490 \n Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.tag_1 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
3491 \n Filter: some_metric.tag_0 != Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
3492 \n TableScan: some_metric [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]"
3493 ).replace("TEMPLATE", plan_name);
3494 assert_eq!(plan.display_indent_schema().to_string(), expected_without);
3495 }
3496
3497 #[tokio::test]
3498 async fn aggregate_sum() {
3499 do_aggregate_expr_plan("sum", "sum").await;
3500 }
3501
3502 #[tokio::test]
3503 async fn aggregate_avg() {
3504 do_aggregate_expr_plan("avg", "avg").await;
3505 }
3506
3507 #[tokio::test]
    #[should_panic]
    async fn aggregate_count() {
3510 do_aggregate_expr_plan("count", "count").await;
3511 }
3512
3513 #[tokio::test]
3514 async fn aggregate_min() {
3515 do_aggregate_expr_plan("min", "min").await;
3516 }
3517
3518 #[tokio::test]
3519 async fn aggregate_max() {
3520 do_aggregate_expr_plan("max", "max").await;
3521 }
3522
3523 #[tokio::test]
    #[should_panic]
    async fn aggregate_group() {
3526 do_aggregate_expr_plan("grouping", "GROUPING").await;
3527 }
3528
3529 #[tokio::test]
3530 async fn aggregate_stddev() {
3531 do_aggregate_expr_plan("stddev", "stddev_pop").await;
3532 }
3533
3534 #[tokio::test]
3535 async fn aggregate_stdvar() {
3536 do_aggregate_expr_plan("stdvar", "var_pop").await;
3537 }
3538
3539 #[tokio::test]
3563 async fn binary_op_column_column() {
3564 let prom_expr =
3565 parser::parse(r#"some_metric{tag_0="foo"} + some_metric{tag_0="bar"}"#).unwrap();
3566 let eval_stmt = EvalStmt {
3567 expr: prom_expr,
3568 start: UNIX_EPOCH,
3569 end: UNIX_EPOCH
3570 .checked_add(Duration::from_secs(100_000))
3571 .unwrap(),
3572 interval: Duration::from_secs(5),
3573 lookback_delta: Duration::from_secs(1),
3574 };
3575
3576 let table_provider = build_test_table_provider(
3577 &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
3578 1,
3579 1,
3580 )
3581 .await;
3582 let plan = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
3583 .await
3584 .unwrap();
3585
3586 let expected = String::from(
3587 "Projection: rhs.tag_0, rhs.timestamp, lhs.field_0 + rhs.field_0 AS lhs.field_0 + rhs.field_0 [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), lhs.field_0 + rhs.field_0:Float64;N]\
3588 \n Inner Join: lhs.tag_0 = rhs.tag_0, lhs.timestamp = rhs.timestamp [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3589 \n SubqueryAlias: lhs [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3590 \n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3591 \n PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3592 \n Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3593 \n Filter: some_metric.tag_0 = Utf8(\"foo\") AND some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3594 \n TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3595 \n SubqueryAlias: rhs [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3596 \n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3597 \n PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3598 \n Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3599 \n Filter: some_metric.tag_0 = Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3600 \n TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
3601 );
3602
3603 assert_eq!(plan.display_indent_schema().to_string(), expected);
3604 }
3605
3606 async fn indie_query_plan_compare(query: &str, expected: String) {
3607 let prom_expr = parser::parse(query).unwrap();
3608 let eval_stmt = EvalStmt {
3609 expr: prom_expr,
3610 start: UNIX_EPOCH,
3611 end: UNIX_EPOCH
3612 .checked_add(Duration::from_secs(100_000))
3613 .unwrap(),
3614 interval: Duration::from_secs(5),
3615 lookback_delta: Duration::from_secs(1),
3616 };
3617
3618 let table_provider = build_test_table_provider(
3619 &[
3620 (DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string()),
3621 (
3622 "greptime_private".to_string(),
3623 "some_alt_metric".to_string(),
3624 ),
3625 ],
3626 1,
3627 1,
3628 )
3629 .await;
3630 let plan = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
3631 .await
3632 .unwrap();
3633
3634 assert_eq!(plan.display_indent_schema().to_string(), expected);
3635 }
3636
3637 #[tokio::test]
3638 async fn binary_op_literal_column() {
3639 let query = r#"1 + some_metric{tag_0="bar"}"#;
3640 let expected = String::from(
3641 "Projection: some_metric.tag_0, some_metric.timestamp, Float64(1) + some_metric.field_0 AS Float64(1) + field_0 [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), Float64(1) + field_0:Float64;N]\
3642 \n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3643 \n PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3644 \n Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3645 \n Filter: some_metric.tag_0 = Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3646 \n TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
3647 );
3648
3649 indie_query_plan_compare(query, expected).await;
3650 }
3651
3652 #[tokio::test]
3653 async fn binary_op_literal_literal() {
3654 let query = r#"1 + 1"#;
3655 let expected = String::from("EmptyMetric: range=[0..100000000], interval=[5000] [time:Timestamp(Millisecond, None), value:Float64;N]");
3656
3657 indie_query_plan_compare(query, expected).await;
3658 }
3659
3660 #[tokio::test]
3661 async fn simple_bool_grammar() {
3662 let query = "some_metric != bool 1.2345";
3663 let expected = String::from(
3664 "Projection: some_metric.tag_0, some_metric.timestamp, CAST(some_metric.field_0 != Float64(1.2345) AS Float64) AS field_0 != Float64(1.2345) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0 != Float64(1.2345):Float64;N]\
3665 \n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3666 \n PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3667 \n Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3668 \n Filter: some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3669 \n TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
3670 );
3671
3672 indie_query_plan_compare(query, expected).await;
3673 }
3674
3675 #[tokio::test]
3676 async fn bool_with_additional_arithmetic() {
3677 let query = "some_metric + (1 == bool 2)";
3678 let expected = String::from(
3679 "Projection: some_metric.tag_0, some_metric.timestamp, some_metric.field_0 + CAST(Float64(1) = Float64(2) AS Float64) AS field_0 + Float64(1) = Float64(2) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0 + Float64(1) = Float64(2):Float64;N]\
3680 \n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3681 \n PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3682 \n Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3683 \n Filter: some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3684 \n TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
3685 );
3686
3687 indie_query_plan_compare(query, expected).await;
3688 }
3689
3690 #[tokio::test]
3691 async fn simple_unary() {
3692 let query = "-some_metric";
3693 let expected = String::from(
3694 "Projection: some_metric.tag_0, some_metric.timestamp, (- some_metric.field_0) AS (- field_0) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), (- field_0):Float64;N]\
3695 \n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3696 \n PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3697 \n Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3698 \n Filter: some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3699 \n TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
3700 );
3701
3702 indie_query_plan_compare(query, expected).await;
3703 }
3704
3705 #[tokio::test]
3706 async fn increase_aggr() {
3707 let query = "increase(some_metric[5m])";
3708 let expected = String::from(
3709 "Filter: prom_increase(timestamp_range,field_0,timestamp,Int64(300000)) IS NOT NULL [timestamp:Timestamp(Millisecond, None), prom_increase(timestamp_range,field_0,timestamp,Int64(300000)):Float64;N, tag_0:Utf8]\
3710 \n Projection: some_metric.timestamp, prom_increase(timestamp_range, field_0, some_metric.timestamp, Int64(300000)) AS prom_increase(timestamp_range,field_0,timestamp,Int64(300000)), some_metric.tag_0 [timestamp:Timestamp(Millisecond, None), prom_increase(timestamp_range,field_0,timestamp,Int64(300000)):Float64;N, tag_0:Utf8]\
3711 \n PromRangeManipulate: req range=[0..100000000], interval=[5000], eval range=[300000], time index=[timestamp], values=[\"field_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Dictionary(Int64, Float64);N, timestamp_range:Dictionary(Int64, Timestamp(Millisecond, None))]\
3712 \n PromSeriesNormalize: offset=[0], time index=[timestamp], filter NaN: [true] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3713 \n PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3714 \n Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3715 \n Filter: some_metric.timestamp >= TimestampMillisecond(-301000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3716 \n TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
3717 );
3718
3719 indie_query_plan_compare(query, expected).await;
3720 }
3721
3722 #[tokio::test]
3723 async fn less_filter_on_value() {
3724 let query = "some_metric < 1.2345";
3725 let expected = String::from(
3726 "Filter: some_metric.field_0 < Float64(1.2345) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3727 \n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3728 \n PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3729 \n Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3730 \n Filter: some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3731 \n TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
3732 );
3733
3734 indie_query_plan_compare(query, expected).await;
3735 }
3736
3737 #[tokio::test]
3738 async fn count_over_time() {
3739 let query = "count_over_time(some_metric[5m])";
3740 let expected = String::from(
3741 "Filter: prom_count_over_time(timestamp_range,field_0) IS NOT NULL [timestamp:Timestamp(Millisecond, None), prom_count_over_time(timestamp_range,field_0):Float64;N, tag_0:Utf8]\
3742 \n Projection: some_metric.timestamp, prom_count_over_time(timestamp_range, field_0) AS prom_count_over_time(timestamp_range,field_0), some_metric.tag_0 [timestamp:Timestamp(Millisecond, None), prom_count_over_time(timestamp_range,field_0):Float64;N, tag_0:Utf8]\
3743 \n PromRangeManipulate: req range=[0..100000000], interval=[5000], eval range=[300000], time index=[timestamp], values=[\"field_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Dictionary(Int64, Float64);N, timestamp_range:Dictionary(Int64, Timestamp(Millisecond, None))]\
3744 \n PromSeriesNormalize: offset=[0], time index=[timestamp], filter NaN: [true] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3745 \n PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3746 \n Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3747 \n Filter: some_metric.timestamp >= TimestampMillisecond(-301000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3748 \n TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
3749 );
3750
3751 indie_query_plan_compare(query, expected).await;
3752 }
3753
    #[tokio::test]
    async fn test_hash_join() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let case = r#"http_server_requests_seconds_sum{uri="/accounts/login"} / ignoring(kubernetes_pod_name,kubernetes_namespace) http_server_requests_seconds_count{uri="/accounts/login"}"#;

        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;
        let table_provider = build_test_table_provider_with_fields(
            &[
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "http_server_requests_seconds_sum".to_string(),
                ),
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "http_server_requests_seconds_count".to_string(),
                ),
            ],
            &["uri", "kubernetes_namespace", "kubernetes_pod_name"],
        )
        .await;
        let plan = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
            .await
            .unwrap();
        let expected = "Projection: http_server_requests_seconds_count.uri, http_server_requests_seconds_count.kubernetes_namespace, http_server_requests_seconds_count.kubernetes_pod_name, http_server_requests_seconds_count.greptime_timestamp, http_server_requests_seconds_sum.greptime_value / http_server_requests_seconds_count.greptime_value AS http_server_requests_seconds_sum.greptime_value / http_server_requests_seconds_count.greptime_value\
        \n  Inner Join: http_server_requests_seconds_sum.greptime_timestamp = http_server_requests_seconds_count.greptime_timestamp, http_server_requests_seconds_sum.uri = http_server_requests_seconds_count.uri\
        \n    SubqueryAlias: http_server_requests_seconds_sum\
        \n      PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp]\
        \n        PromSeriesDivide: tags=[\"uri\", \"kubernetes_namespace\", \"kubernetes_pod_name\"]\
        \n          Sort: http_server_requests_seconds_sum.uri ASC NULLS FIRST, http_server_requests_seconds_sum.kubernetes_namespace ASC NULLS FIRST, http_server_requests_seconds_sum.kubernetes_pod_name ASC NULLS FIRST, http_server_requests_seconds_sum.greptime_timestamp ASC NULLS FIRST\
        \n            Filter: http_server_requests_seconds_sum.uri = Utf8(\"/accounts/login\") AND http_server_requests_seconds_sum.greptime_timestamp >= TimestampMillisecond(-1000, None) AND http_server_requests_seconds_sum.greptime_timestamp <= TimestampMillisecond(100001000, None)\
        \n              TableScan: http_server_requests_seconds_sum\
        \n    SubqueryAlias: http_server_requests_seconds_count\
        \n      PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp]\
        \n        PromSeriesDivide: tags=[\"uri\", \"kubernetes_namespace\", \"kubernetes_pod_name\"]\
        \n          Sort: http_server_requests_seconds_count.uri ASC NULLS FIRST, http_server_requests_seconds_count.kubernetes_namespace ASC NULLS FIRST, http_server_requests_seconds_count.kubernetes_pod_name ASC NULLS FIRST, http_server_requests_seconds_count.greptime_timestamp ASC NULLS FIRST\
        \n            Filter: http_server_requests_seconds_count.uri = Utf8(\"/accounts/login\") AND http_server_requests_seconds_count.greptime_timestamp >= TimestampMillisecond(-1000, None) AND http_server_requests_seconds_count.greptime_timestamp <= TimestampMillisecond(100001000, None)\
        \n              TableScan: http_server_requests_seconds_count";
        assert_eq!(plan.to_string(), expected);
    }

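    // Smoke test: `label_replace` wrapping `histogram_quantile` over a `rate` of a
    // bucket metric should plan without error.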
    #[tokio::test]
    async fn test_nested_histogram_quantile() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let case = r#"label_replace(histogram_quantile(0.99, sum by(pod, le, path, code) (rate(greptime_servers_grpc_requests_elapsed_bucket{container="frontend"}[1m0s]))), "pod_new", "$1", "pod", "greptimedb-frontend-[0-9a-z]*-(.*)")"#;

        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;
        let table_provider = build_test_table_provider_with_fields(
            &[(
                DEFAULT_SCHEMA_NAME.to_string(),
                "greptime_servers_grpc_requests_elapsed_bucket".to_string(),
            )],
            &["pod", "le", "path", "code", "container"],
        )
        .await;
        let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
            .await
            .unwrap();
    }

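    // `and`/`unless` set operations nested inside aggregations and comparisons
    // should plan without error.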
    #[tokio::test]
    async fn test_parse_and_operator() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let cases = [
            r#"count (max by (persistentvolumeclaim,namespace) (kubelet_volume_stats_used_bytes{namespace=~".+"} ) and (max by (persistentvolumeclaim,namespace) (kubelet_volume_stats_used_bytes{namespace=~".+"} )) / (max by (persistentvolumeclaim,namespace) (kubelet_volume_stats_capacity_bytes{namespace=~".+"} )) >= (80 / 100)) or vector (0)"#,
            r#"count (max by (persistentvolumeclaim,namespace) (kubelet_volume_stats_used_bytes{namespace=~".+"} ) unless (max by (persistentvolumeclaim,namespace) (kubelet_volume_stats_used_bytes{namespace=~".+"} )) / (max by (persistentvolumeclaim,namespace) (kubelet_volume_stats_capacity_bytes{namespace=~".+"} )) >= (80 / 100)) or vector (0)"#,
        ];

        for case in cases {
            let prom_expr = parser::parse(case).unwrap();
            eval_stmt.expr = prom_expr;
            let table_provider = build_test_table_provider_with_fields(
                &[
                    (
                        DEFAULT_SCHEMA_NAME.to_string(),
                        "kubelet_volume_stats_used_bytes".to_string(),
                    ),
                    (
                        DEFAULT_SCHEMA_NAME.to_string(),
                        "kubelet_volume_stats_capacity_bytes".to_string(),
                    ),
                ],
                &["namespace", "persistentvolumeclaim"],
            )
            .await;
            let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
                .await
                .unwrap();
        }
    }

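    // A binary op whose right-hand side is a parenthesized `(... or vector(0))`
    // fallback should plan without error.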
    #[tokio::test]
    async fn test_nested_binary_op() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let case = r#"sum(rate(nginx_ingress_controller_requests{job=~".*"}[2m])) -
        (
            sum(rate(nginx_ingress_controller_requests{namespace=~".*"}[2m]))
            or
            vector(0)
        )"#;

        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;
        let table_provider = build_test_table_provider_with_fields(
            &[(
                DEFAULT_SCHEMA_NAME.to_string(),
                "nginx_ingress_controller_requests".to_string(),
            )],
            &["namespace", "job"],
        )
        .await;
        let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
            .await
            .unwrap();
    }

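    // Various `or` combinations of aggregated selectors, with arithmetic and
    // comparisons on both sides, should all plan without error.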
    #[tokio::test]
    async fn test_parse_or_operator() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let case = r#"
        sum(rate(sysstat{tenant_name=~"tenant1",cluster_name=~"cluster1"}[120s])) by (cluster_name,tenant_name) /
        (sum(sysstat{tenant_name=~"tenant1",cluster_name=~"cluster1"}) by (cluster_name,tenant_name) * 100)
        or
        200 * sum(sysstat{tenant_name=~"tenant1",cluster_name=~"cluster1"}) by (cluster_name,tenant_name) /
        sum(sysstat{tenant_name=~"tenant1",cluster_name=~"cluster1"}) by (cluster_name,tenant_name)"#;

        let table_provider = build_test_table_provider_with_fields(
            &[(DEFAULT_SCHEMA_NAME.to_string(), "sysstat".to_string())],
            &["tenant_name", "cluster_name"],
        )
        .await;
        eval_stmt.expr = parser::parse(case).unwrap();
        let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
            .await
            .unwrap();

        let case = r#"sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) /
        (sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) *1000) +
        sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) /
        (sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) *1000) >= 0
        or
        sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) /
        (sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) *1000) >= 0
        or
        sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) /
        (sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) *1000) >= 0"#;
        let table_provider = build_test_table_provider_with_fields(
            &[(DEFAULT_SCHEMA_NAME.to_string(), "sysstat".to_string())],
            &["tenant_name", "cluster_name"],
        )
        .await;
        eval_stmt.expr = parser::parse(case).unwrap();
        let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
            .await
            .unwrap();

        let case = r#"(sum(background_waitevent_cnt{tenant_name=~"sys",cluster_name=~"cluster1"}) by (cluster_name,tenant_name) +
        sum(foreground_waitevent_cnt{tenant_name=~"sys",cluster_name=~"cluster1"}) by (cluster_name,tenant_name)) or
        (sum(background_waitevent_cnt{tenant_name=~"sys",cluster_name=~"cluster1"}) by (cluster_name,tenant_name)) or
        (sum(foreground_waitevent_cnt{tenant_name=~"sys",cluster_name=~"cluster1"}) by (cluster_name,tenant_name))"#;
        let table_provider = build_test_table_provider_with_fields(
            &[
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "background_waitevent_cnt".to_string(),
                ),
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "foreground_waitevent_cnt".to_string(),
                ),
            ],
            &["tenant_name", "cluster_name"],
        )
        .await;
        eval_stmt.expr = parser::parse(case).unwrap();
        let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
            .await
            .unwrap();

        let case = r#"avg(node_load1{cluster_name=~"cluster1"}) by (cluster_name,host_name) or max(container_cpu_load_average_10s{cluster_name=~"cluster1"}) by (cluster_name,host_name) * 100 / max(container_spec_cpu_quota{cluster_name=~"cluster1"}) by (cluster_name,host_name)"#;
        let table_provider = build_test_table_provider_with_fields(
            &[
                (DEFAULT_SCHEMA_NAME.to_string(), "node_load1".to_string()),
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "container_cpu_load_average_10s".to_string(),
                ),
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "container_spec_cpu_quota".to_string(),
                ),
            ],
            &["cluster_name", "host_name"],
        )
        .await;
        eval_stmt.expr = parser::parse(case).unwrap();
        let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
            .await
            .unwrap();
    }

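    // `__field__` matchers select (or exclude) value columns; the planned schema
    // must contain exactly the expected columns. Matchers naming a field that
    // doesn't exist are planning errors.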
    #[tokio::test]
    async fn value_matcher() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let cases = [
            (
                r#"some_metric{__field__="field_1"}"#,
                vec![
                    "some_metric.field_1",
                    "some_metric.tag_0",
                    "some_metric.tag_1",
                    "some_metric.tag_2",
                    "some_metric.timestamp",
                ],
            ),
            (
                r#"some_metric{__field__="field_1", __field__="field_0"}"#,
                vec![
                    "some_metric.field_0",
                    "some_metric.field_1",
                    "some_metric.tag_0",
                    "some_metric.tag_1",
                    "some_metric.tag_2",
                    "some_metric.timestamp",
                ],
            ),
            (
                r#"some_metric{__field__!="field_1"}"#,
                vec![
                    "some_metric.field_0",
                    "some_metric.field_2",
                    "some_metric.tag_0",
                    "some_metric.tag_1",
                    "some_metric.tag_2",
                    "some_metric.timestamp",
                ],
            ),
            (
                r#"some_metric{__field__!="field_1", __field__!="field_2"}"#,
                vec![
                    "some_metric.field_0",
                    "some_metric.tag_0",
                    "some_metric.tag_1",
                    "some_metric.tag_2",
                    "some_metric.timestamp",
                ],
            ),
            (
                r#"some_metric{__field__="field_1", __field__!="field_0"}"#,
                vec![
                    "some_metric.field_1",
                    "some_metric.tag_0",
                    "some_metric.tag_1",
                    "some_metric.tag_2",
                    "some_metric.timestamp",
                ],
            ),
            (
                r#"some_metric{__field__="field_2", __field__!="field_2"}"#,
                vec![
                    "some_metric.tag_0",
                    "some_metric.tag_1",
                    "some_metric.tag_2",
                    "some_metric.timestamp",
                ],
            ),
            (
                r#"some_metric{__field__=~"field_1|field_2"}"#,
                vec![
                    "some_metric.field_1",
                    "some_metric.field_2",
                    "some_metric.tag_0",
                    "some_metric.tag_1",
                    "some_metric.tag_2",
                    "some_metric.timestamp",
                ],
            ),
            (
                r#"some_metric{__field__!~"field_1|field_2"}"#,
                vec![
                    "some_metric.field_0",
                    "some_metric.tag_0",
                    "some_metric.tag_1",
                    "some_metric.tag_2",
                    "some_metric.timestamp",
                ],
            ),
        ];

        for case in cases {
            let prom_expr = parser::parse(case.0).unwrap();
            eval_stmt.expr = prom_expr;
            let table_provider = build_test_table_provider(
                &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
                3,
                3,
            )
            .await;
            let plan =
                PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
                    .await
                    .unwrap();
            let mut fields = plan.schema().field_names();
            let mut expected = case.1.into_iter().map(String::from).collect::<Vec<_>>();
            fields.sort();
            expected.sort();
            assert_eq!(fields, expected, "case: {:?}", case.0);
        }

        let bad_cases = [
            r#"some_metric{__field__="nonexistent"}"#,
            r#"some_metric{__field__!="nonexistent"}"#,
        ];

        for case in bad_cases {
            let prom_expr = parser::parse(case).unwrap();
            eval_stmt.expr = prom_expr;
            let table_provider = build_test_table_provider(
                &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
                3,
                3,
            )
            .await;
            let plan =
                PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state()).await;
            assert!(plan.is_err(), "case: {:?}", case);
        }
    }

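    // `__schema__` and `__database__` matchers route the selector to another
    // schema: the table reference becomes `greptime_private.some_alt_metric`,
    // including when the selector appears on one side of a binary op.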
    #[tokio::test]
    async fn custom_schema() {
        let query = "some_alt_metric{__schema__=\"greptime_private\"}";
        let expected = String::from(
            "PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n  PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n    Sort: greptime_private.some_alt_metric.tag_0 ASC NULLS FIRST, greptime_private.some_alt_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n      Filter: greptime_private.some_alt_metric.timestamp >= TimestampMillisecond(-1000, None) AND greptime_private.some_alt_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n        TableScan: greptime_private.some_alt_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
        );

        indie_query_plan_compare(query, expected).await;

        let query = "some_alt_metric{__database__=\"greptime_private\"}";
        let expected = String::from(
            "PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n  PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n    Sort: greptime_private.some_alt_metric.tag_0 ASC NULLS FIRST, greptime_private.some_alt_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n      Filter: greptime_private.some_alt_metric.timestamp >= TimestampMillisecond(-1000, None) AND greptime_private.some_alt_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n        TableScan: greptime_private.some_alt_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
        );

        indie_query_plan_compare(query, expected).await;

        let query = "some_alt_metric{__schema__=\"greptime_private\"} / some_metric";
        let expected = String::from("Projection: some_metric.tag_0, some_metric.timestamp, greptime_private.some_alt_metric.field_0 / some_metric.field_0 AS greptime_private.some_alt_metric.field_0 / some_metric.field_0 [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), greptime_private.some_alt_metric.field_0 / some_metric.field_0:Float64;N]\
        \n  Inner Join: greptime_private.some_alt_metric.tag_0 = some_metric.tag_0, greptime_private.some_alt_metric.timestamp = some_metric.timestamp [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n    SubqueryAlias: greptime_private.some_alt_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n      PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n        PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n          Sort: greptime_private.some_alt_metric.tag_0 ASC NULLS FIRST, greptime_private.some_alt_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n            Filter: greptime_private.some_alt_metric.timestamp >= TimestampMillisecond(-1000, None) AND greptime_private.some_alt_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n              TableScan: greptime_private.some_alt_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n    SubqueryAlias: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n      PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n        PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n          Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n            Filter: some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n              TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]");

        indie_query_plan_compare(query, expected).await;
    }

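    // `__schema__`/`__database__` only support `=`; `!=` and regex matchers must be
    // rejected at planning time.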
    #[tokio::test]
    async fn only_equals_is_supported_for_special_matcher() {
        let queries = &[
            "some_alt_metric{__schema__!=\"greptime_private\"}",
            "some_alt_metric{__schema__=~\"lalala\"}",
            "some_alt_metric{__database__!=\"greptime_private\"}",
            "some_alt_metric{__database__=~\"lalala\"}",
        ];

        for query in queries {
            let prom_expr = parser::parse(query).unwrap();
            let eval_stmt = EvalStmt {
                expr: prom_expr,
                start: UNIX_EPOCH,
                end: UNIX_EPOCH
                    .checked_add(Duration::from_secs(100_000))
                    .unwrap(),
                interval: Duration::from_secs(5),
                lookback_delta: Duration::from_secs(1),
            };

            let table_provider = build_test_table_provider(
                &[
                    (DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string()),
                    (
                        "greptime_private".to_string(),
                        "some_alt_metric".to_string(),
                    ),
                ],
                1,
                1,
            )
            .await;

            let plan =
                PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state()).await;
            assert!(plan.is_err(), "query: {:?}", query);
        }
    }

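    // A nanosecond time index must be cast to millisecond precision via a
    // `Projection` directly above the `TableScan`, for both instant and range
    // selectors.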
    #[tokio::test]
    async fn test_non_ms_precision() {
        let catalog_list = MemoryCatalogManager::with_default_setup();
        let columns = vec![
            ColumnSchema::new(
                "tag".to_string(),
                ConcreteDataType::string_datatype(),
                false,
            ),
            ColumnSchema::new(
                "timestamp".to_string(),
                ConcreteDataType::timestamp_nanosecond_datatype(),
                false,
            )
            .with_time_index(true),
            ColumnSchema::new(
                "field".to_string(),
                ConcreteDataType::float64_datatype(),
                true,
            ),
        ];
        let schema = Arc::new(Schema::new(columns));
        let table_meta = TableMetaBuilder::empty()
            .schema(schema)
            .primary_key_indices(vec![0])
            .value_indices(vec![2])
            .next_column_id(1024)
            .build()
            .unwrap();
        let table_info = TableInfoBuilder::default()
            .name("metrics".to_string())
            .meta(table_meta)
            .build()
            .unwrap();
        let table = EmptyTable::from_table_info(&table_info);
        assert!(catalog_list
            .register_table_sync(RegisterTableRequest {
                catalog: DEFAULT_CATALOG_NAME.to_string(),
                schema: DEFAULT_SCHEMA_NAME.to_string(),
                table_name: "metrics".to_string(),
                table_id: 1024,
                table,
            })
            .is_ok());

        let plan = PromPlanner::stmt_to_plan(
            DfTableSourceProvider::new(
                catalog_list.clone(),
                false,
                QueryContext::arc(),
                DummyDecoder::arc(),
                true,
            ),
            &EvalStmt {
                expr: parser::parse("metrics{tag = \"1\"}").unwrap(),
                start: UNIX_EPOCH,
                end: UNIX_EPOCH
                    .checked_add(Duration::from_secs(100_000))
                    .unwrap(),
                interval: Duration::from_secs(5),
                lookback_delta: Duration::from_secs(1),
            },
            &build_session_state(),
        )
        .await
        .unwrap();
        assert_eq!(
            plan.display_indent_schema().to_string(),
            "PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
            \n  PromSeriesDivide: tags=[\"tag\"] [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
            \n    Sort: metrics.tag ASC NULLS FIRST, metrics.timestamp ASC NULLS FIRST [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
            \n      Filter: metrics.tag = Utf8(\"1\") AND metrics.timestamp >= TimestampMillisecond(-1000, None) AND metrics.timestamp <= TimestampMillisecond(100001000, None) [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
            \n        Projection: metrics.field, metrics.tag, CAST(metrics.timestamp AS Timestamp(Millisecond, None)) AS timestamp [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
            \n          TableScan: metrics [tag:Utf8, timestamp:Timestamp(Nanosecond, None), field:Float64;N]"
        );
        let plan = PromPlanner::stmt_to_plan(
            DfTableSourceProvider::new(
                catalog_list.clone(),
                false,
                QueryContext::arc(),
                DummyDecoder::arc(),
                true,
            ),
            &EvalStmt {
                expr: parser::parse("avg_over_time(metrics{tag = \"1\"}[5s])").unwrap(),
                start: UNIX_EPOCH,
                end: UNIX_EPOCH
                    .checked_add(Duration::from_secs(100_000))
                    .unwrap(),
                interval: Duration::from_secs(5),
                lookback_delta: Duration::from_secs(1),
            },
            &build_session_state(),
        )
        .await
        .unwrap();
        assert_eq!(
            plan.display_indent_schema().to_string(),
            "Filter: prom_avg_over_time(timestamp_range,field) IS NOT NULL [timestamp:Timestamp(Millisecond, None), prom_avg_over_time(timestamp_range,field):Float64;N, tag:Utf8]\
            \n  Projection: metrics.timestamp, prom_avg_over_time(timestamp_range, field) AS prom_avg_over_time(timestamp_range,field), metrics.tag [timestamp:Timestamp(Millisecond, None), prom_avg_over_time(timestamp_range,field):Float64;N, tag:Utf8]\
            \n    PromRangeManipulate: req range=[0..100000000], interval=[5000], eval range=[5000], time index=[timestamp], values=[\"field\"] [field:Dictionary(Int64, Float64);N, tag:Utf8, timestamp:Timestamp(Millisecond, None), timestamp_range:Dictionary(Int64, Timestamp(Millisecond, None))]\
            \n      PromSeriesNormalize: offset=[0], time index=[timestamp], filter NaN: [true] [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
            \n        PromSeriesDivide: tags=[\"tag\"] [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
            \n          Sort: metrics.tag ASC NULLS FIRST, metrics.timestamp ASC NULLS FIRST [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
            \n            Filter: metrics.tag = Utf8(\"1\") AND metrics.timestamp >= TimestampMillisecond(-6000, None) AND metrics.timestamp <= TimestampMillisecond(100001000, None) [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
            \n              Projection: metrics.field, metrics.tag, CAST(metrics.timestamp AS Timestamp(Millisecond, None)) AS timestamp [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
            \n                TableScan: metrics [tag:Utf8, timestamp:Timestamp(Nanosecond, None), field:Float64;N]"
        );
    }

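    // A matcher on a label the table doesn't have must not fail planning.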
    #[tokio::test]
    async fn test_nonexistent_label() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let case = r#"some_metric{nonexistent="hi"}"#;
        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;
        let table_provider = build_test_table_provider(
            &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
            3,
            3,
        )
        .await;
        let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
            .await
            .unwrap();
    }

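    // `label_join` becomes a `concat_ws` projection that appends the joined label
    // (`foo`) while keeping the source labels.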
    #[tokio::test]
    async fn test_label_join() {
        let prom_expr = parser::parse(
            "label_join(up{tag_0='api-server'}, 'foo', ',', 'tag_1', 'tag_2', 'tag_3')",
        )
        .unwrap();
        let eval_stmt = EvalStmt {
            expr: prom_expr,
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let table_provider =
            build_test_table_provider(&[(DEFAULT_SCHEMA_NAME.to_string(), "up".to_string())], 4, 1)
                .await;
        let plan = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
            .await
            .unwrap();

        let expected = r#"
Filter: up.field_0 IS NOT NULL [timestamp:Timestamp(Millisecond, None), field_0:Float64;N, foo:Utf8;N, tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8]
  Projection: up.timestamp, up.field_0, concat_ws(Utf8(","), up.tag_1, up.tag_2, up.tag_3) AS foo, up.tag_0, up.tag_1, up.tag_2, up.tag_3 [timestamp:Timestamp(Millisecond, None), field_0:Float64;N, foo:Utf8;N, tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8]
    PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
      PromSeriesDivide: tags=["tag_0", "tag_1", "tag_2", "tag_3"] [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
        Sort: up.tag_0 ASC NULLS FIRST, up.tag_1 ASC NULLS FIRST, up.tag_2 ASC NULLS FIRST, up.tag_3 ASC NULLS FIRST, up.timestamp ASC NULLS FIRST [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
          Filter: up.tag_0 = Utf8("api-server") AND up.timestamp >= TimestampMillisecond(-1000, None) AND up.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
            TableScan: up [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"#;

        let ret = plan.display_indent_schema().to_string();
        assert_eq!(format!("\n{ret}"), expected, "\n{}", ret);
    }

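    // `label_replace` becomes a `regexp_replace` projection; note the pattern is
    // anchored as `^(?s:...)$`.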
    #[tokio::test]
    async fn test_label_replace() {
        let prom_expr = parser::parse(
            "label_replace(up{tag_0=\"a:c\"}, \"foo\", \"$1\", \"tag_0\", \"(.*):.*\")",
        )
        .unwrap();
        let eval_stmt = EvalStmt {
            expr: prom_expr,
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let table_provider =
            build_test_table_provider(&[(DEFAULT_SCHEMA_NAME.to_string(), "up".to_string())], 1, 1)
                .await;
        let plan = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
            .await
            .unwrap();

        let expected = r#"
Filter: up.field_0 IS NOT NULL [timestamp:Timestamp(Millisecond, None), field_0:Float64;N, foo:Utf8;N, tag_0:Utf8]
  Projection: up.timestamp, up.field_0, regexp_replace(up.tag_0, Utf8("^(?s:(.*):.*)$"), Utf8("$1")) AS foo, up.tag_0 [timestamp:Timestamp(Millisecond, None), field_0:Float64;N, foo:Utf8;N, tag_0:Utf8]
    PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
      PromSeriesDivide: tags=["tag_0"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
        Sort: up.tag_0 ASC NULLS FIRST, up.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
          Filter: up.tag_0 = Utf8("a:c") AND up.timestamp >= TimestampMillisecond(-1000, None) AND up.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
            TableScan: up [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"#;

        let ret = plan.display_indent_schema().to_string();
        assert_eq!(format!("\n{ret}"), expected, "\n{}", ret);
    }

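    // A regex tag matcher compiles to a `~` comparison against an anchored
    // `^(?:...)$` pattern in the scan filter.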
    #[tokio::test]
    async fn test_matchers_to_expr() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };
        let case =
            r#"sum(prometheus_tsdb_head_series{tag_1=~"(10.0.160.237:8080|10.0.160.237:9090)"})"#;

        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;
        let table_provider = build_test_table_provider(
            &[(
                DEFAULT_SCHEMA_NAME.to_string(),
                "prometheus_tsdb_head_series".to_string(),
            )],
            3,
            3,
        )
        .await;
        let plan = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
            .await
            .unwrap();
        let expected = "Sort: prometheus_tsdb_head_series.timestamp ASC NULLS LAST [timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.field_0):Float64;N, sum(prometheus_tsdb_head_series.field_1):Float64;N, sum(prometheus_tsdb_head_series.field_2):Float64;N]\
        \n  Aggregate: groupBy=[[prometheus_tsdb_head_series.timestamp]], aggr=[[sum(prometheus_tsdb_head_series.field_0), sum(prometheus_tsdb_head_series.field_1), sum(prometheus_tsdb_head_series.field_2)]] [timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.field_0):Float64;N, sum(prometheus_tsdb_head_series.field_1):Float64;N, sum(prometheus_tsdb_head_series.field_2):Float64;N]\
        \n    PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N]\
        \n      PromSeriesDivide: tags=[\"tag_0\", \"tag_1\", \"tag_2\"] [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N]\
        \n        Sort: prometheus_tsdb_head_series.tag_0 ASC NULLS FIRST, prometheus_tsdb_head_series.tag_1 ASC NULLS FIRST, prometheus_tsdb_head_series.tag_2 ASC NULLS FIRST, prometheus_tsdb_head_series.timestamp ASC NULLS FIRST [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N]\
        \n          Filter: prometheus_tsdb_head_series.tag_1 ~ Utf8(\"^(?:(10.0.160.237:8080|10.0.160.237:9090))$\") AND prometheus_tsdb_head_series.timestamp >= TimestampMillisecond(-1000, None) AND prometheus_tsdb_head_series.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N]\
        \n            TableScan: prometheus_tsdb_head_series [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N]";
        assert_eq!(plan.display_indent_schema().to_string(), expected);
    }

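    // `topk` plans as a `row_number()` window partitioned by timestamp, filtered to
    // the first 10 rows, on top of the inner `sum by (ip)` aggregation.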
    #[tokio::test]
    async fn test_topk_expr() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };
        let case = r#"topk(10, sum(prometheus_tsdb_head_series{ip=~"(10.0.160.237:8080|10.0.160.237:9090)"}) by (ip))"#;

        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;
        let table_provider = build_test_table_provider_with_fields(
            &[
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "prometheus_tsdb_head_series".to_string(),
                ),
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "http_server_requests_seconds_count".to_string(),
                ),
            ],
            &["ip"],
        )
        .await;

        let plan = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
            .await
            .unwrap();
        let expected = "Projection: sum(prometheus_tsdb_head_series.greptime_value), prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp [sum(prometheus_tsdb_head_series.greptime_value):Float64;N, ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None)]\
        \n  Sort: prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST, row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS FIRST, prometheus_tsdb_head_series.ip DESC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW ASC NULLS LAST [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N, row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS FIRST, prometheus_tsdb_head_series.ip DESC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:UInt64]\
        \n    Filter: row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS FIRST, prometheus_tsdb_head_series.ip DESC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW <= Float64(10) [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N, row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS FIRST, prometheus_tsdb_head_series.ip DESC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:UInt64]\
        \n      WindowAggr: windowExpr=[[row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS FIRST, prometheus_tsdb_head_series.ip DESC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N, row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS FIRST, prometheus_tsdb_head_series.ip DESC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:UInt64]\
        \n        Sort: prometheus_tsdb_head_series.ip ASC NULLS LAST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N]\
        \n          Aggregate: groupBy=[[prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp]], aggr=[[sum(prometheus_tsdb_head_series.greptime_value)]] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N]\
        \n            PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n              PromSeriesDivide: tags=[\"ip\"] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n                Sort: prometheus_tsdb_head_series.ip ASC NULLS FIRST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS FIRST [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n                  Filter: prometheus_tsdb_head_series.ip ~ Utf8(\"^(?:(10.0.160.237:8080|10.0.160.237:9090))$\") AND prometheus_tsdb_head_series.greptime_timestamp >= TimestampMillisecond(-1000, None) AND prometheus_tsdb_head_series.greptime_timestamp <= TimestampMillisecond(100001000, None) [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n                    TableScan: prometheus_tsdb_head_series [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]";

        assert_eq!(plan.display_indent_schema().to_string(), expected);
    }

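    // `count_values` groups by the value column itself and projects it back out
    // under the user-provided label (`series`).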
    #[tokio::test]
    async fn test_count_values_expr() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };
        let case = r#"count_values('series', prometheus_tsdb_head_series{ip=~"(10.0.160.237:8080|10.0.160.237:9090)"}) by (ip)"#;

        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;
        let table_provider = build_test_table_provider_with_fields(
            &[
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "prometheus_tsdb_head_series".to_string(),
                ),
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "http_server_requests_seconds_count".to_string(),
                ),
            ],
            &["ip"],
        )
        .await;

        let plan = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
            .await
            .unwrap();
        let expected = "Projection: count(prometheus_tsdb_head_series.greptime_value), prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp, series [count(prometheus_tsdb_head_series.greptime_value):Int64, ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), series:Float64;N]\
        \n  Sort: prometheus_tsdb_head_series.ip ASC NULLS LAST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST, prometheus_tsdb_head_series.greptime_value ASC NULLS LAST [count(prometheus_tsdb_head_series.greptime_value):Int64, ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), series:Float64;N, greptime_value:Float64;N]\
        \n    Projection: count(prometheus_tsdb_head_series.greptime_value), prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp, prometheus_tsdb_head_series.greptime_value AS series, prometheus_tsdb_head_series.greptime_value [count(prometheus_tsdb_head_series.greptime_value):Int64, ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), series:Float64;N, greptime_value:Float64;N]\
        \n      Aggregate: groupBy=[[prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp, prometheus_tsdb_head_series.greptime_value]], aggr=[[count(prometheus_tsdb_head_series.greptime_value)]] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N, count(prometheus_tsdb_head_series.greptime_value):Int64]\
        \n        PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n          PromSeriesDivide: tags=[\"ip\"] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n            Sort: prometheus_tsdb_head_series.ip ASC NULLS FIRST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS FIRST [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n              Filter: prometheus_tsdb_head_series.ip ~ Utf8(\"^(?:(10.0.160.237:8080|10.0.160.237:9090))$\") AND prometheus_tsdb_head_series.greptime_timestamp >= TimestampMillisecond(-1000, None) AND prometheus_tsdb_head_series.greptime_timestamp <= TimestampMillisecond(100001000, None) [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n                TableScan: prometheus_tsdb_head_series [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]";

        assert_eq!(plan.display_indent_schema().to_string(), expected);
    }

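    // `quantile(0.3, ...)` uses the planner's `quantile` UDAF, grouped by timestamp
    // over the inner `sum by (ip)` result.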
    #[tokio::test]
    async fn test_quantile_expr() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };
        let case = r#"quantile(0.3, sum(prometheus_tsdb_head_series{ip=~"(10.0.160.237:8080|10.0.160.237:9090)"}) by (ip))"#;

        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;
        let table_provider = build_test_table_provider_with_fields(
            &[
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "prometheus_tsdb_head_series".to_string(),
                ),
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "http_server_requests_seconds_count".to_string(),
                ),
            ],
            &["ip"],
        )
        .await;

        let plan = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
            .await
            .unwrap();
        let expected = "Sort: prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST [greptime_timestamp:Timestamp(Millisecond, None), quantile(Float64(0.3),sum(prometheus_tsdb_head_series.greptime_value)):Float64;N]\
        \n  Aggregate: groupBy=[[prometheus_tsdb_head_series.greptime_timestamp]], aggr=[[quantile(Float64(0.3), sum(prometheus_tsdb_head_series.greptime_value))]] [greptime_timestamp:Timestamp(Millisecond, None), quantile(Float64(0.3),sum(prometheus_tsdb_head_series.greptime_value)):Float64;N]\
        \n    Sort: prometheus_tsdb_head_series.ip ASC NULLS LAST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N]\
        \n      Aggregate: groupBy=[[prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp]], aggr=[[sum(prometheus_tsdb_head_series.greptime_value)]] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N]\
        \n        PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n          PromSeriesDivide: tags=[\"ip\"] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n            Sort: prometheus_tsdb_head_series.ip ASC NULLS FIRST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS FIRST [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n              Filter: prometheus_tsdb_head_series.ip ~ Utf8(\"^(?:(10.0.160.237:8080|10.0.160.237:9090))$\") AND prometheus_tsdb_head_series.greptime_timestamp >= TimestampMillisecond(-1000, None) AND prometheus_tsdb_head_series.greptime_timestamp <= TimestampMillisecond(100001000, None) [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n                TableScan: prometheus_tsdb_head_series [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]";

        assert_eq!(plan.display_indent_schema().to_string(), expected);
    }

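    // `or` where the right-hand metric doesn't exist: the missing side becomes an
    // `EmptyMetric` with NULL-filled labels, combined via `UnionDistinctOn`.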
    #[tokio::test]
    async fn test_or_not_exists_table_label() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };
        let case = r#"sum by (job, tag0, tag2) (metric_exists) or sum by (job, tag0, tag2) (metric_not_exists)"#;

        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;
        let table_provider = build_test_table_provider_with_fields(
            &[(DEFAULT_SCHEMA_NAME.to_string(), "metric_exists".to_string())],
            &["job"],
        )
        .await;

        let plan = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
            .await
            .unwrap();
        let expected = "UnionDistinctOn: on col=[[\"job\"]], ts_col=[greptime_timestamp] [greptime_timestamp:Timestamp(Millisecond, None), job:Utf8, sum(metric_exists.greptime_value):Float64;N]\
        \n  SubqueryAlias: metric_exists [greptime_timestamp:Timestamp(Millisecond, None), job:Utf8, sum(metric_exists.greptime_value):Float64;N]\
        \n    Projection: metric_exists.greptime_timestamp, metric_exists.job, sum(metric_exists.greptime_value) [greptime_timestamp:Timestamp(Millisecond, None), job:Utf8, sum(metric_exists.greptime_value):Float64;N]\
        \n      Sort: metric_exists.job ASC NULLS LAST, metric_exists.greptime_timestamp ASC NULLS LAST [job:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(metric_exists.greptime_value):Float64;N]\
        \n        Aggregate: groupBy=[[metric_exists.job, metric_exists.greptime_timestamp]], aggr=[[sum(metric_exists.greptime_value)]] [job:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(metric_exists.greptime_value):Float64;N]\
        \n          PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp] [job:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n            PromSeriesDivide: tags=[\"job\"] [job:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n              Sort: metric_exists.job ASC NULLS FIRST, metric_exists.greptime_timestamp ASC NULLS FIRST [job:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n                Filter: metric_exists.greptime_timestamp >= TimestampMillisecond(-1000, None) AND metric_exists.greptime_timestamp <= TimestampMillisecond(100001000, None) [job:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n                  TableScan: metric_exists [job:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n  SubqueryAlias: [greptime_timestamp:Timestamp(Millisecond, None), job:Utf8;N, sum(.value):Float64;N]\
        \n    Projection: .time AS greptime_timestamp, Utf8(NULL) AS job, sum(.value) [greptime_timestamp:Timestamp(Millisecond, None), job:Utf8;N, sum(.value):Float64;N]\
        \n      Sort: .time ASC NULLS LAST [time:Timestamp(Millisecond, None), sum(.value):Float64;N]\
        \n        Aggregate: groupBy=[[.time]], aggr=[[sum(.value)]] [time:Timestamp(Millisecond, None), sum(.value):Float64;N]\
        \n          EmptyMetric: range=[0..-1], interval=[5000] [time:Timestamp(Millisecond, None), value:Float64;N]";

        assert_eq!(plan.display_indent_schema().to_string(), expected);
    }
}