use std::collections::{BTreeSet, HashSet, VecDeque};
use std::sync::Arc;
use std::time::UNIX_EPOCH;

use arrow::datatypes::IntervalDayTime;
use async_recursion::async_recursion;
use catalog::table_source::DfTableSourceProvider;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_function::function::FunctionContext;
use common_query::prelude::GREPTIME_VALUE;
use datafusion::common::DFSchemaRef;
use datafusion::datasource::DefaultTableSource;
use datafusion::functions_aggregate::average::avg_udaf;
use datafusion::functions_aggregate::count::count_udaf;
use datafusion::functions_aggregate::expr_fn::first_value;
use datafusion::functions_aggregate::grouping::grouping_udaf;
use datafusion::functions_aggregate::min_max::{max_udaf, min_udaf};
use datafusion::functions_aggregate::stddev::stddev_pop_udaf;
use datafusion::functions_aggregate::sum::sum_udaf;
use datafusion::functions_aggregate::variance::var_pop_udaf;
use datafusion::functions_window::row_number::RowNumber;
use datafusion::logical_expr::expr::{Alias, ScalarFunction, WindowFunction};
use datafusion::logical_expr::expr_rewriter::normalize_cols;
use datafusion::logical_expr::{
    BinaryExpr, Cast, Extension, LogicalPlan, LogicalPlanBuilder, Operator,
    ScalarUDF as ScalarUdfDef, WindowFrame, WindowFunctionDefinition,
};
use datafusion::prelude as df_prelude;
use datafusion::prelude::{Column, Expr as DfExpr, JoinType};
use datafusion::scalar::ScalarValue;
use datafusion::sql::TableReference;
use datafusion_common::{DFSchema, NullEquality};
use datafusion_expr::expr::WindowFunctionParams;
use datafusion_expr::utils::conjunction;
use datafusion_expr::{col, lit, ExprSchemable, Literal, SortExpr};
use datatypes::arrow::datatypes::{DataType as ArrowDataType, TimeUnit as ArrowTimeUnit};
use datatypes::data_type::ConcreteDataType;
use itertools::Itertools;
use once_cell::sync::Lazy;
use promql::extension_plan::{
    build_special_time_expr, Absent, EmptyMetric, HistogramFold, InstantManipulate, Millisecond,
    RangeManipulate, ScalarCalculate, SeriesDivide, SeriesNormalize, UnionDistinctOn,
};
use promql::functions::{
    quantile_udaf, AbsentOverTime, AvgOverTime, Changes, CountOverTime, Delta, Deriv, HoltWinters,
    IDelta, Increase, LastOverTime, MaxOverTime, MinOverTime, PredictLinear, PresentOverTime,
    QuantileOverTime, Rate, Resets, Round, StddevOverTime, StdvarOverTime, SumOverTime,
};
use promql_parser::label::{MatchOp, Matcher, Matchers, METRIC_NAME};
use promql_parser::parser::token::TokenType;
use promql_parser::parser::{
    token, AggregateExpr, BinModifier, BinaryExpr as PromBinaryExpr, Call, EvalStmt,
    Expr as PromExpr, Function, FunctionArgs as PromFunctionArgs, LabelModifier, MatrixSelector,
    NumberLiteral, Offset, ParenExpr, StringLiteral, SubqueryExpr, UnaryExpr,
    VectorMatchCardinality, VectorSelector,
};
use regex::{self, Regex};
use snafu::{ensure, OptionExt, ResultExt};
use store_api::metric_engine_consts::{
    DATA_SCHEMA_TABLE_ID_COLUMN_NAME, DATA_SCHEMA_TSID_COLUMN_NAME,
};
use table::table::adapter::DfTableProviderAdapter;

use crate::promql::error::{
    CatalogSnafu, ColumnNotFoundSnafu, CombineTableColumnMismatchSnafu, DataFusionPlanningSnafu,
    ExpectRangeSelectorSnafu, FunctionInvalidArgumentSnafu, InvalidDestinationLabelNameSnafu,
    InvalidRegularExpressionSnafu, InvalidTimeRangeSnafu, MultiFieldsNotSupportedSnafu,
    MultipleMetricMatchersSnafu, MultipleVectorSnafu, NoMetricMatcherSnafu, PromqlPlanNodeSnafu,
    Result, SameLabelSetSnafu, TableNameNotFoundSnafu, TimeIndexNotFoundSnafu,
    UnexpectedPlanExprSnafu, UnexpectedTokenSnafu, UnknownTableSnafu, UnsupportedExprSnafu,
    UnsupportedMatcherOpSnafu, UnsupportedVectorMatchSnafu, ValueNotFoundSnafu,
    ZeroRangeSelectorSnafu,
};
use crate::query_engine::QueryEngineState;

/// `time()` function in PromQL.
const SPECIAL_TIME_FUNCTION: &str = "time";
/// `scalar()` function in PromQL.
const SCALAR_FUNCTION: &str = "scalar";
/// `absent()` function in PromQL.
const SPECIAL_ABSENT_FUNCTION: &str = "absent";
/// `histogram_quantile()` function in PromQL.
const SPECIAL_HISTOGRAM_QUANTILE: &str = "histogram_quantile";
/// `vector()` function in PromQL.
const SPECIAL_VECTOR_FUNCTION: &str = "vector";
/// `le` column for histogram buckets.
const LE_COLUMN_NAME: &str = "le";

/// Regular expression for validating PromQL label names.
static LABEL_NAME_REGEX: Lazy<Regex> =
    Lazy::new(|| Regex::new(r"^[a-zA-Z_][a-zA-Z0-9_]*$").unwrap());

/// Default time index column name, used when the plan is not backed by a table.
const DEFAULT_TIME_INDEX_COLUMN: &str = "time";

/// Default field column name, used when the plan is not backed by a table.
const DEFAULT_FIELD_COLUMN: &str = "value";

/// Special matcher for selecting field columns.
const FIELD_COLUMN_MATCHER: &str = "__field__";

/// Special matchers for selecting the schema (database).
const SCHEMA_COLUMN_MATCHER: &str = "__schema__";
const DB_COLUMN_MATCHER: &str = "__database__";

/// Maximum number of scattered time windows generated by
/// `build_time_index_filter`.
const MAX_SCATTER_POINTS: i64 = 400;

/// One hour in milliseconds.
const INTERVAL_1H: i64 = 60 * 60 * 1000;

#[derive(Default, Debug, Clone)]
struct PromPlannerContext {
    // query parameters
    start: Millisecond,
    end: Millisecond,
    interval: Millisecond,
    lookback_delta: Millisecond,

    // planner states
    table_name: Option<String>,
    time_index_column: Option<String>,
    field_columns: Vec<String>,
    tag_columns: Vec<String>,
    field_column_matcher: Option<Vec<Matcher>>,
    selector_matcher: Vec<Matcher>,
    schema_name: Option<String>,
    /// Range in milliseconds of the current range selector, if any.
    range: Option<Millisecond>,
}

impl PromPlannerContext {
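    /// Derive the query parameters (start, end, interval, lookback delta) from an
    /// [EvalStmt], leaving all table-related states at their defaults.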
    fn from_eval_stmt(stmt: &EvalStmt) -> Self {
        Self {
            start: stmt.start.duration_since(UNIX_EPOCH).unwrap().as_millis() as _,
            end: stmt.end.duration_since(UNIX_EPOCH).unwrap().as_millis() as _,
            interval: stmt.interval.as_millis() as _,
            lookback_delta: stmt.lookback_delta.as_millis() as _,
            ..Default::default()
        }
    }

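    /// Reset all table-related states to their defaults.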
    fn reset(&mut self) {
        self.table_name = None;
        self.time_index_column = None;
        self.field_columns = vec![];
        self.tag_columns = vec![];
        self.field_column_matcher = None;
        self.selector_matcher.clear();
        self.schema_name = None;
        self.range = None;
    }

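    /// Reset the table name to an empty string and clear the schema name.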
    fn reset_table_name_and_schema(&mut self) {
        self.table_name = Some(String::new());
        self.schema_name = None;
    }

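    /// Check whether the `le` column is present among the tag columns.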
    fn has_le_tag(&self) -> bool {
        self.tag_columns.iter().any(|c| c.eq(&LE_COLUMN_NAME))
    }
}

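/// Planner that translates a PromQL [EvalStmt] into a DataFusion [LogicalPlan].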
pub struct PromPlanner {
    table_provider: DfTableSourceProvider,
    ctx: PromPlannerContext,
}

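/// Unescape the value of the matcher, keeping the original value if unescaping fails.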
pub fn normalize_matcher(mut matcher: Matcher) -> Matcher {
    if let Ok(unescaped_value) = unescaper::unescape(&matcher.value) {
        matcher.value = unescaped_value;
    }
    matcher
}

impl PromPlanner {
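    /// Entry point: plan the expression of an [EvalStmt] into a [LogicalPlan].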
    pub async fn stmt_to_plan(
        table_provider: DfTableSourceProvider,
        stmt: &EvalStmt,
        query_engine_state: &QueryEngineState,
    ) -> Result<LogicalPlan> {
        let mut planner = Self {
            table_provider,
            ctx: PromPlannerContext::from_eval_stmt(stmt),
        };

        planner
            .prom_expr_to_plan(&stmt.expr, query_engine_state)
            .await
    }

    pub async fn prom_expr_to_plan(
        &mut self,
        prom_expr: &PromExpr,
        query_engine_state: &QueryEngineState,
    ) -> Result<LogicalPlan> {
        self.prom_expr_to_plan_inner(prom_expr, false, query_engine_state)
            .await
    }

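    /// Recursively transform a [PromExpr] into a [LogicalPlan].
    ///
    /// `timestamp_fn` is true when the expression being planned is the argument of
    /// a `timestamp()` call; in that case a vector selector evaluates to the sample
    /// timestamps instead of the sample values (see
    /// [Self::create_timestamp_func_plan]).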
    #[async_recursion]
    async fn prom_expr_to_plan_inner(
        &mut self,
        prom_expr: &PromExpr,
        timestamp_fn: bool,
        query_engine_state: &QueryEngineState,
    ) -> Result<LogicalPlan> {
        let res = match prom_expr {
            PromExpr::Aggregate(expr) => {
                self.prom_aggr_expr_to_plan(query_engine_state, expr)
                    .await?
            }
            PromExpr::Unary(expr) => {
                self.prom_unary_expr_to_plan(query_engine_state, expr)
                    .await?
            }
            PromExpr::Binary(expr) => {
                self.prom_binary_expr_to_plan(query_engine_state, expr)
                    .await?
            }
            PromExpr::Paren(ParenExpr { expr }) => {
                self.prom_expr_to_plan_inner(expr, timestamp_fn, query_engine_state)
                    .await?
            }
            PromExpr::Subquery(expr) => {
                self.prom_subquery_expr_to_plan(query_engine_state, expr)
                    .await?
            }
            PromExpr::NumberLiteral(lit) => self.prom_number_lit_to_plan(lit)?,
            PromExpr::StringLiteral(lit) => self.prom_string_lit_to_plan(lit)?,
            PromExpr::VectorSelector(selector) => {
                self.prom_vector_selector_to_plan(selector, timestamp_fn)
                    .await?
            }
            PromExpr::MatrixSelector(selector) => {
                self.prom_matrix_selector_to_plan(selector).await?
            }
            PromExpr::Call(expr) => {
                self.prom_call_expr_to_plan(query_engine_state, expr)
                    .await?
            }
            PromExpr::Extension(expr) => {
                self.prom_ext_expr_to_plan(query_engine_state, expr).await?
            }
        };

        Ok(res)
    }

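    /// Plan a PromQL subquery: temporarily widen the query start by the subquery
    /// range and override the interval with the subquery step, plan the inner
    /// expression, then wrap the result in a [RangeManipulate] node.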
    async fn prom_subquery_expr_to_plan(
        &mut self,
        query_engine_state: &QueryEngineState,
        subquery_expr: &SubqueryExpr,
    ) -> Result<LogicalPlan> {
        let SubqueryExpr {
            expr, range, step, ..
        } = subquery_expr;

        let current_interval = self.ctx.interval;
        if let Some(step) = step {
            self.ctx.interval = step.as_millis() as _;
        }
        let current_start = self.ctx.start;
        self.ctx.start -= range.as_millis() as i64 - self.ctx.interval;
        let input = self.prom_expr_to_plan(expr, query_engine_state).await?;
        self.ctx.interval = current_interval;
        self.ctx.start = current_start;

        ensure!(!range.is_zero(), ZeroRangeSelectorSnafu);
        let range_ms = range.as_millis() as _;
        self.ctx.range = Some(range_ms);

        let manipulate = RangeManipulate::new(
            self.ctx.start,
            self.ctx.end,
            self.ctx.interval,
            range_ms,
            self.ctx
                .time_index_column
                .clone()
                .expect("time index should be set in `setup_context`"),
            self.ctx.field_columns.clone(),
            input,
        )
        .context(DataFusionPlanningSnafu)?;

        Ok(LogicalPlan::Extension(Extension {
            node: Arc::new(manipulate),
        }))
    }

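    /// Plan an aggregate expression (`sum`, `avg`, `count_values`, ...): plan the
    /// input, derive the group-by columns from the modifier, aggregate, and sort by
    /// the group-by columns. `topk` and `bottomk` are delegated to
    /// [Self::prom_topk_bottomk_to_plan].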
    async fn prom_aggr_expr_to_plan(
        &mut self,
        query_engine_state: &QueryEngineState,
        aggr_expr: &AggregateExpr,
    ) -> Result<LogicalPlan> {
        let AggregateExpr {
            op,
            expr,
            modifier,
            param,
        } = aggr_expr;

        let input = self.prom_expr_to_plan(expr, query_engine_state).await?;

        match (*op).id() {
            token::T_TOPK | token::T_BOTTOMK => {
                self.prom_topk_bottomk_to_plan(aggr_expr, input).await
            }
            _ => {
                // calculate columns to group by
                let mut group_exprs = self.agg_modifier_to_col(input.schema(), modifier, true)?;
                // convert op and value columns to aggregate exprs
                let (aggr_exprs, prev_field_exprs) =
                    self.create_aggregate_exprs(*op, param, &input)?;

                // create plan
                let builder = LogicalPlanBuilder::from(input);
                let builder = if op.id() == token::T_COUNT_VALUES {
                    let label = Self::get_param_value_as_str(*op, param)?;
                    // `count_values` additionally groups by the value (field)
                    // columns, and projects them as a new label with the given name
                    group_exprs.extend(prev_field_exprs.clone());
                    let project_fields = self
                        .create_field_column_exprs()?
                        .into_iter()
                        .chain(self.create_tag_column_exprs()?)
                        .chain(Some(self.create_time_index_column_expr()?))
                        .chain(prev_field_exprs.into_iter().map(|expr| expr.alias(label)));

                    builder
                        .aggregate(group_exprs.clone(), aggr_exprs)
                        .context(DataFusionPlanningSnafu)?
                        .project(project_fields)
                        .context(DataFusionPlanningSnafu)?
                } else {
                    builder
                        .aggregate(group_exprs.clone(), aggr_exprs)
                        .context(DataFusionPlanningSnafu)?
                };

                let sort_expr = group_exprs.into_iter().map(|expr| expr.sort(true, false));

                builder
                    .sort(sort_expr)
                    .context(DataFusionPlanningSnafu)?
                    .build()
                    .context(DataFusionPlanningSnafu)
            }
        }
    }

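    /// Plan `topk()`/`bottomk()` using window expressions: rank rows within each
    /// group, keep rows whose rank is within `k`, then sort and project. Roughly:
    /// `Projection -> Sort -> Filter(rank <= k) -> WindowAggr -> input`.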
    async fn prom_topk_bottomk_to_plan(
        &mut self,
        aggr_expr: &AggregateExpr,
        input: LogicalPlan,
    ) -> Result<LogicalPlan> {
        let AggregateExpr {
            op,
            param,
            modifier,
            ..
        } = aggr_expr;

        let group_exprs = self.agg_modifier_to_col(input.schema(), modifier, false)?;

        let val = Self::get_param_as_literal_expr(param, Some(*op), Some(ArrowDataType::Float64))?;

        // convert the input to window exprs that rank each group
        let window_exprs = self.create_window_exprs(*op, group_exprs.clone(), &input)?;

        let rank_columns: Vec<_> = window_exprs
            .iter()
            .map(|expr| expr.schema_name().to_string())
            .collect();

        // Build a disjunctive filter: keep a row when any rank column is within
        // the requested `k`. `rank_columns` is non-empty, so the fold yields `Some`.
        let filter: DfExpr = rank_columns
            .iter()
            .fold(None, |expr, rank| {
                let predicate = DfExpr::BinaryExpr(BinaryExpr {
                    left: Box::new(col(rank)),
                    op: Operator::LtEq,
                    right: Box::new(val.clone()),
                });

                match expr {
                    None => Some(predicate),
                    Some(expr) => Some(DfExpr::BinaryExpr(BinaryExpr {
                        left: Box::new(expr),
                        op: Operator::Or,
                        right: Box::new(predicate),
                    })),
                }
            })
            .unwrap();

        let rank_columns: Vec<_> = rank_columns.into_iter().map(col).collect();

        let mut new_group_exprs = group_exprs.clone();
        // order the results by the rank columns as well
        new_group_exprs.extend(rank_columns);

        let group_sort_expr = new_group_exprs
            .into_iter()
            .map(|expr| expr.sort(true, false));

        let project_fields = self
            .create_field_column_exprs()?
            .into_iter()
            .chain(self.create_tag_column_exprs()?)
            .chain(Some(self.create_time_index_column_expr()?));

        LogicalPlanBuilder::from(input)
            .window(window_exprs)
            .context(DataFusionPlanningSnafu)?
            .filter(filter)
            .context(DataFusionPlanningSnafu)?
            .sort(group_sort_expr)
            .context(DataFusionPlanningSnafu)?
            .project(project_fields)
            .context(DataFusionPlanningSnafu)?
            .build()
            .context(DataFusionPlanningSnafu)
    }

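    /// Plan a unary expression by negating every field column of the input.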
    async fn prom_unary_expr_to_plan(
        &mut self,
        query_engine_state: &QueryEngineState,
        unary_expr: &UnaryExpr,
    ) -> Result<LogicalPlan> {
        let UnaryExpr { expr } = unary_expr;
        let input = self.prom_expr_to_plan(expr, query_engine_state).await?;
        self.projection_for_each_field_column(input, |col| {
            Ok(DfExpr::Negative(Box::new(DfExpr::Column(col.into()))))
        })
    }

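    /// Plan a binary expression. Literal-vs-literal folds into an [EmptyMetric]
    /// plan; literal-vs-vector becomes a projection (or a filter, for comparison
    /// operators) over the vector side; vector-vs-vector becomes a join, or a set
    /// operation for `and`/`or`/`unless`.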
    async fn prom_binary_expr_to_plan(
        &mut self,
        query_engine_state: &QueryEngineState,
        binary_expr: &PromBinaryExpr,
    ) -> Result<LogicalPlan> {
        let PromBinaryExpr {
            lhs,
            rhs,
            op,
            modifier,
        } = binary_expr;

        // if set, a comparison operator returns 0/1 (for false/true) instead of
        // filtering the result
        let should_return_bool = if let Some(m) = modifier {
            m.return_bool
        } else {
            false
        };
        let is_comparison_op = Self::is_token_a_comparison_op(*op);

        // a comparison op without `bool` builds a filter plan; everything else
        // builds a projection plan
        match (
            Self::try_build_literal_expr(lhs),
            Self::try_build_literal_expr(rhs),
        ) {
            // both sides are literals
            (Some(lhs), Some(rhs)) => {
                self.ctx.time_index_column = Some(DEFAULT_TIME_INDEX_COLUMN.to_string());
                self.ctx.field_columns = vec![DEFAULT_FIELD_COLUMN.to_string()];
                self.ctx.reset_table_name_and_schema();
                let field_expr_builder = Self::prom_token_to_binary_expr_builder(*op)?;
                let mut field_expr = field_expr_builder(lhs, rhs)?;

                if is_comparison_op && should_return_bool {
                    field_expr = DfExpr::Cast(Cast {
                        expr: Box::new(field_expr),
                        data_type: ArrowDataType::Float64,
                    });
                }

                Ok(LogicalPlan::Extension(Extension {
                    node: Arc::new(
                        EmptyMetric::new(
                            self.ctx.start,
                            self.ctx.end,
                            self.ctx.interval,
                            SPECIAL_TIME_FUNCTION.to_string(),
                            DEFAULT_FIELD_COLUMN.to_string(),
                            Some(field_expr),
                        )
                        .context(DataFusionPlanningSnafu)?,
                    ),
                }))
            }
            // lhs is a literal, rhs is a vector
            (Some(mut expr), None) => {
                let input = self.prom_expr_to_plan(rhs, query_engine_state).await?;
                // check if the literal is a special time expression
                if let Some(time_expr) = self.try_build_special_time_expr_with_context(lhs) {
                    expr = time_expr
                }
                let bin_expr_builder = |col: &String| {
                    let binary_expr_builder = Self::prom_token_to_binary_expr_builder(*op)?;
                    let mut binary_expr =
                        binary_expr_builder(expr.clone(), DfExpr::Column(col.into()))?;

                    if is_comparison_op && should_return_bool {
                        binary_expr = DfExpr::Cast(Cast {
                            expr: Box::new(binary_expr),
                            data_type: ArrowDataType::Float64,
                        });
                    }
                    Ok(binary_expr)
                };
                if is_comparison_op && !should_return_bool {
                    self.filter_on_field_column(input, bin_expr_builder)
                } else {
                    self.projection_for_each_field_column(input, bin_expr_builder)
                }
            }
            // lhs is a vector, rhs is a literal
            (None, Some(mut expr)) => {
                let input = self.prom_expr_to_plan(lhs, query_engine_state).await?;
                // check if the literal is a special time expression
                if let Some(time_expr) = self.try_build_special_time_expr_with_context(rhs) {
                    expr = time_expr
                }
                let bin_expr_builder = |col: &String| {
                    let binary_expr_builder = Self::prom_token_to_binary_expr_builder(*op)?;
                    let mut binary_expr =
                        binary_expr_builder(DfExpr::Column(col.into()), expr.clone())?;

                    if is_comparison_op && should_return_bool {
                        binary_expr = DfExpr::Cast(Cast {
                            expr: Box::new(binary_expr),
                            data_type: ArrowDataType::Float64,
                        });
                    }
                    Ok(binary_expr)
                };
                if is_comparison_op && !should_return_bool {
                    self.filter_on_field_column(input, bin_expr_builder)
                } else {
                    self.projection_for_each_field_column(input, bin_expr_builder)
                }
            }
            // both sides are vectors
            (None, None) => {
                let left_input = self.prom_expr_to_plan(lhs, query_engine_state).await?;
                let left_field_columns = self.ctx.field_columns.clone();
                let left_time_index_column = self.ctx.time_index_column.clone();
                let mut left_table_ref = self
                    .table_ref()
                    .unwrap_or_else(|_| TableReference::bare(""));
                let left_context = self.ctx.clone();

                let right_input = self.prom_expr_to_plan(rhs, query_engine_state).await?;
                let right_field_columns = self.ctx.field_columns.clone();
                let right_time_index_column = self.ctx.time_index_column.clone();
                let mut right_table_ref = self
                    .table_ref()
                    .unwrap_or_else(|_| TableReference::bare(""));
                let right_context = self.ctx.clone();

                // set operations have their own join semantics
                if Self::is_token_a_set_op(*op) {
                    return self.set_op_on_non_field_columns(
                        left_input,
                        right_input,
                        left_context,
                        right_context,
                        *op,
                        modifier,
                    );
                }

                if left_table_ref == right_table_ref {
                    // rename the table references to avoid ambiguity
                    left_table_ref = TableReference::bare("lhs");
                    right_table_ref = TableReference::bare("rhs");
                    // prefer the side whose context still carries tag columns
                    if self.ctx.tag_columns.is_empty() {
                        self.ctx = left_context.clone();
                        self.ctx.table_name = Some("lhs".to_string());
                    } else {
                        self.ctx.table_name = Some("rhs".to_string());
                    }
                }
                let mut field_columns = left_field_columns.iter().zip(right_field_columns.iter());

                let join_plan = self.join_on_non_field_columns(
                    left_input,
                    right_input,
                    left_table_ref.clone(),
                    right_table_ref.clone(),
                    left_time_index_column,
                    right_time_index_column,
                    // true when either side has no tag columns
                    left_context.tag_columns.is_empty() || right_context.tag_columns.is_empty(),
                    modifier,
                )?;
                let join_plan_schema = join_plan.schema().clone();

                let bin_expr_builder = |_: &String| {
                    let (left_col_name, right_col_name) = field_columns.next().unwrap();
                    let left_col = join_plan_schema
                        .qualified_field_with_name(Some(&left_table_ref), left_col_name)
                        .context(DataFusionPlanningSnafu)?
                        .into();
                    let right_col = join_plan_schema
                        .qualified_field_with_name(Some(&right_table_ref), right_col_name)
                        .context(DataFusionPlanningSnafu)?
                        .into();

                    let binary_expr_builder = Self::prom_token_to_binary_expr_builder(*op)?;
                    let mut binary_expr =
                        binary_expr_builder(DfExpr::Column(left_col), DfExpr::Column(right_col))?;
                    if is_comparison_op && should_return_bool {
                        binary_expr = DfExpr::Cast(Cast {
                            expr: Box::new(binary_expr),
                            data_type: ArrowDataType::Float64,
                        });
                    }
                    Ok(binary_expr)
                };
                if is_comparison_op && !should_return_bool {
                    self.filter_on_field_column(join_plan, bin_expr_builder)
                } else {
                    self.projection_for_each_field_column(join_plan, bin_expr_builder)
                }
            }
        }
    }

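    /// Plan a number literal as an [EmptyMetric] plan yielding the constant value.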
    fn prom_number_lit_to_plan(&mut self, number_literal: &NumberLiteral) -> Result<LogicalPlan> {
        let NumberLiteral { val } = number_literal;
        self.ctx.time_index_column = Some(DEFAULT_TIME_INDEX_COLUMN.to_string());
        self.ctx.field_columns = vec![DEFAULT_FIELD_COLUMN.to_string()];
        self.ctx.reset_table_name_and_schema();
        let literal_expr = df_prelude::lit(*val);

        let plan = LogicalPlan::Extension(Extension {
            node: Arc::new(
                EmptyMetric::new(
                    self.ctx.start,
                    self.ctx.end,
                    self.ctx.interval,
                    SPECIAL_TIME_FUNCTION.to_string(),
                    DEFAULT_FIELD_COLUMN.to_string(),
                    Some(literal_expr),
                )
                .context(DataFusionPlanningSnafu)?,
            ),
        });
        Ok(plan)
    }

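    /// Plan a string literal, analogous to [Self::prom_number_lit_to_plan].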
    fn prom_string_lit_to_plan(&mut self, string_literal: &StringLiteral) -> Result<LogicalPlan> {
        let StringLiteral { val } = string_literal;
        self.ctx.time_index_column = Some(DEFAULT_TIME_INDEX_COLUMN.to_string());
        self.ctx.field_columns = vec![DEFAULT_FIELD_COLUMN.to_string()];
        self.ctx.reset_table_name_and_schema();
        let literal_expr = df_prelude::lit(val.to_string());

        let plan = LogicalPlan::Extension(Extension {
            node: Arc::new(
                EmptyMetric::new(
                    self.ctx.start,
                    self.ctx.end,
                    self.ctx.interval,
                    SPECIAL_TIME_FUNCTION.to_string(),
                    DEFAULT_FIELD_COLUMN.to_string(),
                    Some(literal_expr),
                )
                .context(DataFusionPlanningSnafu)?,
            ),
        });
        Ok(plan)
    }

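    /// Plan an instant vector selector: normalize the selected series, optionally
    /// project the sample timestamps (for `timestamp()`), then align samples to the
    /// query steps with an [InstantManipulate] node.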
    async fn prom_vector_selector_to_plan(
        &mut self,
        vector_selector: &VectorSelector,
        timestamp_fn: bool,
    ) -> Result<LogicalPlan> {
        let VectorSelector {
            name,
            offset,
            matchers,
            at: _,
        } = vector_selector;
        let matchers = self.preprocess_label_matchers(matchers, name)?;
        if let Some(empty_plan) = self.setup_context().await? {
            return Ok(empty_plan);
        }
        let normalize = self
            .selector_to_series_normalize_plan(offset, matchers, false)
            .await?;

        let normalize = if timestamp_fn {
            // `timestamp()` is applied to this selector: project the sample
            // timestamps as the value column
            self.create_timestamp_func_plan(normalize)?
        } else {
            normalize
        };

        let manipulate = InstantManipulate::new(
            self.ctx.start,
            self.ctx.end,
            self.ctx.lookback_delta,
            self.ctx.interval,
            self.ctx
                .time_index_column
                .clone()
                .expect("time index should be set in `setup_context`"),
            self.ctx.field_columns.first().cloned(),
            normalize,
        );
        Ok(LogicalPlan::Extension(Extension {
            node: Arc::new(manipulate),
        }))
    }

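    /// Implement the `timestamp()` function: project the expression produced by
    /// [build_special_time_expr] over the time index column as the only field
    /// column, alongside the time index and tag columns.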
    fn create_timestamp_func_plan(&mut self, normalize: LogicalPlan) -> Result<LogicalPlan> {
        let time_expr = build_special_time_expr(self.ctx.time_index_column.as_ref().unwrap())
            .alias(DEFAULT_FIELD_COLUMN);
        self.ctx.field_columns = vec![time_expr.schema_name().to_string()];
        let mut project_exprs = Vec::with_capacity(self.ctx.tag_columns.len() + 2);
        project_exprs.push(self.create_time_index_column_expr()?);
        project_exprs.push(time_expr);
        project_exprs.extend(self.create_tag_column_exprs()?);

        LogicalPlanBuilder::from(normalize)
            .project(project_exprs)
            .context(DataFusionPlanningSnafu)?
            .build()
            .context(DataFusionPlanningSnafu)
    }

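    /// Plan a range vector selector by wrapping the normalized series in a
    /// [RangeManipulate] node that materializes a per-step range for each series.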
    async fn prom_matrix_selector_to_plan(
        &mut self,
        matrix_selector: &MatrixSelector,
    ) -> Result<LogicalPlan> {
        let MatrixSelector { vs, range } = matrix_selector;
        let VectorSelector {
            name,
            offset,
            matchers,
            ..
        } = vs;
        let matchers = self.preprocess_label_matchers(matchers, name)?;
        ensure!(!range.is_zero(), ZeroRangeSelectorSnafu);
        let range_ms = range.as_millis() as _;
        self.ctx.range = Some(range_ms);

        // fall back to an empty metric plan when the referenced table does not exist
        let normalize = match self.setup_context().await? {
            Some(empty_plan) => empty_plan,
            None => {
                self.selector_to_series_normalize_plan(offset, matchers, true)
                    .await?
            }
        };
        let manipulate = RangeManipulate::new(
            self.ctx.start,
            self.ctx.end,
            self.ctx.interval,
            range_ms,
            self.ctx
                .time_index_column
                .clone()
                .expect("time index should be set in `setup_context`"),
            self.ctx.field_columns.clone(),
            normalize,
        )
        .context(DataFusionPlanningSnafu)?;

        Ok(LogicalPlan::Extension(Extension {
            node: Arc::new(manipulate),
        }))
    }

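    /// Plan a function call. `histogram_quantile`, `vector`, `scalar`, and `absent`
    /// are planned as dedicated sub-plans; every other function becomes a projection
    /// of scalar expressions, plus an optional sort for the `sort*` family.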
    async fn prom_call_expr_to_plan(
        &mut self,
        query_engine_state: &QueryEngineState,
        call_expr: &Call,
    ) -> Result<LogicalPlan> {
        let Call { func, args } = call_expr;
        // some special functions that are not expressions but plans
        match func.name {
            SPECIAL_HISTOGRAM_QUANTILE => {
                return self.create_histogram_plan(args, query_engine_state).await
            }
            SPECIAL_VECTOR_FUNCTION => return self.create_vector_plan(args).await,
            SCALAR_FUNCTION => return self.create_scalar_plan(args, query_engine_state).await,
            SPECIAL_ABSENT_FUNCTION => {
                return self.create_absent_plan(args, query_engine_state).await
            }
            _ => {}
        }

        // transform function arguments
        let args = self.create_function_args(&args.args)?;
        // inputless functions (e.g. `time()`) run against an implicit empty metric
        let input = if let Some(prom_expr) = &args.input {
            self.prom_expr_to_plan_inner(prom_expr, func.name == "timestamp", query_engine_state)
                .await?
        } else {
            self.ctx.time_index_column = Some(SPECIAL_TIME_FUNCTION.to_string());
            self.ctx.reset_table_name_and_schema();
            self.ctx.tag_columns = vec![];
            self.ctx.field_columns = vec![DEFAULT_FIELD_COLUMN.to_string()];
            LogicalPlan::Extension(Extension {
                node: Arc::new(
                    EmptyMetric::new(
                        self.ctx.start,
                        self.ctx.end,
                        self.ctx.interval,
                        SPECIAL_TIME_FUNCTION.to_string(),
                        DEFAULT_FIELD_COLUMN.to_string(),
                        None,
                    )
                    .context(DataFusionPlanningSnafu)?,
                ),
            })
        };
        let (mut func_exprs, new_tags) =
            self.create_function_expr(func, args.literals.clone(), query_engine_state)?;
        func_exprs.insert(0, self.create_time_index_column_expr()?);
        func_exprs.extend_from_slice(&self.create_tag_column_exprs()?);

        let builder = LogicalPlanBuilder::from(input)
            .project(func_exprs)
            .context(DataFusionPlanningSnafu)?
            .filter(self.create_empty_values_filter_expr()?)
            .context(DataFusionPlanningSnafu)?;

        let builder = match func.name {
            "sort" => builder
                .sort(self.create_field_columns_sort_exprs(true))
                .context(DataFusionPlanningSnafu)?,
            "sort_desc" => builder
                .sort(self.create_field_columns_sort_exprs(false))
                .context(DataFusionPlanningSnafu)?,
            "sort_by_label" => builder
                .sort(Self::create_sort_exprs_by_tags(
                    func.name,
                    args.literals,
                    true,
                )?)
                .context(DataFusionPlanningSnafu)?,
            "sort_by_label_desc" => builder
                .sort(Self::create_sort_exprs_by_tags(
                    func.name,
                    args.literals,
                    false,
                )?)
                .context(DataFusionPlanningSnafu)?,

            _ => builder,
        };

        // record tags newly created by this function (e.g. `label_join`'s
        // destination label) in the context
        for tag in new_tags {
            self.ctx.tag_columns.push(tag);
        }

        let plan = builder.build().context(DataFusionPlanningSnafu)?;
        common_telemetry::debug!("Created PromQL function plan: {plan:?} for {call_expr:?}");

        Ok(plan)
    }

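    /// Plan a parser extension expression. The recognized names are the
    /// EXPLAIN/ANALYZE variants; any other name yields an empty plan.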
    async fn prom_ext_expr_to_plan(
        &mut self,
        query_engine_state: &QueryEngineState,
        ext_expr: &promql_parser::parser::ast::Extension,
    ) -> Result<LogicalPlan> {
        // plan the first (and only expected) child of the extension expression
        let expr = &ext_expr.expr;
        let children = expr.children();
        let plan = self
            .prom_expr_to_plan(&children[0], query_engine_state)
            .await?;
        // wrap the child plan in an explain node according to the extension name
        match expr.name() {
            "ANALYZE" => LogicalPlanBuilder::from(plan)
                .explain(false, true)
                .unwrap()
                .build()
                .context(DataFusionPlanningSnafu),
            "ANALYZE VERBOSE" => LogicalPlanBuilder::from(plan)
                .explain(true, true)
                .unwrap()
                .build()
                .context(DataFusionPlanningSnafu),
            "EXPLAIN" => LogicalPlanBuilder::from(plan)
                .explain(false, false)
                .unwrap()
                .build()
                .context(DataFusionPlanningSnafu),
            "EXPLAIN VERBOSE" => LogicalPlanBuilder::from(plan)
                .explain(true, false)
                .unwrap()
                .build()
                .context(DataFusionPlanningSnafu),
            _ => LogicalPlanBuilder::empty(true)
                .build()
                .context(DataFusionPlanningSnafu),
        }
    }

    #[allow(clippy::mutable_key_type)]
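    /// Extract the metric name from the selector name or the `__name__` matcher and
    /// store it in the context, along with the special `__field__`, `__schema__`,
    /// and `__database__` matchers. Returns the remaining label matchers, each
    /// normalized by [normalize_matcher].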
    fn preprocess_label_matchers(
        &mut self,
        label_matchers: &Matchers,
        name: &Option<String>,
    ) -> Result<Matchers> {
        self.ctx.reset();

        let metric_name;
        if let Some(name) = name.clone() {
            metric_name = Some(name);
            ensure!(
                label_matchers.find_matchers(METRIC_NAME).is_empty(),
                MultipleMetricMatchersSnafu
            );
        } else {
            let mut matches = label_matchers.find_matchers(METRIC_NAME);
            ensure!(!matches.is_empty(), NoMetricMatcherSnafu);
            ensure!(matches.len() == 1, MultipleMetricMatchersSnafu);
            ensure!(
                matches[0].op == MatchOp::Equal,
                UnsupportedMatcherOpSnafu {
                    matcher_op: matches[0].op.to_string(),
                    matcher: METRIC_NAME
                }
            );
            metric_name = matches.pop().map(|m| m.value);
        }

        self.ctx.table_name = metric_name;

        let mut matchers = HashSet::new();
        for matcher in &label_matchers.matchers {
            if matcher.name == FIELD_COLUMN_MATCHER {
                self.ctx
                    .field_column_matcher
                    .get_or_insert_default()
                    .push(matcher.clone());
            } else if matcher.name == SCHEMA_COLUMN_MATCHER || matcher.name == DB_COLUMN_MATCHER {
                ensure!(
                    matcher.op == MatchOp::Equal,
                    UnsupportedMatcherOpSnafu {
                        matcher: matcher.name.to_string(),
                        matcher_op: matcher.op.to_string(),
                    }
                );
                self.ctx.schema_name = Some(matcher.value.clone());
            } else if matcher.name != METRIC_NAME {
                self.ctx.selector_matcher.push(matcher.clone());
                let _ = matchers.insert(matcher.clone());
            }
        }

        Ok(Matchers::new(
            matchers.into_iter().map(normalize_matcher).collect(),
        ))
    }

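    /// Build the selector pipeline: scan the table, filter by label matchers and
    /// time range, sort by tags and time index, split into per-series groups with
    /// [SeriesDivide], and, for range selectors or offsets, apply [SeriesNormalize].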
    async fn selector_to_series_normalize_plan(
        &mut self,
        offset: &Option<Offset>,
        label_matchers: Matchers,
        is_range_selector: bool,
    ) -> Result<LogicalPlan> {
        let table_ref = self.table_ref()?;
        // make table scan plan
        let mut table_scan = self.create_table_scan_plan(table_ref.clone()).await?;
        let table_schema = table_scan.schema();

        // make filter exprs
        let offset_duration = match offset {
            Some(Offset::Pos(duration)) => duration.as_millis() as Millisecond,
            Some(Offset::Neg(duration)) => -(duration.as_millis() as Millisecond),
            None => 0,
        };
        let mut scan_filters = Self::matchers_to_expr(label_matchers.clone(), table_schema)?;
        if let Some(time_index_filter) = self.build_time_index_filter(offset_duration)? {
            scan_filters.push(time_index_filter);
        }
        table_scan = LogicalPlanBuilder::from(table_scan)
            .filter(conjunction(scan_filters).unwrap())
            .context(DataFusionPlanningSnafu)?
            .build()
            .context(DataFusionPlanningSnafu)?;

        // make a projection plan if there is a field matcher
        if let Some(field_matchers) = &self.ctx.field_column_matcher {
            let col_set = self.ctx.field_columns.iter().collect::<HashSet<_>>();
            // opt-in set
            let mut result_set = HashSet::new();
            // opt-out set
            let mut reverse_set = HashSet::new();
            for matcher in field_matchers {
                match &matcher.op {
                    MatchOp::Equal => {
                        if col_set.contains(&matcher.value) {
                            let _ = result_set.insert(matcher.value.clone());
                        } else {
                            return Err(ColumnNotFoundSnafu {
                                col: matcher.value.clone(),
                            }
                            .build());
                        }
                    }
                    MatchOp::NotEqual => {
                        if col_set.contains(&matcher.value) {
                            let _ = reverse_set.insert(matcher.value.clone());
                        } else {
                            return Err(ColumnNotFoundSnafu {
                                col: matcher.value.clone(),
                            }
                            .build());
                        }
                    }
                    MatchOp::Re(regex) => {
                        for col in &self.ctx.field_columns {
                            if regex.is_match(col) {
                                let _ = result_set.insert(col.clone());
                            }
                        }
                    }
                    MatchOp::NotRe(regex) => {
                        for col in &self.ctx.field_columns {
                            if regex.is_match(col) {
                                let _ = reverse_set.insert(col.clone());
                            }
                        }
                    }
                }
            }
            // merge the two sets
            if result_set.is_empty() {
                result_set = col_set.into_iter().cloned().collect();
            }
            for col in reverse_set {
                let _ = result_set.remove(&col);
            }

            // mask the field columns in context with the computed result set
            self.ctx.field_columns = self
                .ctx
                .field_columns
                .drain(..)
                .filter(|col| result_set.contains(col))
                .collect();

            let exprs = result_set
                .into_iter()
                .map(|col| DfExpr::Column(Column::new_unqualified(col)))
                .chain(self.create_tag_column_exprs()?)
                .chain(Some(self.create_time_index_column_expr()?))
                .collect::<Vec<_>>();

            // reuse this variable for simplicity
            table_scan = LogicalPlanBuilder::from(table_scan)
                .project(exprs)
                .context(DataFusionPlanningSnafu)?
                .build()
                .context(DataFusionPlanningSnafu)?;
        }

        // make sort plan
        let sort_plan = LogicalPlanBuilder::from(table_scan)
            .sort(self.create_tag_and_time_index_column_sort_exprs()?)
            .context(DataFusionPlanningSnafu)?
            .build()
            .context(DataFusionPlanningSnafu)?;

        // make divide plan
        let time_index_column =
            self.ctx
                .time_index_column
                .clone()
                .with_context(|| TimeIndexNotFoundSnafu {
                    table: table_ref.to_string(),
                })?;
        let divide_plan = LogicalPlan::Extension(Extension {
            node: Arc::new(SeriesDivide::new(
                self.ctx.tag_columns.clone(),
                time_index_column,
                sort_plan,
            )),
        });

        // make series normalize plan only when necessary
        if !is_range_selector && offset_duration == 0 {
            return Ok(divide_plan);
        }
        let series_normalize = SeriesNormalize::new(
            offset_duration,
            self.ctx
                .time_index_column
                .clone()
                .with_context(|| TimeIndexNotFoundSnafu {
                    table: table_ref.to_quoted_string(),
                })?,
            is_range_selector,
            self.ctx.tag_columns.clone(),
            divide_plan,
        );
        let logical_plan = LogicalPlan::Extension(Extension {
            node: Arc::new(series_normalize),
        });

        Ok(logical_plan)
    }

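    /// Convert an aggregate modifier (`by` / `without`) into group-by column
    /// expressions; the time index column is always appended. When `update_ctx` is
    /// true, the context's tag columns are rewritten to match.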
    fn agg_modifier_to_col(
        &mut self,
        input_schema: &DFSchemaRef,
        modifier: &Option<LabelModifier>,
        update_ctx: bool,
    ) -> Result<Vec<DfExpr>> {
        match modifier {
            None => {
                if update_ctx {
                    self.ctx.tag_columns.clear();
                }
                Ok(vec![self.create_time_index_column_expr()?])
            }
            Some(LabelModifier::Include(labels)) => {
                if update_ctx {
                    self.ctx.tag_columns.clear();
                }
                let mut exprs = Vec::with_capacity(labels.labels.len());
                for label in &labels.labels {
                    // nonexistent labels are ignored
                    if let Ok(field) = input_schema.field_with_unqualified_name(label) {
                        exprs.push(DfExpr::Column(Column::from(field.name())));

                        if update_ctx {
                            // update the tag columns in context
                            self.ctx.tag_columns.push(label.clone());
                        }
                    }
                }
                // add the timestamp column
                exprs.push(self.create_time_index_column_expr()?);

                Ok(exprs)
            }
            Some(LabelModifier::Exclude(labels)) => {
                let mut all_fields = input_schema
                    .fields()
                    .iter()
                    .map(|f| f.name())
                    .collect::<BTreeSet<_>>();

                // remove the "without"-ed fields; nonexistent labels are ignored
                for label in &labels.labels {
                    let _ = all_fields.remove(label);
                }

                // remove the time index and value fields
                if let Some(time_index) = &self.ctx.time_index_column {
                    let _ = all_fields.remove(time_index);
                }
                for value in &self.ctx.field_columns {
                    let _ = all_fields.remove(value);
                }

                if update_ctx {
                    // change the tag columns in context
                    self.ctx.tag_columns = all_fields.iter().map(|col| (*col).clone()).collect();
                }

                // collect the remaining fields as column exprs
                let mut exprs = all_fields
                    .into_iter()
                    .map(|c| DfExpr::Column(Column::from(c)))
                    .collect::<Vec<_>>();

                // add the timestamp column
                exprs.push(self.create_time_index_column_expr()?);

                Ok(exprs)
            }
        }
    }

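    /// Convert label matchers into DataFusion filter expressions. Special matchers
    /// are skipped, and a matcher on a column that is absent from the schema is
    /// compared against an empty-string literal instead.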
    pub fn matchers_to_expr(
        label_matchers: Matchers,
        table_schema: &DFSchemaRef,
    ) -> Result<Vec<DfExpr>> {
        let mut exprs = Vec::with_capacity(label_matchers.matchers.len());
        for matcher in label_matchers.matchers {
            if matcher.name == SCHEMA_COLUMN_MATCHER
                || matcher.name == DB_COLUMN_MATCHER
                || matcher.name == FIELD_COLUMN_MATCHER
            {
                continue;
            }

            let col = if table_schema
                .field_with_unqualified_name(&matcher.name)
                .is_err()
            {
                DfExpr::Literal(ScalarValue::Utf8(Some(String::new())), None).alias(matcher.name)
            } else {
                DfExpr::Column(Column::from_name(matcher.name))
            };
            let lit = DfExpr::Literal(ScalarValue::Utf8(Some(matcher.value)), None);
            let expr = match matcher.op {
                MatchOp::Equal => col.eq(lit),
                MatchOp::NotEqual => col.not_eq(lit),
                MatchOp::Re(re) => {
                    // `^(?:.*)$` is the anchored form of `.*`, which matches
                    // everything, so the filter can be skipped entirely
                    if re.as_str() == "^(?:.*)$" {
                        continue;
                    }
                    // `^(?:.+)$` matches any non-empty value
                    if re.as_str() == "^(?:.+)$" {
                        col.not_eq(DfExpr::Literal(
                            ScalarValue::Utf8(Some(String::new())),
                            None,
                        ))
                    } else {
                        DfExpr::BinaryExpr(BinaryExpr {
                            left: Box::new(col),
                            op: Operator::RegexMatch,
                            right: Box::new(DfExpr::Literal(
                                ScalarValue::Utf8(Some(re.as_str().to_string())),
                                None,
                            )),
                        })
                    }
                }
                MatchOp::NotRe(re) => {
                    if re.as_str() == "^(?:.*)$" {
                        // the negation of "match everything" matches nothing
                        DfExpr::Literal(ScalarValue::Boolean(Some(false)), None)
                    } else if re.as_str() == "^(?:.+)$" {
                        col.eq(DfExpr::Literal(
                            ScalarValue::Utf8(Some(String::new())),
                            None,
                        ))
                    } else {
                        DfExpr::BinaryExpr(BinaryExpr {
                            left: Box::new(col),
                            op: Operator::RegexNotMatch,
                            right: Box::new(DfExpr::Literal(
                                ScalarValue::Utf8(Some(re.as_str().to_string())),
                                None,
                            )),
                        })
                    }
                }
            };
            exprs.push(expr);
        }

        Ok(exprs)
    }

    fn table_ref(&self) -> Result<TableReference> {
        let table_name = self
            .ctx
            .table_name
            .clone()
            .context(TableNameNotFoundSnafu)?;

        // use the schema name from `__schema__` / `__database__` if present
        let table_ref = if let Some(schema_name) = &self.ctx.schema_name {
            TableReference::partial(schema_name.as_str(), table_name.as_str())
        } else {
            TableReference::bare(table_name.as_str())
        };
        Ok(table_ref)
    }

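    /// Build an optional filter on the time index restricting the scan to
    /// `[start - lookback - range, end + lookback]`, shifted by the offset. When
    /// the steps are few and far apart, a union of per-step windows is emitted
    /// instead of one continuous range.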
    fn build_time_index_filter(&self, offset_duration: i64) -> Result<Option<DfExpr>> {
        let start = self.ctx.start;
        let end = self.ctx.end;
        if end < start {
            return InvalidTimeRangeSnafu { start, end }.fail();
        }
        let lookback_delta = self.ctx.lookback_delta;
        let range = self.ctx.range.unwrap_or_default();
        let interval = self.ctx.interval;
        let time_index_expr = self.create_time_index_column_expr()?;
        let num_points = (end - start) / interval;

        // Use a single continuous time range when there are many points or the
        // interval is small; scattered per-step windows only pay off for a few,
        // widely spaced steps.
        if num_points > MAX_SCATTER_POINTS || interval <= INTERVAL_1H {
            let single_time_range = time_index_expr
                .clone()
                .gt_eq(DfExpr::Literal(
                    ScalarValue::TimestampMillisecond(
                        Some(self.ctx.start + offset_duration - self.ctx.lookback_delta - range),
                        None,
                    ),
                    None,
                ))
                .and(time_index_expr.lt_eq(DfExpr::Literal(
                    ScalarValue::TimestampMillisecond(
                        Some(self.ctx.end + offset_duration + self.ctx.lookback_delta),
                        None,
                    ),
                    None,
                )));
            return Ok(Some(single_time_range));
        }

        // otherwise build one filter window per step
        let mut filters = Vec::with_capacity(num_points as usize);
        for timestamp in (start..end).step_by(interval as usize) {
            filters.push(
                time_index_expr
                    .clone()
                    .gt_eq(DfExpr::Literal(
                        ScalarValue::TimestampMillisecond(
                            Some(timestamp + offset_duration - lookback_delta - range),
                            None,
                        ),
                        None,
                    ))
                    .and(time_index_expr.clone().lt_eq(DfExpr::Literal(
                        ScalarValue::TimestampMillisecond(
                            Some(timestamp + offset_duration + lookback_delta),
                            None,
                        ),
                        None,
                    ))),
            )
        }

        Ok(filters.into_iter().reduce(DfExpr::or))
    }

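    /// Create a table scan plan for the given table reference. If the table's time
    /// index is not in millisecond precision, add a projection that casts it to
    /// millisecond timestamps.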
    async fn create_table_scan_plan(&mut self, table_ref: TableReference) -> Result<LogicalPlan> {
        let provider = self
            .table_provider
            .resolve_table(table_ref.clone())
            .await
            .context(CatalogSnafu)?;

        let is_time_index_ms = provider
            .as_any()
            .downcast_ref::<DefaultTableSource>()
            .context(UnknownTableSnafu)?
            .table_provider
            .as_any()
            .downcast_ref::<DfTableProviderAdapter>()
            .context(UnknownTableSnafu)?
            .table()
            .schema()
            .timestamp_column()
            .with_context(|| TimeIndexNotFoundSnafu {
                table: table_ref.to_quoted_string(),
            })?
            .data_type
            == ConcreteDataType::timestamp_millisecond_datatype();

        let mut scan_plan = LogicalPlanBuilder::scan(table_ref.clone(), provider, None)
            .context(DataFusionPlanningSnafu)?
            .build()
            .context(DataFusionPlanningSnafu)?;

        if !is_time_index_ms {
            // cast the time index column to millisecond precision
            let expr: Vec<_> = self
                .ctx
                .field_columns
                .iter()
                .map(|col| DfExpr::Column(Column::new(Some(table_ref.clone()), col.clone())))
                .chain(self.create_tag_column_exprs()?)
                .chain(Some(DfExpr::Alias(Alias {
                    expr: Box::new(DfExpr::Cast(Cast {
                        expr: Box::new(self.create_time_index_column_expr()?),
                        data_type: ArrowDataType::Timestamp(ArrowTimeUnit::Millisecond, None),
                    })),
                    relation: Some(table_ref.clone()),
                    name: self
                        .ctx
                        .time_index_column
                        .as_ref()
                        .with_context(|| TimeIndexNotFoundSnafu {
                            table: table_ref.to_quoted_string(),
                        })?
                        .clone(),
                    metadata: None,
                })))
                .collect::<Vec<_>>();
            scan_plan = LogicalPlanBuilder::from(scan_plan)
                .project(expr)
                .context(DataFusionPlanningSnafu)?
                .build()
                .context(DataFusionPlanningSnafu)?;
        }

        let result = LogicalPlanBuilder::from(scan_plan)
            .build()
            .context(DataFusionPlanningSnafu)?;
        Ok(result)
    }

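    /// Populate the context's time index, field, and tag columns from the resolved
    /// table. Returns `Some(plan)` with an empty metric plan when the table does
    /// not exist, so selectors over missing metrics still produce a valid plan.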
    async fn setup_context(&mut self) -> Result<Option<LogicalPlan>> {
        let table_ref = self.table_ref()?;
        let table = match self.table_provider.resolve_table(table_ref.clone()).await {
            Err(e) if e.status_code() == StatusCode::TableNotFound => {
                let plan = self.setup_context_for_empty_metric()?;
                return Ok(Some(plan));
            }
            res => res.context(CatalogSnafu)?,
        };
        let table = table
            .as_any()
            .downcast_ref::<DefaultTableSource>()
            .context(UnknownTableSnafu)?
            .table_provider
            .as_any()
            .downcast_ref::<DfTableProviderAdapter>()
            .context(UnknownTableSnafu)?
            .table();

        // set the time index column name
        let time_index = table
            .schema()
            .timestamp_column()
            .with_context(|| TimeIndexNotFoundSnafu {
                table: table_ref.to_quoted_string(),
            })?
            .name
            .clone();
        self.ctx.time_index_column = Some(time_index);

        // set the field (value) columns
        let values = table
            .table_info()
            .meta
            .field_column_names()
            .cloned()
            .collect();
        self.ctx.field_columns = values;

        // set the tag columns, skipping internal metric engine columns
        let tags = table
            .table_info()
            .meta
            .row_key_column_names()
            .filter(|col| {
                col != &DATA_SCHEMA_TABLE_ID_COLUMN_NAME && col != &DATA_SCHEMA_TSID_COLUMN_NAME
            })
            .cloned()
            .collect();
        self.ctx.tag_columns = tags;

        Ok(None)
    }

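    /// Set up default context values and build an [EmptyMetric] plan for queries
    /// against a table that does not exist.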
    fn setup_context_for_empty_metric(&mut self) -> Result<LogicalPlan> {
        self.ctx.time_index_column = Some(SPECIAL_TIME_FUNCTION.to_string());
        self.ctx.reset_table_name_and_schema();
        self.ctx.tag_columns = vec![];
        self.ctx.field_columns = vec![DEFAULT_FIELD_COLUMN.to_string()];

        // the time range is deliberately inverted (`start > end`) so that the
        // plan produces no rows
        let plan = LogicalPlan::Extension(Extension {
            node: Arc::new(
                EmptyMetric::new(
                    0,
                    -1,
                    self.ctx.interval,
                    SPECIAL_TIME_FUNCTION.to_string(),
                    DEFAULT_FIELD_COLUMN.to_string(),
                    Some(lit(0.0f64)),
                )
                .context(DataFusionPlanningSnafu)?,
            ),
        });
        Ok(plan)
    }

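    /// Split function call arguments into the single vector input expression and
    /// the literal arguments.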
    fn create_function_args(&self, args: &[Box<PromExpr>]) -> Result<FunctionArgs> {
        let mut result = FunctionArgs::default();

        for arg in args {
            // literal arguments go to `literals`
            if let Some(expr) = Self::try_build_literal_expr(arg) {
                result.literals.push(expr);
            } else {
                // everything else is the vector input, of which there can be at
                // most one
                match arg.as_ref() {
                    PromExpr::Subquery(_)
                    | PromExpr::VectorSelector(_)
                    | PromExpr::MatrixSelector(_)
                    | PromExpr::Extension(_)
                    | PromExpr::Aggregate(_)
                    | PromExpr::Paren(_)
                    | PromExpr::Call(_)
                    | PromExpr::Binary(_)
                    | PromExpr::Unary(_) => {
                        if result.input.replace(*arg.clone()).is_some() {
                            MultipleVectorSnafu { expr: *arg.clone() }.fail()?;
                        }
                    }

                    _ => {
                        let expr = Self::get_param_as_literal_expr(&Some(arg.clone()), None, None)?;
                        result.literals.push(expr);
                    }
                }
            }
        }

        Ok(result)
    }

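    /// Create the projection expressions implementing a PromQL function over each
    /// field column, dispatching between DataFusion built-ins, PromQL UDFs, and
    /// specially generated expressions. Also returns any newly created tag names.
    ///
    /// # Side effects
    ///
    /// May rewrite the context's field columns (and, for `label_join`, its tag
    /// columns) to match the produced expressions.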
    fn create_function_expr(
        &mut self,
        func: &Function,
        other_input_exprs: Vec<DfExpr>,
        query_engine_state: &QueryEngineState,
    ) -> Result<(Vec<DfExpr>, Vec<String>)> {
        let mut other_input_exprs: VecDeque<DfExpr> = other_input_exprs.into();

        // the position of the field column in the function's argument list
        let field_column_pos = 0;
        let mut exprs = Vec::with_capacity(self.ctx.field_columns.len());
        // tag columns newly created by this function call
        let mut new_tags = vec![];
        let scalar_func = match func.name {
            "increase" => ScalarFunc::ExtrapolateUdf(
                Arc::new(Increase::scalar_udf()),
                self.ctx.range.context(ExpectRangeSelectorSnafu)?,
            ),
            "rate" => ScalarFunc::ExtrapolateUdf(
                Arc::new(Rate::scalar_udf()),
                self.ctx.range.context(ExpectRangeSelectorSnafu)?,
            ),
            "delta" => ScalarFunc::ExtrapolateUdf(
                Arc::new(Delta::scalar_udf()),
                self.ctx.range.context(ExpectRangeSelectorSnafu)?,
            ),
            "idelta" => ScalarFunc::Udf(Arc::new(IDelta::<false>::scalar_udf())),
            "irate" => ScalarFunc::Udf(Arc::new(IDelta::<true>::scalar_udf())),
            "resets" => ScalarFunc::Udf(Arc::new(Resets::scalar_udf())),
            "changes" => ScalarFunc::Udf(Arc::new(Changes::scalar_udf())),
            "deriv" => ScalarFunc::Udf(Arc::new(Deriv::scalar_udf())),
            "avg_over_time" => ScalarFunc::Udf(Arc::new(AvgOverTime::scalar_udf())),
            "min_over_time" => ScalarFunc::Udf(Arc::new(MinOverTime::scalar_udf())),
            "max_over_time" => ScalarFunc::Udf(Arc::new(MaxOverTime::scalar_udf())),
            "sum_over_time" => ScalarFunc::Udf(Arc::new(SumOverTime::scalar_udf())),
            "count_over_time" => ScalarFunc::Udf(Arc::new(CountOverTime::scalar_udf())),
            "last_over_time" => ScalarFunc::Udf(Arc::new(LastOverTime::scalar_udf())),
            "absent_over_time" => ScalarFunc::Udf(Arc::new(AbsentOverTime::scalar_udf())),
            "present_over_time" => ScalarFunc::Udf(Arc::new(PresentOverTime::scalar_udf())),
            "stddev_over_time" => ScalarFunc::Udf(Arc::new(StddevOverTime::scalar_udf())),
            "stdvar_over_time" => ScalarFunc::Udf(Arc::new(StdvarOverTime::scalar_udf())),
            "quantile_over_time" => ScalarFunc::Udf(Arc::new(QuantileOverTime::scalar_udf())),
            "predict_linear" => {
                other_input_exprs[0] = DfExpr::Cast(Cast {
                    expr: Box::new(other_input_exprs[0].clone()),
                    data_type: ArrowDataType::Int64,
                });
                ScalarFunc::Udf(Arc::new(PredictLinear::scalar_udf()))
            }
            "holt_winters" => ScalarFunc::Udf(Arc::new(HoltWinters::scalar_udf())),
            "time" => {
                exprs.push(build_special_time_expr(
                    self.ctx.time_index_column.as_ref().unwrap(),
                ));
                ScalarFunc::GeneratedExpr
            }
            "minute" => {
                // date_part('minute', time_index)
                let expr = self.date_part_on_time_index("minute")?;
                exprs.push(expr);
                ScalarFunc::GeneratedExpr
            }
            "hour" => {
                // date_part('hour', time_index)
                let expr = self.date_part_on_time_index("hour")?;
                exprs.push(expr);
                ScalarFunc::GeneratedExpr
            }
            "month" => {
                // date_part('month', time_index)
                let expr = self.date_part_on_time_index("month")?;
                exprs.push(expr);
                ScalarFunc::GeneratedExpr
            }
            "year" => {
                // date_part('year', time_index)
                let expr = self.date_part_on_time_index("year")?;
                exprs.push(expr);
                ScalarFunc::GeneratedExpr
            }
            "day_of_month" => {
                // date_part('day', time_index)
                let expr = self.date_part_on_time_index("day")?;
                exprs.push(expr);
                ScalarFunc::GeneratedExpr
            }
            "day_of_week" => {
                // date_part('dow', time_index)
                let expr = self.date_part_on_time_index("dow")?;
                exprs.push(expr);
                ScalarFunc::GeneratedExpr
            }
            "day_of_year" => {
                // date_part('doy', time_index)
                let expr = self.date_part_on_time_index("doy")?;
                exprs.push(expr);
                ScalarFunc::GeneratedExpr
            }
            "days_in_month" => {
                // compute `days_in_month` as
                // `date_part('day', date_trunc('month', time_index) + interval '1 month' - interval '1 day')`
                let day_lit_expr = "day".lit();
                let month_lit_expr = "month".lit();
                let interval_1month_lit_expr =
                    DfExpr::Literal(ScalarValue::IntervalYearMonth(Some(1)), None);
                let interval_1day_lit_expr = DfExpr::Literal(
                    ScalarValue::IntervalDayTime(Some(IntervalDayTime::new(1, 0))),
                    None,
                );
                let the_1month_minus_1day_expr = DfExpr::BinaryExpr(BinaryExpr {
                    left: Box::new(interval_1month_lit_expr),
                    op: Operator::Minus,
                    right: Box::new(interval_1day_lit_expr),
                });
                let date_trunc_expr = DfExpr::ScalarFunction(ScalarFunction {
                    func: datafusion_functions::datetime::date_trunc(),
                    args: vec![month_lit_expr, self.create_time_index_column_expr()?],
                });
                let date_trunc_plus_interval_expr = DfExpr::BinaryExpr(BinaryExpr {
                    left: Box::new(date_trunc_expr),
                    op: Operator::Plus,
                    right: Box::new(the_1month_minus_1day_expr),
                });
                let date_part_expr = DfExpr::ScalarFunction(ScalarFunction {
                    func: datafusion_functions::datetime::date_part(),
                    args: vec![day_lit_expr, date_trunc_plus_interval_expr],
                });

                exprs.push(date_part_expr);
                ScalarFunc::GeneratedExpr
            }

            "label_join" => {
                let (concat_expr, dst_label) = Self::build_concat_labels_expr(
                    &mut other_input_exprs,
                    &self.ctx,
                    query_engine_state,
                )?;

                // keep the existing field columns, except a clashing `dst_label`
                for value in &self.ctx.field_columns {
                    if *value != dst_label {
                        let expr = DfExpr::Column(Column::from_name(value));
                        exprs.push(expr);
                    }
                }

                // remove `dst_label` from tags if present to avoid duplicated columns
                self.ctx.tag_columns.retain(|tag| *tag != dst_label);
                new_tags.push(dst_label);
                // add the new label expression to evaluate
                exprs.push(concat_expr);

                ScalarFunc::GeneratedExpr
            }
            "label_replace" => {
                if let Some((replace_expr, dst_label)) = self
                    .build_regexp_replace_label_expr(&mut other_input_exprs, query_engine_state)?
                {
                    // keep the existing field columns, except a clashing `dst_label`
                    for value in &self.ctx.field_columns {
                        if *value != dst_label {
                            let expr = DfExpr::Column(Column::from_name(value));
                            exprs.push(expr);
                        }
                    }

                    ensure!(
                        !self.ctx.tag_columns.contains(&dst_label),
                        SameLabelSetSnafu
                    );
                    new_tags.push(dst_label);
                    // add the new label expression to evaluate
                    exprs.push(replace_expr);
                } else {
                    // no-op: keep the field columns unchanged
                    for value in &self.ctx.field_columns {
                        let expr = DfExpr::Column(Column::from_name(value));
                        exprs.push(expr);
                    }
                }

                ScalarFunc::GeneratedExpr
            }
            "sort" | "sort_desc" | "sort_by_label" | "sort_by_label_desc" | "timestamp" => {
                // these are handled as part of the plan in `prom_call_expr_to_plan`;
                // pass the field columns through unchanged
                for value in &self.ctx.field_columns {
                    let expr = DfExpr::Column(Column::from_name(value));
                    exprs.push(expr);
                }

                ScalarFunc::GeneratedExpr
            }
            "round" => {
                if other_input_exprs.is_empty() {
                    // supply the default for the optional argument
                    other_input_exprs.push_front(0.0f64.lit());
                }
                ScalarFunc::DataFusionUdf(Arc::new(Round::scalar_udf()))
            }
            "rad" => ScalarFunc::DataFusionBuiltin(datafusion::functions::math::radians()),
            "deg" => ScalarFunc::DataFusionBuiltin(datafusion::functions::math::degrees()),
            "sgn" => ScalarFunc::DataFusionBuiltin(datafusion::functions::math::signum()),
            "pi" => {
                // pi() takes no input
                let fn_expr = DfExpr::ScalarFunction(ScalarFunction {
                    func: datafusion::functions::math::pi(),
                    args: vec![],
                });
                exprs.push(fn_expr);

                ScalarFunc::GeneratedExpr
            }
            _ => {
                if let Some(f) = query_engine_state
                    .session_state()
                    .scalar_functions()
                    .get(func.name)
                {
                    ScalarFunc::DataFusionBuiltin(f.clone())
                } else if let Some(factory) = query_engine_state.scalar_function(func.name) {
                    let func_state = query_engine_state.function_state();
                    let query_ctx = self.table_provider.query_ctx();

                    ScalarFunc::DataFusionUdf(Arc::new(factory.provide(FunctionContext {
                        state: func_state,
                        query_ctx: query_ctx.clone(),
                    })))
                } else if let Some(f) = datafusion_functions::math::functions()
                    .iter()
                    .find(|f| f.name() == func.name)
                {
                    ScalarFunc::DataFusionUdf(f.clone())
                } else {
                    return UnsupportedExprSnafu {
                        name: func.name.to_string(),
                    }
                    .fail();
                }
            }
        };

        // Build the function expression for every field column, splicing the
        // column (and, for range functions, the timestamp range) into the
        // argument list at `field_column_pos` and restoring the list afterwards.
        for value in &self.ctx.field_columns {
            let col_expr = DfExpr::Column(Column::from_name(value));

            match scalar_func.clone() {
                ScalarFunc::DataFusionBuiltin(func) => {
                    other_input_exprs.insert(field_column_pos, col_expr);
                    let fn_expr = DfExpr::ScalarFunction(ScalarFunction {
                        func,
                        args: other_input_exprs.clone().into(),
                    });
                    exprs.push(fn_expr);
                    let _ = other_input_exprs.remove(field_column_pos);
                }
                ScalarFunc::DataFusionUdf(func) => {
                    let args = itertools::chain!(
                        other_input_exprs.iter().take(field_column_pos).cloned(),
                        std::iter::once(col_expr),
                        other_input_exprs.iter().skip(field_column_pos).cloned()
                    )
                    .collect_vec();
                    exprs.push(DfExpr::ScalarFunction(ScalarFunction { func, args }))
                }
                ScalarFunc::Udf(func) => {
                    let ts_range_expr = DfExpr::Column(Column::from_name(
                        RangeManipulate::build_timestamp_range_name(
                            self.ctx.time_index_column.as_ref().unwrap(),
                        ),
                    ));
                    other_input_exprs.insert(field_column_pos, ts_range_expr);
                    other_input_exprs.insert(field_column_pos + 1, col_expr);
                    let fn_expr = DfExpr::ScalarFunction(ScalarFunction {
                        func,
                        args: other_input_exprs.clone().into(),
                    });
                    exprs.push(fn_expr);
                    let _ = other_input_exprs.remove(field_column_pos + 1);
                    let _ = other_input_exprs.remove(field_column_pos);
                }
                ScalarFunc::ExtrapolateUdf(func, range_length) => {
                    let ts_range_expr = DfExpr::Column(Column::from_name(
                        RangeManipulate::build_timestamp_range_name(
                            self.ctx.time_index_column.as_ref().unwrap(),
                        ),
                    ));
                    other_input_exprs.insert(field_column_pos, ts_range_expr);
                    other_input_exprs.insert(field_column_pos + 1, col_expr);
                    other_input_exprs
                        .insert(field_column_pos + 2, self.create_time_index_column_expr()?);
                    other_input_exprs.push_back(lit(range_length));
                    let fn_expr = DfExpr::ScalarFunction(ScalarFunction {
                        func,
                        args: other_input_exprs.clone().into(),
                    });
                    exprs.push(fn_expr);
                    let _ = other_input_exprs.pop_back();
                    let _ = other_input_exprs.remove(field_column_pos + 2);
                    let _ = other_input_exprs.remove(field_column_pos + 1);
                    let _ = other_input_exprs.remove(field_column_pos);
                }
                ScalarFunc::GeneratedExpr => {}
            }
        }

        // Update the context's field column names to the expressions' display
        // names. `label_join` and `label_replace` keep their field columns
        // unchanged, so they are skipped.
        if !matches!(func.name, "label_join" | "label_replace") {
            let mut new_field_columns = Vec::with_capacity(exprs.len());

            exprs = exprs
                .into_iter()
                .map(|expr| {
                    let display_name = expr.schema_name().to_string();
                    new_field_columns.push(display_name.clone());
                    Ok(expr.alias(display_name))
                })
                .collect::<std::result::Result<Vec<_>, _>>()
                .context(DataFusionPlanningSnafu)?;

            self.ctx.field_columns = new_field_columns;
        }

        Ok((exprs, new_tags))
    }

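    /// Validate a destination label name: it must match [LABEL_NAME_REGEX] and must
    /// not start with the reserved `__` prefix.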
    fn validate_label_name(label_name: &str) -> Result<()> {
        // label names beginning with `__` are reserved for internal use
        if label_name.starts_with("__") {
            return InvalidDestinationLabelNameSnafu { label_name }.fail();
        }
        // check the validity of the label name
        if !LABEL_NAME_REGEX.is_match(label_name) {
            return InvalidDestinationLabelNameSnafu { label_name }.fail();
        }

        Ok(())
    }

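    /// Build a `regexp_replace` expression for the `label_replace` function.
    /// Returns `None` when the call cannot change anything: an empty regex on an
    /// existing source label, or a missing source label with an empty replacement.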
1973 fn build_regexp_replace_label_expr(
1975 &self,
1976 other_input_exprs: &mut VecDeque<DfExpr>,
1977 query_engine_state: &QueryEngineState,
1978 ) -> Result<Option<(DfExpr, String)>> {
1979 let dst_label = match other_input_exprs.pop_front() {
1981 Some(DfExpr::Literal(ScalarValue::Utf8(Some(d)), _)) => d,
1982 other => UnexpectedPlanExprSnafu {
1983 desc: format!("expected dst_label string literal, but found {:?}", other),
1984 }
1985 .fail()?,
1986 };
1987
1988 Self::validate_label_name(&dst_label)?;
1990 let replacement = match other_input_exprs.pop_front() {
1991 Some(DfExpr::Literal(ScalarValue::Utf8(Some(r)), _)) => r,
1992 other => UnexpectedPlanExprSnafu {
1993 desc: format!("expected replacement string literal, but found {:?}", other),
1994 }
1995 .fail()?,
1996 };
1997 let src_label = match other_input_exprs.pop_front() {
1998 Some(DfExpr::Literal(ScalarValue::Utf8(Some(s)), None)) => s,
1999 other => UnexpectedPlanExprSnafu {
2000 desc: format!("expected src_label string literal, but found {:?}", other),
2001 }
2002 .fail()?,
2003 };
2004
2005 let regex = match other_input_exprs.pop_front() {
2006 Some(DfExpr::Literal(ScalarValue::Utf8(Some(r)), None)) => r,
2007 other => UnexpectedPlanExprSnafu {
2008 desc: format!("expected regex string literal, but found {:?}", other),
2009 }
2010 .fail()?,
2011 };
2012
2013 regex::Regex::new(®ex).map_err(|_| {
2016 InvalidRegularExpressionSnafu {
2017 regex: regex.clone(),
2018 }
2019 .build()
2020 })?;
2021
2022 if self.ctx.tag_columns.contains(&src_label) && regex.is_empty() {
2024 return Ok(None);
2025 }
2026
2027 if !self.ctx.tag_columns.contains(&src_label) {
2029 if replacement.is_empty() {
2030 return Ok(None);
2032 } else {
2033 return Ok(Some((
2035 lit(replacement).alias(&dst_label),
2037 dst_label,
2038 )));
2039 }
2040 }
2041
2042 let regex = format!("^(?s:{regex})$");
2045
2046 let session_state = query_engine_state.session_state();
2047 let func = session_state
2048 .scalar_functions()
2049 .get("regexp_replace")
2050 .context(UnsupportedExprSnafu {
2051 name: "regexp_replace",
2052 })?;
2053
2054 let args = vec![
2056 if src_label.is_empty() {
2057 DfExpr::Literal(ScalarValue::Utf8(Some(String::new())), None)
2058 } else {
2059 DfExpr::Column(Column::from_name(src_label))
2060 },
2061 DfExpr::Literal(ScalarValue::Utf8(Some(regex)), None),
2062 DfExpr::Literal(ScalarValue::Utf8(Some(replacement)), None),
2063 ];
2064
2065 Ok(Some((
2066 DfExpr::ScalarFunction(ScalarFunction {
2067 func: func.clone(),
2068 args,
2069 })
2070 .alias(&dst_label),
2071 dst_label,
2072 )))
2073 }
2074
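/// Build the projection expression for PromQL's `label_join`: pops
/// `dst_label` and `separator`, then joins the remaining source labels
/// with DataFusion's `concat_ws`. Empty or unknown source labels are
/// replaced by NULL so `concat_ws` skips them. A sketch with
/// illustrative names:
///
/// ```text
/// label_join(v, "endpoint", ":", "host", "port")
///   => concat_ws(":", host, port) AS endpoint
/// ```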
2075 fn build_concat_labels_expr(
2077 other_input_exprs: &mut VecDeque<DfExpr>,
2078 ctx: &PromPlannerContext,
2079 query_engine_state: &QueryEngineState,
2080 ) -> Result<(DfExpr, String)> {
2081 let dst_label = match other_input_exprs.pop_front() {
2084 Some(DfExpr::Literal(ScalarValue::Utf8(Some(d)), _)) => d,
2085 other => UnexpectedPlanExprSnafu {
2086 desc: format!("expected dst_label string literal, but found {:?}", other),
2087 }
2088 .fail()?,
2089 };
2090 let separator = match other_input_exprs.pop_front() {
2091 Some(DfExpr::Literal(ScalarValue::Utf8(Some(d)), _)) => d,
2092 other => UnexpectedPlanExprSnafu {
2093 desc: format!("expected separator string literal, but found {:?}", other),
2094 }
2095 .fail()?,
2096 };
2097
2098 let available_columns: HashSet<&str> = ctx
2100 .tag_columns
2101 .iter()
2102 .chain(ctx.field_columns.iter())
2103 .chain(ctx.time_index_column.as_ref())
2104 .map(|s| s.as_str())
2105 .collect();
2106
2107 let src_labels = other_input_exprs
2108 .iter()
2109 .map(|expr| {
2110 match expr {
2112 DfExpr::Literal(ScalarValue::Utf8(Some(label)), None) => {
2113 if label.is_empty() {
2114 Ok(DfExpr::Literal(ScalarValue::Null, None))
2115 } else if available_columns.contains(label.as_str()) {
2116 Ok(DfExpr::Column(Column::from_name(label)))
2118 } else {
2119 Ok(DfExpr::Literal(ScalarValue::Null, None))
2121 }
2122 }
2123 other => UnexpectedPlanExprSnafu {
2124 desc: format!(
2125 "expected source label string literal, but found {:?}",
2126 other
2127 ),
2128 }
2129 .fail(),
2130 }
2131 })
2132 .collect::<Result<Vec<_>>>()?;
2133 ensure!(
2134 !src_labels.is_empty(),
2135 FunctionInvalidArgumentSnafu {
2136 fn_name: "label_join"
2137 }
2138 );
2139
2140 let session_state = query_engine_state.session_state();
2141 let func = session_state
2142 .scalar_functions()
2143 .get("concat_ws")
2144 .context(UnsupportedExprSnafu { name: "concat_ws" })?;
2145
2146 let mut args = Vec::with_capacity(1 + src_labels.len());
2148 args.push(DfExpr::Literal(ScalarValue::Utf8(Some(separator)), None));
2149 args.extend(src_labels);
2150
2151 Ok((
2152 DfExpr::ScalarFunction(ScalarFunction {
2153 func: func.clone(),
2154 args,
2155 })
2156 .alias(&dst_label),
2157 dst_label,
2158 ))
2159 }
2160
2161 fn create_time_index_column_expr(&self) -> Result<DfExpr> {
2162 Ok(DfExpr::Column(Column::from_name(
2163 self.ctx
2164 .time_index_column
2165 .clone()
2166 .with_context(|| TimeIndexNotFoundSnafu { table: "unknown" })?,
2167 )))
2168 }
2169
2170 fn create_tag_column_exprs(&self) -> Result<Vec<DfExpr>> {
2171 let mut result = Vec::with_capacity(self.ctx.tag_columns.len());
2172 for tag in &self.ctx.tag_columns {
2173 let expr = DfExpr::Column(Column::from_name(tag));
2174 result.push(expr);
2175 }
2176 Ok(result)
2177 }
2178
2179 fn create_field_column_exprs(&self) -> Result<Vec<DfExpr>> {
2180 let mut result = Vec::with_capacity(self.ctx.field_columns.len());
2181 for field in &self.ctx.field_columns {
2182 let expr = DfExpr::Column(Column::from_name(field));
2183 result.push(expr);
2184 }
2185 Ok(result)
2186 }
2187
2188 fn create_tag_and_time_index_column_sort_exprs(&self) -> Result<Vec<SortExpr>> {
2189 let mut result = self
2190 .ctx
2191 .tag_columns
2192 .iter()
2193 .map(|col| DfExpr::Column(Column::from_name(col)).sort(true, true))
2194 .collect::<Vec<_>>();
2195 result.push(self.create_time_index_column_expr()?.sort(true, true));
2196 Ok(result)
2197 }
2198
2199 fn create_field_columns_sort_exprs(&self, asc: bool) -> Vec<SortExpr> {
2200 self.ctx
2201 .field_columns
2202 .iter()
2203 .map(|col| DfExpr::Column(Column::from_name(col)).sort(asc, true))
2204 .collect::<Vec<_>>()
2205 }
2206
2207 fn create_sort_exprs_by_tags(
2208 func: &str,
2209 tags: Vec<DfExpr>,
2210 asc: bool,
2211 ) -> Result<Vec<SortExpr>> {
2212 ensure!(
2213 !tags.is_empty(),
2214 FunctionInvalidArgumentSnafu { fn_name: func }
2215 );
2216
2217 tags.iter()
2218 .map(|col| match col {
2219 DfExpr::Literal(ScalarValue::Utf8(Some(label)), _) => {
2220 Ok(DfExpr::Column(Column::from_name(label)).sort(asc, false))
2221 }
2222 other => UnexpectedPlanExprSnafu {
2223 desc: format!("expected label string literal, but found {:?}", other),
2224 }
2225 .fail(),
2226 })
2227 .collect::<Result<Vec<_>>>()
2228 }
2229
2230 fn create_empty_values_filter_expr(&self) -> Result<DfExpr> {
2231 let mut exprs = Vec::with_capacity(self.ctx.field_columns.len());
2232 for value in &self.ctx.field_columns {
2233 let expr = DfExpr::Column(Column::from_name(value)).is_not_null();
2234 exprs.push(expr);
2235 }
2236
2237 conjunction(exprs).context(ValueNotFoundSnafu {
2238 table: self.table_ref()?.to_quoted_string(),
2239 })
2240 }
2241
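/// Create aggregate expressions for a PromQL aggregate operator, one
/// call per field column. Non-column arguments (currently only the
/// quantile parameter `q`) are prepended to each call, so e.g.
/// `quantile(0.9, m)` becomes `quantile(0.9, field)`. For
/// `count_values` the original field expressions are returned as well,
/// so the caller can add them to the group-by keys. `ctx.field_columns`
/// is updated to the normalized output names.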
2242 fn create_aggregate_exprs(
2258 &mut self,
2259 op: TokenType,
2260 param: &Option<Box<PromExpr>>,
2261 input_plan: &LogicalPlan,
2262 ) -> Result<(Vec<DfExpr>, Vec<DfExpr>)> {
2263 let mut non_col_args = Vec::new();
2264 let aggr = match op.id() {
2265 token::T_SUM => sum_udaf(),
2266 token::T_QUANTILE => {
2267 let q =
2268 Self::get_param_as_literal_expr(param, Some(op), Some(ArrowDataType::Float64))?;
2269 non_col_args.push(q);
2270 quantile_udaf()
2271 }
2272 token::T_AVG => avg_udaf(),
2273 token::T_COUNT_VALUES | token::T_COUNT => count_udaf(),
2274 token::T_MIN => min_udaf(),
2275 token::T_MAX => max_udaf(),
2276 token::T_GROUP => grouping_udaf(),
2277 token::T_STDDEV => stddev_pop_udaf(),
2278 token::T_STDVAR => var_pop_udaf(),
2279 token::T_TOPK | token::T_BOTTOMK => UnsupportedExprSnafu {
2280 name: format!("{op:?}"),
2281 }
2282 .fail()?,
2283 _ => UnexpectedTokenSnafu { token: op }.fail()?,
2284 };
2285
2286 let exprs: Vec<DfExpr> = self
2288 .ctx
2289 .field_columns
2290 .iter()
2291 .map(|col| {
2292 non_col_args.push(DfExpr::Column(Column::from_name(col)));
2293 let expr = aggr.call(non_col_args.clone());
2294 non_col_args.pop();
2295 expr
2296 })
2297 .collect::<Vec<_>>();
2298
2299 let prev_field_exprs = if op.id() == token::T_COUNT_VALUES {
2301 let prev_field_exprs: Vec<_> = self
2302 .ctx
2303 .field_columns
2304 .iter()
2305 .map(|col| DfExpr::Column(Column::from_name(col)))
2306 .collect();
2307
2308 ensure!(
2309 self.ctx.field_columns.len() == 1,
2310 UnsupportedExprSnafu {
2311 name: "count_values on multi-value input"
2312 }
2313 );
2314
2315 prev_field_exprs
2316 } else {
2317 vec![]
2318 };
2319
2320 let mut new_field_columns = Vec::with_capacity(self.ctx.field_columns.len());
2322
2323 let normalized_exprs =
2324 normalize_cols(exprs.iter().cloned(), input_plan).context(DataFusionPlanningSnafu)?;
2325 for expr in normalized_exprs {
2326 new_field_columns.push(expr.schema_name().to_string());
2327 }
2328 self.ctx.field_columns = new_field_columns;
2329
2330 Ok((exprs, prev_field_exprs))
2331 }
2332
2333 fn get_param_value_as_str(op: TokenType, param: &Option<Box<PromExpr>>) -> Result<&str> {
2334 let param = param
2335 .as_deref()
2336 .with_context(|| FunctionInvalidArgumentSnafu {
2337 fn_name: op.to_string(),
2338 })?;
2339 let PromExpr::StringLiteral(StringLiteral { val }) = param else {
2340 return FunctionInvalidArgumentSnafu {
2341 fn_name: op.to_string(),
2342 }
2343 .fail();
2344 };
2345
2346 Ok(val)
2347 }
2348
2349 fn get_param_as_literal_expr(
2350 param: &Option<Box<PromExpr>>,
2351 op: Option<TokenType>,
2352 expected_type: Option<ArrowDataType>,
2353 ) -> Result<DfExpr> {
2354 let prom_param = param.as_deref().with_context(|| {
2355 if let Some(op) = op {
2356 FunctionInvalidArgumentSnafu {
2357 fn_name: op.to_string(),
2358 }
2359 } else {
2360 FunctionInvalidArgumentSnafu {
2361 fn_name: "unknown".to_string(),
2362 }
2363 }
2364 })?;
2365
2366 let expr = Self::try_build_literal_expr(prom_param).with_context(|| {
2367 if let Some(op) = op {
2368 FunctionInvalidArgumentSnafu {
2369 fn_name: op.to_string(),
2370 }
2371 } else {
2372 FunctionInvalidArgumentSnafu {
2373 fn_name: "unknown".to_string(),
2374 }
2375 }
2376 })?;
2377
2378 if let Some(expected_type) = expected_type {
2380 let expr_type = expr
2382 .get_type(&DFSchema::empty())
2383 .context(DataFusionPlanningSnafu)?;
2384 if expected_type != expr_type {
2385 return FunctionInvalidArgumentSnafu {
2386 fn_name: format!("expected {expected_type:?}, but found {expr_type:?}"),
2387 }
2388 .fail();
2389 }
2390 }
2391
2392 Ok(expr)
2393 }
2394
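/// Create `row_number()` window expressions for `topk`/`bottomk`: rows
/// are partitioned by the grouping expressions and ordered by the field
/// value (descending for `topk`, ascending for `bottomk`) with the tag
/// columns as tie-breakers, so the caller can keep the first `k` rows
/// of each partition.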
2395 fn create_window_exprs(
2398 &mut self,
2399 op: TokenType,
2400 group_exprs: Vec<DfExpr>,
2401 input_plan: &LogicalPlan,
2402 ) -> Result<Vec<DfExpr>> {
2403 ensure!(
2404 self.ctx.field_columns.len() == 1,
2405 UnsupportedExprSnafu {
2406 name: "topk or bottomk on multi-value input"
2407 }
2408 );
2409
2410 assert!(matches!(op.id(), token::T_TOPK | token::T_BOTTOMK));
2411
2412 let asc = matches!(op.id(), token::T_BOTTOMK);
2413
2414 let tag_sort_exprs = self
2415 .create_tag_column_exprs()?
2416 .into_iter()
2417 .map(|expr| expr.sort(asc, true));
2418
2419 let exprs: Vec<DfExpr> = self
2421 .ctx
2422 .field_columns
2423 .iter()
2424 .map(|col| {
2425 let mut sort_exprs = Vec::with_capacity(self.ctx.tag_columns.len() + 1);
2426 sort_exprs.push(DfExpr::Column(Column::from(col)).sort(asc, true));
2428 sort_exprs.extend(tag_sort_exprs.clone());
2431
2432 DfExpr::WindowFunction(Box::new(WindowFunction {
2433 fun: WindowFunctionDefinition::WindowUDF(Arc::new(RowNumber::new().into())),
2434 params: WindowFunctionParams {
2435 args: vec![],
2436 partition_by: group_exprs.clone(),
2437 order_by: sort_exprs,
2438 window_frame: WindowFrame::new(Some(true)),
2439 null_treatment: None,
2440 distinct: false,
2441 },
2442 }))
2443 })
2444 .collect();
2445
2446 let normalized_exprs =
2447 normalize_cols(exprs.iter().cloned(), input_plan).context(DataFusionPlanningSnafu)?;
2448 Ok(normalized_exprs)
2449 }
2450
2451 #[deprecated(
2453 note = "use `Self::get_param_as_literal_expr` instead. This is only for `create_histogram_plan`"
2454 )]
2455 fn try_build_float_literal(expr: &PromExpr) -> Option<f64> {
2456 match expr {
2457 PromExpr::NumberLiteral(NumberLiteral { val }) => Some(*val),
2458 PromExpr::Paren(ParenExpr { expr }) => Self::try_build_float_literal(expr),
2459 PromExpr::Unary(UnaryExpr { expr, .. }) => {
2460 Self::try_build_float_literal(expr).map(|f| -f)
2461 }
2462 PromExpr::StringLiteral(_)
2463 | PromExpr::Binary(_)
2464 | PromExpr::VectorSelector(_)
2465 | PromExpr::MatrixSelector(_)
2466 | PromExpr::Call(_)
2467 | PromExpr::Extension(_)
2468 | PromExpr::Aggregate(_)
2469 | PromExpr::Subquery(_) => None,
2470 }
2471 }
2472
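/// Plan `histogram_quantile(phi, buckets)`. `phi` must be a float
/// literal and the input vector must carry the `le` tag; otherwise an
/// empty relation is returned. The plan wraps the input in a
/// `HistogramFold` node that folds bucket rows into one quantile value,
/// and `le` is dropped from the context's tag columns.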
2473 async fn create_histogram_plan(
2475 &mut self,
2476 args: &PromFunctionArgs,
2477 query_engine_state: &QueryEngineState,
2478 ) -> Result<LogicalPlan> {
2479 if args.args.len() != 2 {
2480 return FunctionInvalidArgumentSnafu {
2481 fn_name: SPECIAL_HISTOGRAM_QUANTILE.to_string(),
2482 }
2483 .fail();
2484 }
2485 #[allow(deprecated)]
2486 let phi = Self::try_build_float_literal(&args.args[0]).with_context(|| {
2487 FunctionInvalidArgumentSnafu {
2488 fn_name: SPECIAL_HISTOGRAM_QUANTILE.to_string(),
2489 }
2490 })?;
2491
2492 let input = args.args[1].as_ref().clone();
2493 let input_plan = self.prom_expr_to_plan(&input, query_engine_state).await?;
2494
2495 if !self.ctx.has_le_tag() {
2496 return Ok(LogicalPlan::EmptyRelation(
2499 datafusion::logical_expr::EmptyRelation {
2500 produce_one_row: false,
2501 schema: Arc::new(DFSchema::empty()),
2502 },
2503 ));
2504 }
2505 let time_index_column =
2506 self.ctx
2507 .time_index_column
2508 .clone()
2509 .with_context(|| TimeIndexNotFoundSnafu {
2510 table: self.ctx.table_name.clone().unwrap_or_default(),
2511 })?;
2512 let field_column = self
2514 .ctx
2515 .field_columns
2516 .first()
2517 .with_context(|| FunctionInvalidArgumentSnafu {
2518 fn_name: SPECIAL_HISTOGRAM_QUANTILE.to_string(),
2519 })?
2520 .clone();
2521 self.ctx.tag_columns.retain(|col| col != LE_COLUMN_NAME);
2523
2524 Ok(LogicalPlan::Extension(Extension {
2525 node: Arc::new(
2526 HistogramFold::new(
2527 LE_COLUMN_NAME.to_string(),
2528 field_column,
2529 time_index_column,
2530 phi,
2531 input_plan,
2532 )
2533 .context(DataFusionPlanningSnafu)?,
2534 ),
2535 }))
2536 }
2537
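/// Plan `vector(literal)`: emits an `EmptyMetric` node that produces
/// the literal value at every step between `start` and `end`, using the
/// special time function as time index and `greptime_value` as the
/// field column.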
2538 async fn create_vector_plan(&mut self, args: &PromFunctionArgs) -> Result<LogicalPlan> {
2540 if args.args.len() != 1 {
2541 return FunctionInvalidArgumentSnafu {
2542 fn_name: SPECIAL_VECTOR_FUNCTION.to_string(),
2543 }
2544 .fail();
2545 }
2546 let lit = Self::get_param_as_literal_expr(&Some(args.args[0].clone()), None, None)?;
2547
2548 self.ctx.time_index_column = Some(SPECIAL_TIME_FUNCTION.to_string());
2550 self.ctx.reset_table_name_and_schema();
2551 self.ctx.tag_columns = vec![];
2552 self.ctx.field_columns = vec![GREPTIME_VALUE.to_string()];
2553 Ok(LogicalPlan::Extension(Extension {
2554 node: Arc::new(
2555 EmptyMetric::new(
2556 self.ctx.start,
2557 self.ctx.end,
2558 self.ctx.interval,
2559 SPECIAL_TIME_FUNCTION.to_string(),
2560 GREPTIME_VALUE.to_string(),
2561 Some(lit),
2562 )
2563 .context(DataFusionPlanningSnafu)?,
2564 ),
2565 }))
2566 }
2567
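/// Plan `scalar(v)`: the input must expose exactly one field column.
/// The plan is wrapped in a `ScalarCalculate` node that reduces the
/// input to a single value per step, following PromQL's `scalar()`
/// semantics.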
2568 async fn create_scalar_plan(
2570 &mut self,
2571 args: &PromFunctionArgs,
2572 query_engine_state: &QueryEngineState,
2573 ) -> Result<LogicalPlan> {
2574 ensure!(
2575 args.len() == 1,
2576 FunctionInvalidArgumentSnafu {
2577 fn_name: SCALAR_FUNCTION
2578 }
2579 );
2580 let input = self
2581 .prom_expr_to_plan(&args.args[0], query_engine_state)
2582 .await?;
2583 ensure!(
2584 self.ctx.field_columns.len() == 1,
2585 MultiFieldsNotSupportedSnafu {
2586 operator: SCALAR_FUNCTION
2587 },
2588 );
2589 let scalar_plan = LogicalPlan::Extension(Extension {
2590 node: Arc::new(
2591 ScalarCalculate::new(
2592 self.ctx.start,
2593 self.ctx.end,
2594 self.ctx.interval,
2595 input,
2596 self.ctx.time_index_column.as_ref().unwrap(),
2597 &self.ctx.tag_columns,
2598 &self.ctx.field_columns[0],
2599 self.ctx.table_name.as_deref(),
2600 )
2601 .context(PromqlPlanNodeSnafu)?,
2602 ),
2603 });
2604 self.ctx.tag_columns.clear();
2606 self.ctx.field_columns.clear();
2607 self.ctx
2608 .field_columns
2609 .push(scalar_plan.schema().field(1).name().clone());
2610 Ok(scalar_plan)
2611 }
2612
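/// Plan `absent(v)`: the input is aggregated to one row per timestamp
/// (via `first_value`) and sorted by the time index, then wrapped in an
/// `Absent` node that fills in the steps where the input is missing.
/// Equality matchers from the original selector become the fake labels
/// of the output series, as in Prometheus.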
2613 async fn create_absent_plan(
2615 &mut self,
2616 args: &PromFunctionArgs,
2617 query_engine_state: &QueryEngineState,
2618 ) -> Result<LogicalPlan> {
2619 if args.args.len() != 1 {
2620 return FunctionInvalidArgumentSnafu {
2621 fn_name: SPECIAL_ABSENT_FUNCTION.to_string(),
2622 }
2623 .fail();
2624 }
2625 let input = self
2626 .prom_expr_to_plan(&args.args[0], query_engine_state)
2627 .await?;
2628
2629 let time_index_expr = self.create_time_index_column_expr()?;
2630 let first_field_expr =
2631 self.create_field_column_exprs()?
2632 .pop()
2633 .with_context(|| ValueNotFoundSnafu {
2634 table: self.ctx.table_name.clone().unwrap_or_default(),
2635 })?;
2636 let first_value_expr = first_value(first_field_expr, vec![]);
2637
2638 let ordered_aggregated_input = LogicalPlanBuilder::from(input)
2639 .aggregate(
2640 vec![time_index_expr.clone()],
2641 vec![first_value_expr.clone()],
2642 )
2643 .context(DataFusionPlanningSnafu)?
2644 .sort(vec![time_index_expr.sort(true, false)])
2645 .context(DataFusionPlanningSnafu)?
2646 .build()
2647 .context(DataFusionPlanningSnafu)?;
2648
2649 let fake_labels = self
2650 .ctx
2651 .selector_matcher
2652 .iter()
2653 .filter_map(|matcher| match matcher.op {
2654 MatchOp::Equal => Some((matcher.name.clone(), matcher.value.clone())),
2655 _ => None,
2656 })
2657 .collect::<Vec<_>>();
2658
2659 let absent_plan = LogicalPlan::Extension(Extension {
2661 node: Arc::new(
2662 Absent::try_new(
2663 self.ctx.start,
2664 self.ctx.end,
2665 self.ctx.interval,
2666 self.ctx.time_index_column.as_ref().unwrap().clone(),
2667 self.ctx.field_columns[0].clone(),
2668 fake_labels,
2669 ordered_aggregated_input,
2670 )
2671 .context(DataFusionPlanningSnafu)?,
2672 ),
2673 });
2674
2675 Ok(absent_plan)
2676 }
2677
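/// Try to fold a PromQL expression into a DataFusion literal
/// expression. Anything that needs table data (selectors, aggregates,
/// subqueries, calls) yields `None`. Comparisons carrying the `bool`
/// modifier are cast to `Float64` to match PromQL's 0/1 output.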
2678 fn try_build_literal_expr(expr: &PromExpr) -> Option<DfExpr> {
2681 match expr {
2682 PromExpr::NumberLiteral(NumberLiteral { val }) => Some(val.lit()),
2683 PromExpr::StringLiteral(StringLiteral { val }) => Some(val.lit()),
2684 PromExpr::VectorSelector(_)
2685 | PromExpr::MatrixSelector(_)
2686 | PromExpr::Extension(_)
2687 | PromExpr::Aggregate(_)
2688 | PromExpr::Subquery(_) => None,
2689 PromExpr::Call(Call { func, .. }) => {
// No call expression can be folded into a literal here. `time()` in
// particular needs the planner context (the time index column); see
// `try_build_special_time_expr_with_context`.
2690 if func.name == SPECIAL_TIME_FUNCTION {
2691 None
2694 } else {
2695 None
2696 }
2697 }
2698 PromExpr::Paren(ParenExpr { expr }) => Self::try_build_literal_expr(expr),
2699 PromExpr::Unary(UnaryExpr { expr, .. }) => Self::try_build_literal_expr(expr),
2701 PromExpr::Binary(PromBinaryExpr {
2702 lhs,
2703 rhs,
2704 op,
2705 modifier,
2706 }) => {
2707 let lhs = Self::try_build_literal_expr(lhs)?;
2708 let rhs = Self::try_build_literal_expr(rhs)?;
2709 let is_comparison_op = Self::is_token_a_comparison_op(*op);
2710 let expr_builder = Self::prom_token_to_binary_expr_builder(*op).ok()?;
2711 let expr = expr_builder(lhs, rhs).ok()?;
2712
2713 let should_return_bool = if let Some(m) = modifier {
2714 m.return_bool
2715 } else {
2716 false
2717 };
2718 if is_comparison_op && should_return_bool {
2719 Some(DfExpr::Cast(Cast {
2720 expr: Box::new(expr),
2721 data_type: ArrowDataType::Float64,
2722 }))
2723 } else {
2724 Some(expr)
2725 }
2726 }
2727 }
2728 }
2729
2730 fn try_build_special_time_expr_with_context(&self, expr: &PromExpr) -> Option<DfExpr> {
2731 match expr {
2732 PromExpr::Call(Call { func, .. }) => {
2733 if func.name == SPECIAL_TIME_FUNCTION
2734 && let Some(time_index_col) = self.ctx.time_index_column.as_ref()
2735 {
2736 Some(build_special_time_expr(time_index_col))
2737 } else {
2738 None
2739 }
2740 }
2741 _ => None,
2742 }
2743 }
2744
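/// Map a PromQL binary operator token to a closure building the
/// corresponding DataFusion expression. `^` and `atan2` have no native
/// operator and go through the `power` and `atan2` math functions.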
2745 #[allow(clippy::type_complexity)]
2748 fn prom_token_to_binary_expr_builder(
2749 token: TokenType,
2750 ) -> Result<Box<dyn Fn(DfExpr, DfExpr) -> Result<DfExpr>>> {
2751 match token.id() {
2752 token::T_ADD => Ok(Box::new(|lhs, rhs| Ok(lhs + rhs))),
2753 token::T_SUB => Ok(Box::new(|lhs, rhs| Ok(lhs - rhs))),
2754 token::T_MUL => Ok(Box::new(|lhs, rhs| Ok(lhs * rhs))),
2755 token::T_DIV => Ok(Box::new(|lhs, rhs| Ok(lhs / rhs))),
2756 token::T_MOD => Ok(Box::new(|lhs: DfExpr, rhs| Ok(lhs % rhs))),
2757 token::T_EQLC => Ok(Box::new(|lhs, rhs| Ok(lhs.eq(rhs)))),
2758 token::T_NEQ => Ok(Box::new(|lhs, rhs| Ok(lhs.not_eq(rhs)))),
2759 token::T_GTR => Ok(Box::new(|lhs, rhs| Ok(lhs.gt(rhs)))),
2760 token::T_LSS => Ok(Box::new(|lhs, rhs| Ok(lhs.lt(rhs)))),
2761 token::T_GTE => Ok(Box::new(|lhs, rhs| Ok(lhs.gt_eq(rhs)))),
2762 token::T_LTE => Ok(Box::new(|lhs, rhs| Ok(lhs.lt_eq(rhs)))),
2763 token::T_POW => Ok(Box::new(|lhs, rhs| {
2764 Ok(DfExpr::ScalarFunction(ScalarFunction {
2765 func: datafusion_functions::math::power(),
2766 args: vec![lhs, rhs],
2767 }))
2768 })),
2769 token::T_ATAN2 => Ok(Box::new(|lhs, rhs| {
2770 Ok(DfExpr::ScalarFunction(ScalarFunction {
2771 func: datafusion_functions::math::atan2(),
2772 args: vec![lhs, rhs],
2773 }))
2774 })),
2775 _ => UnexpectedTokenSnafu { token }.fail(),
2776 }
2777 }
2778
2779 fn is_token_a_comparison_op(token: TokenType) -> bool {
2781 matches!(
2782 token.id(),
2783 token::T_EQLC
2784 | token::T_NEQ
2785 | token::T_GTR
2786 | token::T_LSS
2787 | token::T_GTE
2788 | token::T_LTE
2789 )
2790 }
2791
2792 fn is_token_a_set_op(token: TokenType) -> bool {
2794 matches!(
2795 token.id(),
2796 token::T_LAND | token::T_LOR | token::T_LUNLESS
2797 )
2800 }
2801
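/// Join two plans on their tag columns, plus the time index unless
/// `only_join_time_index` is set, honoring the `on`/`ignoring` vector
/// matching modifier. NULLs compare equal so series with absent tag
/// values still match.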
2802 #[allow(clippy::too_many_arguments)]
2805 fn join_on_non_field_columns(
2806 &self,
2807 left: LogicalPlan,
2808 right: LogicalPlan,
2809 left_table_ref: TableReference,
2810 right_table_ref: TableReference,
2811 left_time_index_column: Option<String>,
2812 right_time_index_column: Option<String>,
2813 only_join_time_index: bool,
2814 modifier: &Option<BinModifier>,
2815 ) -> Result<LogicalPlan> {
2816 let mut left_tag_columns = if only_join_time_index {
2817 BTreeSet::new()
2818 } else {
2819 self.ctx
2820 .tag_columns
2821 .iter()
2822 .cloned()
2823 .collect::<BTreeSet<_>>()
2824 };
2825 let mut right_tag_columns = left_tag_columns.clone();
2826
2827 if let Some(modifier) = modifier {
2829 if let Some(matching) = &modifier.matching {
2831 match matching {
2832 LabelModifier::Include(on) => {
2834 let mask = on.labels.iter().cloned().collect::<BTreeSet<_>>();
2835 left_tag_columns = left_tag_columns.intersection(&mask).cloned().collect();
2836 right_tag_columns =
2837 right_tag_columns.intersection(&mask).cloned().collect();
2838 }
2839 LabelModifier::Exclude(ignoring) => {
2841 for label in &ignoring.labels {
2843 let _ = left_tag_columns.remove(label);
2844 let _ = right_tag_columns.remove(label);
2845 }
2846 }
2847 }
2848 }
2849 }
2850
2851 if let (Some(left_time_index_column), Some(right_time_index_column)) =
2853 (left_time_index_column, right_time_index_column)
2854 {
2855 left_tag_columns.insert(left_time_index_column);
2856 right_tag_columns.insert(right_time_index_column);
2857 }
2858
2859 let right = LogicalPlanBuilder::from(right)
2860 .alias(right_table_ref)
2861 .context(DataFusionPlanningSnafu)?
2862 .build()
2863 .context(DataFusionPlanningSnafu)?;
2864
2865 LogicalPlanBuilder::from(left)
2867 .alias(left_table_ref)
2868 .context(DataFusionPlanningSnafu)?
2869 .join_detailed(
2870 right,
2871 JoinType::Inner,
2872 (
2873 left_tag_columns
2874 .into_iter()
2875 .map(Column::from_name)
2876 .collect::<Vec<_>>(),
2877 right_tag_columns
2878 .into_iter()
2879 .map(Column::from_name)
2880 .collect::<Vec<_>>(),
2881 ),
2882 None,
2883 NullEquality::NullEqualsNull,
2884 )
2885 .context(DataFusionPlanningSnafu)?
2886 .build()
2887 .context(DataFusionPlanningSnafu)
2888 }
2889
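/// Plan the PromQL set operators: `and` becomes a left-semi join and
/// `unless` a left-anti join on the shared tag columns plus the time
/// index, while `or` is delegated to [`Self::or_operator`]. Only
/// one-to-one and many-to-many vector matching is accepted here.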
2890 fn set_op_on_non_field_columns(
2892 &mut self,
2893 left: LogicalPlan,
2894 mut right: LogicalPlan,
2895 left_context: PromPlannerContext,
2896 right_context: PromPlannerContext,
2897 op: TokenType,
2898 modifier: &Option<BinModifier>,
2899 ) -> Result<LogicalPlan> {
2900 let mut left_tag_col_set = left_context
2901 .tag_columns
2902 .iter()
2903 .cloned()
2904 .collect::<HashSet<_>>();
2905 let mut right_tag_col_set = right_context
2906 .tag_columns
2907 .iter()
2908 .cloned()
2909 .collect::<HashSet<_>>();
2910
2911 if matches!(op.id(), token::T_LOR) {
2912 return self.or_operator(
2913 left,
2914 right,
2915 left_tag_col_set,
2916 right_tag_col_set,
2917 left_context,
2918 right_context,
2919 modifier,
2920 );
2921 }
2922
2923 if let Some(modifier) = modifier {
2925 ensure!(
2927 matches!(
2928 modifier.card,
2929 VectorMatchCardinality::OneToOne | VectorMatchCardinality::ManyToMany
2930 ),
2931 UnsupportedVectorMatchSnafu {
2932 name: modifier.card.clone(),
2933 },
2934 );
2935 if let Some(matching) = &modifier.matching {
2937 match matching {
2938 LabelModifier::Include(on) => {
2940 let mask = on.labels.iter().cloned().collect::<HashSet<_>>();
2941 left_tag_col_set = left_tag_col_set.intersection(&mask).cloned().collect();
2942 right_tag_col_set =
2943 right_tag_col_set.intersection(&mask).cloned().collect();
2944 }
2945 LabelModifier::Exclude(ignoring) => {
2947 for label in &ignoring.labels {
2949 let _ = left_tag_col_set.remove(label);
2950 let _ = right_tag_col_set.remove(label);
2951 }
2952 }
2953 }
2954 }
2955 }
2956 if !matches!(op.id(), token::T_LOR) {
2958 ensure!(
2959 left_tag_col_set == right_tag_col_set,
2960 CombineTableColumnMismatchSnafu {
2961 left: left_tag_col_set.into_iter().collect::<Vec<_>>(),
2962 right: right_tag_col_set.into_iter().collect::<Vec<_>>(),
2963 }
2964 )
2965 };
2966 let left_time_index = left_context.time_index_column.clone().unwrap();
2967 let right_time_index = right_context.time_index_column.clone().unwrap();
2968 let join_keys = left_tag_col_set
2969 .iter()
2970 .cloned()
2971 .chain([left_time_index.clone()])
2972 .collect::<Vec<_>>();
2973 self.ctx.time_index_column = Some(left_time_index.clone());
2974
2975 if left_context.time_index_column != right_context.time_index_column {
2977 let right_project_exprs = right
2978 .schema()
2979 .fields()
2980 .iter()
2981 .map(|field| {
2982 if field.name() == &right_time_index {
2983 DfExpr::Column(Column::from_name(&right_time_index)).alias(&left_time_index)
2984 } else {
2985 DfExpr::Column(Column::from_name(field.name()))
2986 }
2987 })
2988 .collect::<Vec<_>>();
2989
2990 right = LogicalPlanBuilder::from(right)
2991 .project(right_project_exprs)
2992 .context(DataFusionPlanningSnafu)?
2993 .build()
2994 .context(DataFusionPlanningSnafu)?;
2995 }
2996
2997 ensure!(
2998 left_context.field_columns.len() == 1,
2999 MultiFieldsNotSupportedSnafu {
3000 operator: "AND operator"
3001 }
3002 );
3003 let left_field_col = left_context.field_columns.first().unwrap();
3006 self.ctx.field_columns = vec![left_field_col.clone()];
3007
3008 match op.id() {
3011 token::T_LAND => LogicalPlanBuilder::from(left)
3012 .distinct()
3013 .context(DataFusionPlanningSnafu)?
3014 .join_detailed(
3015 right,
3016 JoinType::LeftSemi,
3017 (join_keys.clone(), join_keys),
3018 None,
3019 NullEquality::NullEqualsNull,
3020 )
3021 .context(DataFusionPlanningSnafu)?
3022 .build()
3023 .context(DataFusionPlanningSnafu),
3024 token::T_LUNLESS => LogicalPlanBuilder::from(left)
3025 .distinct()
3026 .context(DataFusionPlanningSnafu)?
3027 .join_detailed(
3028 right,
3029 JoinType::LeftAnti,
3030 (join_keys.clone(), join_keys),
3031 None,
3032 NullEquality::NullEqualsNull,
3033 )
3034 .context(DataFusionPlanningSnafu)?
3035 .build()
3036 .context(DataFusionPlanningSnafu),
3037 token::T_LOR => {
// `or` was dispatched to `or_operator` at the top of this function.
3038 unreachable!()
3041 }
3042 _ => UnexpectedTokenSnafu { token: op }.fail(),
3043 }
3044 }
3045
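/// Plan the `or` operator with a `UnionDistinctOn` node: both sides are
/// projected to a common schema (tags missing on one side padded with
/// NULL, time index and field column unified), then right-side rows are
/// kept only where no left-side row matches on the label columns.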
3046 #[allow(clippy::too_many_arguments)]
3048 fn or_operator(
3049 &mut self,
3050 left: LogicalPlan,
3051 right: LogicalPlan,
3052 left_tag_cols_set: HashSet<String>,
3053 right_tag_cols_set: HashSet<String>,
3054 left_context: PromPlannerContext,
3055 right_context: PromPlannerContext,
3056 modifier: &Option<BinModifier>,
3057 ) -> Result<LogicalPlan> {
3058 ensure!(
3060 left_context.field_columns.len() == right_context.field_columns.len(),
3061 CombineTableColumnMismatchSnafu {
3062 left: left_context.field_columns.clone(),
3063 right: right_context.field_columns.clone()
3064 }
3065 );
3066 ensure!(
3067 left_context.field_columns.len() == 1,
3068 MultiFieldsNotSupportedSnafu {
3069 operator: "OR operator"
3070 }
3071 );
3072
3073 let all_tags = left_tag_cols_set
3075 .union(&right_tag_cols_set)
3076 .cloned()
3077 .collect::<HashSet<_>>();
3078 let tags_not_in_left = all_tags
3079 .difference(&left_tag_cols_set)
3080 .cloned()
3081 .collect::<Vec<_>>();
3082 let tags_not_in_right = all_tags
3083 .difference(&right_tag_cols_set)
3084 .cloned()
3085 .collect::<Vec<_>>();
3086 let left_qualifier = left.schema().qualified_field(0).0.cloned();
3087 let right_qualifier = right.schema().qualified_field(0).0.cloned();
3088 let left_qualifier_string = left_qualifier
3089 .as_ref()
3090 .map(|l| l.to_string())
3091 .unwrap_or_default();
3092 let right_qualifier_string = right_qualifier
3093 .as_ref()
3094 .map(|r| r.to_string())
3095 .unwrap_or_default();
3096 let left_time_index_column =
3097 left_context
3098 .time_index_column
3099 .clone()
3100 .with_context(|| TimeIndexNotFoundSnafu {
3101 table: left_qualifier_string.clone(),
3102 })?;
3103 let right_time_index_column =
3104 right_context
3105 .time_index_column
3106 .clone()
3107 .with_context(|| TimeIndexNotFoundSnafu {
3108 table: right_qualifier_string.clone(),
3109 })?;
3110 let left_field_col = left_context.field_columns.first().unwrap();
3112 let right_field_col = right_context.field_columns.first().unwrap();
3113
3114 let mut all_columns_set = left
3116 .schema()
3117 .fields()
3118 .iter()
3119 .chain(right.schema().fields().iter())
3120 .map(|field| field.name().clone())
3121 .collect::<HashSet<_>>();
3122 all_columns_set.remove(&left_time_index_column);
3124 all_columns_set.remove(&right_time_index_column);
3125 if left_field_col != right_field_col {
3127 all_columns_set.remove(right_field_col);
3128 }
3129 let mut all_columns = all_columns_set.into_iter().collect::<Vec<_>>();
3130 all_columns.sort_unstable();
3132 all_columns.insert(0, left_time_index_column.clone());
3134
3135 let left_proj_exprs = all_columns.iter().map(|col| {
3137 if tags_not_in_left.contains(col) {
3138 DfExpr::Literal(ScalarValue::Utf8(None), None).alias(col.to_string())
3139 } else {
3140 DfExpr::Column(Column::new(None::<String>, col))
3141 }
3142 });
3143 let right_time_index_expr = DfExpr::Column(Column::new(
3144 right_qualifier.clone(),
3145 right_time_index_column,
3146 ))
3147 .alias(left_time_index_column.clone());
3148 let right_qualifier_for_field = right
3151 .schema()
3152 .iter()
3153 .find(|(_, f)| f.name() == right_field_col)
3154 .map(|(q, _)| q)
3155 .context(ColumnNotFoundSnafu {
3156 col: right_field_col.to_string(),
3157 })?
3158 .cloned();
3159
3160 let right_proj_exprs_without_time_index = all_columns.iter().skip(1).map(|col| {
3162 if col == left_field_col && left_field_col != right_field_col {
3164 DfExpr::Column(Column::new(
3166 right_qualifier_for_field.clone(),
3167 right_field_col,
3168 ))
3169 } else if tags_not_in_right.contains(col) {
3170 DfExpr::Literal(ScalarValue::Utf8(None), None).alias(col.to_string())
3171 } else {
3172 DfExpr::Column(Column::new(None::<String>, col))
3173 }
3174 });
3175 let right_proj_exprs = [right_time_index_expr]
3176 .into_iter()
3177 .chain(right_proj_exprs_without_time_index);
3178
3179 let left_projected = LogicalPlanBuilder::from(left)
3180 .project(left_proj_exprs)
3181 .context(DataFusionPlanningSnafu)?
3182 .alias(left_qualifier_string.clone())
3183 .context(DataFusionPlanningSnafu)?
3184 .build()
3185 .context(DataFusionPlanningSnafu)?;
3186 let right_projected = LogicalPlanBuilder::from(right)
3187 .project(right_proj_exprs)
3188 .context(DataFusionPlanningSnafu)?
3189 .alias(right_qualifier_string.clone())
3190 .context(DataFusionPlanningSnafu)?
3191 .build()
3192 .context(DataFusionPlanningSnafu)?;
3193
3194 let mut match_columns = if let Some(modifier) = modifier
3196 && let Some(matching) = &modifier.matching
3197 {
3198 match matching {
3199 LabelModifier::Include(on) => on.labels.clone(),
3201 LabelModifier::Exclude(ignoring) => {
3203 let ignoring = ignoring.labels.iter().cloned().collect::<HashSet<_>>();
3204 all_tags.difference(&ignoring).cloned().collect()
3205 }
3206 }
3207 } else {
3208 all_tags.iter().cloned().collect()
3209 };
3210 match_columns.sort_unstable();
3212 let schema = left_projected.schema().clone();
3214 let union_distinct_on = UnionDistinctOn::new(
3215 left_projected,
3216 right_projected,
3217 match_columns,
3218 left_time_index_column.clone(),
3219 schema,
3220 );
3221 let result = LogicalPlan::Extension(Extension {
3222 node: Arc::new(union_distinct_on),
3223 });
3224
3225 self.ctx.time_index_column = Some(left_time_index_column);
3227 self.ctx.tag_columns = all_tags.into_iter().collect();
3228 self.ctx.field_columns = vec![left_field_col.to_string()];
3229
3230 Ok(result)
3231 }
3232
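/// Project the result of applying `name_to_expr` to every field column,
/// aliased to the expressions' display names, alongside the untouched
/// tag and time index columns. `ctx.field_columns` is updated to the
/// new names.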
3233 fn projection_for_each_field_column<F>(
3241 &mut self,
3242 input: LogicalPlan,
3243 name_to_expr: F,
3244 ) -> Result<LogicalPlan>
3245 where
3246 F: FnMut(&String) -> Result<DfExpr>,
3247 {
3248 let non_field_columns_iter = self
3249 .ctx
3250 .tag_columns
3251 .iter()
3252 .chain(self.ctx.time_index_column.iter())
3253 .map(|col| {
3254 Ok(DfExpr::Column(Column::new(
3255 self.ctx.table_name.clone().map(TableReference::bare),
3256 col,
3257 )))
3258 });
3259
3260 let result_field_columns = self
3262 .ctx
3263 .field_columns
3264 .iter()
3265 .map(name_to_expr)
3266 .collect::<Result<Vec<_>>>()?;
3267
3268 self.ctx.field_columns = result_field_columns
3270 .iter()
3271 .map(|expr| expr.schema_name().to_string())
3272 .collect();
3273 let field_columns_iter = result_field_columns
3274 .into_iter()
3275 .zip(self.ctx.field_columns.iter())
3276 .map(|(expr, name)| Ok(DfExpr::Alias(Alias::new(expr, None::<String>, name))));
3277
3278 let project_fields = non_field_columns_iter
3280 .chain(field_columns_iter)
3281 .collect::<Result<Vec<_>>>()?;
3282
3283 LogicalPlanBuilder::from(input)
3284 .project(project_fields)
3285 .context(DataFusionPlanningSnafu)?
3286 .build()
3287 .context(DataFusionPlanningSnafu)
3288 }
3289
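/// Filter the input on the single field column using the predicate
/// built by `name_to_expr`; multi-value inputs are rejected.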
3290 fn filter_on_field_column<F>(
3293 &self,
3294 input: LogicalPlan,
3295 mut name_to_expr: F,
3296 ) -> Result<LogicalPlan>
3297 where
3298 F: FnMut(&String) -> Result<DfExpr>,
3299 {
3300 ensure!(
3301 self.ctx.field_columns.len() == 1,
3302 UnsupportedExprSnafu {
3303 name: "filter on multi-value input"
3304 }
3305 );
3306
3307 let field_column_filter = name_to_expr(&self.ctx.field_columns[0])?;
3308
3309 LogicalPlanBuilder::from(input)
3310 .filter(field_column_filter)
3311 .context(DataFusionPlanningSnafu)?
3312 .build()
3313 .context(DataFusionPlanningSnafu)
3314 }
3315
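/// Translate time-extraction functions by applying `date_part` to the
/// time index column; e.g. PromQL's `hour()` becomes
/// `date_part('hour', <time index>)`.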
3316 fn date_part_on_time_index(&self, date_part: &str) -> Result<DfExpr> {
3319 let input_expr = datafusion::logical_expr::col(
3320 self.ctx
3321 .time_index_column
3322 .as_ref()
3323 .with_context(|| TimeIndexNotFoundSnafu {
3325 table: "<doesn't matter>",
3326 })?,
3327 );
3328 let fn_expr = DfExpr::ScalarFunction(ScalarFunction {
3329 func: datafusion_functions::datetime::date_part(),
3330 args: vec![date_part.lit(), input_expr],
3331 });
3332 Ok(fn_expr)
3333 }
3334}
3335
3336#[derive(Default, Debug)]
3337struct FunctionArgs {
3338 input: Option<PromExpr>,
3339 literals: Vec<DfExpr>,
3340}
3341
3342#[derive(Debug, Clone)]
/// How a parsed PromQL function call is dispatched when building scalar
/// expressions (variant docs reconstructed from usage in this planner).
3345 enum ScalarFunc {
/// A scalar function DataFusion can evaluate directly; arguments are
/// forwarded as-is.
3346 DataFusionBuiltin(Arc<ScalarUdfDef>),
/// A function backed by a DataFusion-defined UDF.
3350 DataFusionUdf(Arc<ScalarUdfDef>),
/// A PromQL range-function UDF that also takes the timestamp-range and
/// time-index columns as inputs.
3354 Udf(Arc<ScalarUdfDef>),
/// Like `Udf`, but additionally needs the evaluation range length in
/// milliseconds (the `i64`) to extrapolate, e.g. `rate`/`increase`.
3359 ExtrapolateUdf(Arc<ScalarUdfDef>, i64),
/// The expression was already generated inline (e.g. for
/// `label_replace` or `label_join`); nothing more to call.
3366 GeneratedExpr,
3370 }
3371
3372#[cfg(test)]
3373mod test {
3374 use std::time::{Duration, UNIX_EPOCH};
3375
3376 use catalog::memory::{new_memory_catalog_manager, MemoryCatalogManager};
3377 use catalog::RegisterTableRequest;
3378 use common_base::Plugins;
3379 use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
3380 use common_query::test_util::DummyDecoder;
3381 use datatypes::prelude::ConcreteDataType;
3382 use datatypes::schema::{ColumnSchema, Schema};
3383 use promql_parser::label::Labels;
3384 use promql_parser::parser;
3385 use session::context::QueryContext;
3386 use table::metadata::{TableInfoBuilder, TableMetaBuilder};
3387 use table::test_util::EmptyTable;
3388
3389 use super::*;
3390 use crate::options::QueryOptions;
3391
3392 fn build_query_engine_state() -> QueryEngineState {
3393 QueryEngineState::new(
3394 new_memory_catalog_manager().unwrap(),
3395 None,
3396 None,
3397 None,
3398 None,
3399 None,
3400 false,
3401 Plugins::default(),
3402 QueryOptions::default(),
3403 )
3404 }
3405
3406 async fn build_test_table_provider(
3407 table_name_tuples: &[(String, String)],
3408 num_tag: usize,
3409 num_field: usize,
3410 ) -> DfTableSourceProvider {
3411 let catalog_list = MemoryCatalogManager::with_default_setup();
3412 for (schema_name, table_name) in table_name_tuples {
3413 let mut columns = vec![];
3414 for i in 0..num_tag {
3415 columns.push(ColumnSchema::new(
3416 format!("tag_{i}"),
3417 ConcreteDataType::string_datatype(),
3418 false,
3419 ));
3420 }
3421 columns.push(
3422 ColumnSchema::new(
3423 "timestamp".to_string(),
3424 ConcreteDataType::timestamp_millisecond_datatype(),
3425 false,
3426 )
3427 .with_time_index(true),
3428 );
3429 for i in 0..num_field {
3430 columns.push(ColumnSchema::new(
3431 format!("field_{i}"),
3432 ConcreteDataType::float64_datatype(),
3433 true,
3434 ));
3435 }
3436 let schema = Arc::new(Schema::new(columns));
3437 let table_meta = TableMetaBuilder::empty()
3438 .schema(schema)
3439 .primary_key_indices((0..num_tag).collect())
3440 .value_indices((num_tag + 1..num_tag + 1 + num_field).collect())
3441 .next_column_id(1024)
3442 .build()
3443 .unwrap();
3444 let table_info = TableInfoBuilder::default()
3445 .name(table_name.to_string())
3446 .meta(table_meta)
3447 .build()
3448 .unwrap();
3449 let table = EmptyTable::from_table_info(&table_info);
3450
3451 assert!(catalog_list
3452 .register_table_sync(RegisterTableRequest {
3453 catalog: DEFAULT_CATALOG_NAME.to_string(),
3454 schema: schema_name.to_string(),
3455 table_name: table_name.to_string(),
3456 table_id: 1024,
3457 table,
3458 })
3459 .is_ok());
3460 }
3461
3462 DfTableSourceProvider::new(
3463 catalog_list,
3464 false,
3465 QueryContext::arc(),
3466 DummyDecoder::arc(),
3467 false,
3468 )
3469 }
3470
3471 async fn build_test_table_provider_with_fields(
3472 table_name_tuples: &[(String, String)],
3473 tags: &[&str],
3474 ) -> DfTableSourceProvider {
3475 let catalog_list = MemoryCatalogManager::with_default_setup();
3476 for (schema_name, table_name) in table_name_tuples {
3477 let mut columns = vec![];
3478 let num_tag = tags.len();
3479 for tag in tags {
3480 columns.push(ColumnSchema::new(
3481 tag.to_string(),
3482 ConcreteDataType::string_datatype(),
3483 false,
3484 ));
3485 }
3486 columns.push(
3487 ColumnSchema::new(
3488 "greptime_timestamp".to_string(),
3489 ConcreteDataType::timestamp_millisecond_datatype(),
3490 false,
3491 )
3492 .with_time_index(true),
3493 );
3494 columns.push(ColumnSchema::new(
3495 "greptime_value".to_string(),
3496 ConcreteDataType::float64_datatype(),
3497 true,
3498 ));
3499 let schema = Arc::new(Schema::new(columns));
3500 let table_meta = TableMetaBuilder::empty()
3501 .schema(schema)
3502 .primary_key_indices((0..num_tag).collect())
3503 .next_column_id(1024)
3504 .build()
3505 .unwrap();
3506 let table_info = TableInfoBuilder::default()
3507 .name(table_name.to_string())
3508 .meta(table_meta)
3509 .build()
3510 .unwrap();
3511 let table = EmptyTable::from_table_info(&table_info);
3512
3513 assert!(catalog_list
3514 .register_table_sync(RegisterTableRequest {
3515 catalog: DEFAULT_CATALOG_NAME.to_string(),
3516 schema: schema_name.to_string(),
3517 table_name: table_name.to_string(),
3518 table_id: 1024,
3519 table,
3520 })
3521 .is_ok());
3522 }
3523
3524 DfTableSourceProvider::new(
3525 catalog_list,
3526 false,
3527 QueryContext::arc(),
3528 DummyDecoder::arc(),
3529 false,
3530 )
3531 }
3532
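// Shared template: plan `{fn_name}(some_metric{tag_0!="bar"})` against
// a one-tag/one-field table and compare the plan with an expected
// string in which TEMPLATE is substituted by `plan_name`.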
3533 async fn do_single_instant_function_call(fn_name: &'static str, plan_name: &str) {
3549 let prom_expr =
3550 parser::parse(&format!("{fn_name}(some_metric{{tag_0!=\"bar\"}})")).unwrap();
3551 let eval_stmt = EvalStmt {
3552 expr: prom_expr,
3553 start: UNIX_EPOCH,
3554 end: UNIX_EPOCH
3555 .checked_add(Duration::from_secs(100_000))
3556 .unwrap(),
3557 interval: Duration::from_secs(5),
3558 lookback_delta: Duration::from_secs(1),
3559 };
3560
3561 let table_provider = build_test_table_provider(
3562 &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
3563 1,
3564 1,
3565 )
3566 .await;
3567 let plan =
3568 PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
3569 .await
3570 .unwrap();
3571
3572 let expected = String::from(
3573 "Filter: TEMPLATE(field_0) IS NOT NULL [timestamp:Timestamp(Millisecond, None), TEMPLATE(field_0):Float64;N, tag_0:Utf8]\
3574 \n Projection: some_metric.timestamp, TEMPLATE(some_metric.field_0) AS TEMPLATE(field_0), some_metric.tag_0 [timestamp:Timestamp(Millisecond, None), TEMPLATE(field_0):Float64;N, tag_0:Utf8]\
3575 \n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3576 \n PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3577 \n Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3578 \n Filter: some_metric.tag_0 != Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3579 \n TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
3580 ).replace("TEMPLATE", plan_name);
3581
3582 assert_eq!(plan.display_indent_schema().to_string(), expected);
3583 }
3584
3585 #[tokio::test]
3586 async fn single_abs() {
3587 do_single_instant_function_call("abs", "abs").await;
3588 }
3589
3590 #[tokio::test]
3591 #[should_panic]
3592 async fn single_absent() {
3593 do_single_instant_function_call("absent", "").await;
3594 }
3595
3596 #[tokio::test]
3597 async fn single_ceil() {
3598 do_single_instant_function_call("ceil", "ceil").await;
3599 }
3600
3601 #[tokio::test]
3602 async fn single_exp() {
3603 do_single_instant_function_call("exp", "exp").await;
3604 }
3605
3606 #[tokio::test]
3607 async fn single_ln() {
3608 do_single_instant_function_call("ln", "ln").await;
3609 }
3610
3611 #[tokio::test]
3612 async fn single_log2() {
3613 do_single_instant_function_call("log2", "log2").await;
3614 }
3615
3616 #[tokio::test]
3617 async fn single_log10() {
3618 do_single_instant_function_call("log10", "log10").await;
3619 }
3620
3621 #[tokio::test]
3622 #[should_panic]
3623 async fn single_scalar() {
3624 do_single_instant_function_call("scalar", "").await;
3625 }
3626
3627 #[tokio::test]
3628 #[should_panic]
3629 async fn single_sgn() {
3630 do_single_instant_function_call("sgn", "").await;
3631 }
3632
3633 #[tokio::test]
3634 #[should_panic]
3635 async fn single_sort() {
3636 do_single_instant_function_call("sort", "").await;
3637 }
3638
3639 #[tokio::test]
3640 #[should_panic]
3641 async fn single_sort_desc() {
3642 do_single_instant_function_call("sort_desc", "").await;
3643 }
3644
3645 #[tokio::test]
3646 async fn single_sqrt() {
3647 do_single_instant_function_call("sqrt", "sqrt").await;
3648 }
3649
3650 #[tokio::test]
3651 #[should_panic]
3652 async fn single_timestamp() {
3653 do_single_instant_function_call("timestamp", "").await;
3654 }
3655
3656 #[tokio::test]
3657 async fn single_acos() {
3658 do_single_instant_function_call("acos", "acos").await;
3659 }
3660
3661 #[tokio::test]
3662 #[should_panic]
3663 async fn single_acosh() {
3664 do_single_instant_function_call("acosh", "").await;
3665 }
3666
3667 #[tokio::test]
3668 async fn single_asin() {
3669 do_single_instant_function_call("asin", "asin").await;
3670 }
3671
3672 #[tokio::test]
3673 #[should_panic]
3674 async fn single_asinh() {
3675 do_single_instant_function_call("asinh", "").await;
3676 }
3677
3678 #[tokio::test]
3679 async fn single_atan() {
3680 do_single_instant_function_call("atan", "atan").await;
3681 }
3682
3683 #[tokio::test]
3684 #[should_panic]
3685 async fn single_atanh() {
3686 do_single_instant_function_call("atanh", "").await;
3687 }
3688
3689 #[tokio::test]
3690 async fn single_cos() {
3691 do_single_instant_function_call("cos", "cos").await;
3692 }
3693
3694 #[tokio::test]
3695 #[should_panic]
3696 async fn single_cosh() {
3697 do_single_instant_function_call("cosh", "").await;
3698 }
3699
3700 #[tokio::test]
3701 async fn single_sin() {
3702 do_single_instant_function_call("sin", "sin").await;
3703 }
3704
3705 #[tokio::test]
3706 #[should_panic]
3707 async fn single_sinh() {
3708 do_single_instant_function_call("sinh", "").await;
3709 }
3710
3711 #[tokio::test]
3712 async fn single_tan() {
3713 do_single_instant_function_call("tan", "tan").await;
3714 }
3715
3716 #[tokio::test]
3717 #[should_panic]
3718 async fn single_tanh() {
3719 do_single_instant_function_call("tanh", "").await;
3720 }
3721
3722 #[tokio::test]
3723 #[should_panic]
3724 async fn single_deg() {
3725 do_single_instant_function_call("deg", "").await;
3726 }
3727
3728 #[tokio::test]
3729 #[should_panic]
3730 async fn single_rad() {
3731 do_single_instant_function_call("rad", "").await;
3732 }
3733
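// Shared template for aggregate operators: plan the `by (tag_1)`
// variant and then the `without (tag_1)` variant over a
// two-tag/two-field table, comparing each plan with an expected string
// in which TEMPLATE is substituted by `plan_name`.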
3734 async fn do_aggregate_expr_plan(fn_name: &str, plan_name: &str) {
3756 let prom_expr = parser::parse(&format!(
3757 "{fn_name} by (tag_1)(some_metric{{tag_0!=\"bar\"}})",
3758 ))
3759 .unwrap();
3760 let mut eval_stmt = EvalStmt {
3761 expr: prom_expr,
3762 start: UNIX_EPOCH,
3763 end: UNIX_EPOCH
3764 .checked_add(Duration::from_secs(100_000))
3765 .unwrap(),
3766 interval: Duration::from_secs(5),
3767 lookback_delta: Duration::from_secs(1),
3768 };
3769
3770 let table_provider = build_test_table_provider(
3772 &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
3773 2,
3774 2,
3775 )
3776 .await;
3777 let plan =
3778 PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
3779 .await
3780 .unwrap();
3781 let expected_no_without = String::from(
3782 "Sort: some_metric.tag_1 ASC NULLS LAST, some_metric.timestamp ASC NULLS LAST [tag_1:Utf8, timestamp:Timestamp(Millisecond, None), TEMPLATE(some_metric.field_0):Float64;N, TEMPLATE(some_metric.field_1):Float64;N]\
3783 \n Aggregate: groupBy=[[some_metric.tag_1, some_metric.timestamp]], aggr=[[TEMPLATE(some_metric.field_0), TEMPLATE(some_metric.field_1)]] [tag_1:Utf8, timestamp:Timestamp(Millisecond, None), TEMPLATE(some_metric.field_0):Float64;N, TEMPLATE(some_metric.field_1):Float64;N]\
3784 \n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
3785 \n PromSeriesDivide: tags=[\"tag_0\", \"tag_1\"] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
3786 \n Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.tag_1 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
3787 \n Filter: some_metric.tag_0 != Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
3788 \n TableScan: some_metric [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]"
3789 ).replace("TEMPLATE", plan_name);
3790 assert_eq!(
3791 plan.display_indent_schema().to_string(),
3792 expected_no_without
3793 );
3794
3795 if let PromExpr::Aggregate(AggregateExpr { modifier, .. }) = &mut eval_stmt.expr {
3797 *modifier = Some(LabelModifier::Exclude(Labels {
3798 labels: vec![String::from("tag_1")].into_iter().collect(),
3799 }));
3800 }
3801 let table_provider = build_test_table_provider(
3802 &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
3803 2,
3804 2,
3805 )
3806 .await;
3807 let plan =
3808 PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
3809 .await
3810 .unwrap();
3811 let expected_without = String::from(
3812 "Sort: some_metric.tag_0 ASC NULLS LAST, some_metric.timestamp ASC NULLS LAST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), TEMPLATE(some_metric.field_0):Float64;N, TEMPLATE(some_metric.field_1):Float64;N]\
3813 \n Aggregate: groupBy=[[some_metric.tag_0, some_metric.timestamp]], aggr=[[TEMPLATE(some_metric.field_0), TEMPLATE(some_metric.field_1)]] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), TEMPLATE(some_metric.field_0):Float64;N, TEMPLATE(some_metric.field_1):Float64;N]\
3814 \n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
3815 \n PromSeriesDivide: tags=[\"tag_0\", \"tag_1\"] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
3816 \n Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.tag_1 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
3817 \n Filter: some_metric.tag_0 != Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
3818 \n TableScan: some_metric [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]"
3819 ).replace("TEMPLATE", plan_name);
3820 assert_eq!(plan.display_indent_schema().to_string(), expected_without);
3821 }
3822
3823 #[tokio::test]
3824 async fn aggregate_sum() {
3825 do_aggregate_expr_plan("sum", "sum").await;
3826 }
3827
3828 #[tokio::test]
3829 async fn aggregate_avg() {
3830 do_aggregate_expr_plan("avg", "avg").await;
3831 }
3832
3833 #[tokio::test]
3834 #[should_panic]
3835 async fn aggregate_count() {
3836 do_aggregate_expr_plan("count", "count").await;
3837 }
3838
3839 #[tokio::test]
3840 async fn aggregate_min() {
3841 do_aggregate_expr_plan("min", "min").await;
3842 }
3843
3844 #[tokio::test]
3845 async fn aggregate_max() {
3846 do_aggregate_expr_plan("max", "max").await;
3847 }
3848
3849 #[tokio::test]
3850 #[should_panic]
3851 async fn aggregate_group() {
3852 do_aggregate_expr_plan("grouping", "GROUPING").await;
3853 }
3854
3855 #[tokio::test]
3856 async fn aggregate_stddev() {
3857 do_aggregate_expr_plan("stddev", "stddev_pop").await;
3858 }
3859
3860 #[tokio::test]
3861 async fn aggregate_stdvar() {
3862 do_aggregate_expr_plan("stdvar", "var_pop").await;
3863 }
3864
3865 #[tokio::test]
3889 async fn binary_op_column_column() {
3890 let prom_expr =
3891 parser::parse(r#"some_metric{tag_0="foo"} + some_metric{tag_0="bar"}"#).unwrap();
3892 let eval_stmt = EvalStmt {
3893 expr: prom_expr,
3894 start: UNIX_EPOCH,
3895 end: UNIX_EPOCH
3896 .checked_add(Duration::from_secs(100_000))
3897 .unwrap(),
3898 interval: Duration::from_secs(5),
3899 lookback_delta: Duration::from_secs(1),
3900 };
3901
3902 let table_provider = build_test_table_provider(
3903 &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
3904 1,
3905 1,
3906 )
3907 .await;
3908 let plan =
3909 PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
3910 .await
3911 .unwrap();
3912
3913 let expected = String::from(
3914 "Projection: rhs.tag_0, rhs.timestamp, lhs.field_0 + rhs.field_0 AS lhs.field_0 + rhs.field_0 [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), lhs.field_0 + rhs.field_0:Float64;N]\
3915 \n Inner Join: lhs.tag_0 = rhs.tag_0, lhs.timestamp = rhs.timestamp [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3916 \n SubqueryAlias: lhs [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3917 \n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3918 \n PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3919 \n Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3920 \n Filter: some_metric.tag_0 = Utf8(\"foo\") AND some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3921 \n TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3922 \n SubqueryAlias: rhs [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3923 \n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3924 \n PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3925 \n Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3926 \n Filter: some_metric.tag_0 = Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3927 \n TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
3928 );
3929
3930 assert_eq!(plan.display_indent_schema().to_string(), expected);
3931 }
3932
3933 async fn indie_query_plan_compare<T: AsRef<str>>(query: &str, expected: T) {
3934 let prom_expr = parser::parse(query).unwrap();
3935 let eval_stmt = EvalStmt {
3936 expr: prom_expr,
3937 start: UNIX_EPOCH,
3938 end: UNIX_EPOCH
3939 .checked_add(Duration::from_secs(100_000))
3940 .unwrap(),
3941 interval: Duration::from_secs(5),
3942 lookback_delta: Duration::from_secs(1),
3943 };
3944
3945 let table_provider = build_test_table_provider(
3946 &[
3947 (DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string()),
3948 (
3949 "greptime_private".to_string(),
3950 "some_alt_metric".to_string(),
3951 ),
3952 ],
3953 1,
3954 1,
3955 )
3956 .await;
3957 let plan =
3958 PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
3959 .await
3960 .unwrap();
3961
3962 assert_eq!(plan.display_indent_schema().to_string(), expected.as_ref());
3963 }
3964
3965 #[tokio::test]
3966 async fn binary_op_literal_column() {
3967 let query = r#"1 + some_metric{tag_0="bar"}"#;
3968 let expected = String::from(
3969 "Projection: some_metric.tag_0, some_metric.timestamp, Float64(1) + some_metric.field_0 AS Float64(1) + field_0 [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), Float64(1) + field_0:Float64;N]\
3970 \n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3971 \n PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3972 \n Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3973 \n Filter: some_metric.tag_0 = Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3974 \n TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
3975 );
3976
3977 indie_query_plan_compare(query, expected).await;
3978 }
3979
3980 #[tokio::test]
3981 async fn binary_op_literal_literal() {
3982 let query = r#"1 + 1"#;
3983 let expected = r#"EmptyMetric: range=[0..100000000], interval=[5000] [time:Timestamp(Millisecond, None), value:Float64;N]
3984 TableScan: dummy [time:Timestamp(Millisecond, None), value:Float64;N]"#;
3985 indie_query_plan_compare(query, expected).await;
3986 }
3987
3988 #[tokio::test]
3989 async fn simple_bool_grammar() {
3990 let query = "some_metric != bool 1.2345";
3991 let expected = String::from(
3992 "Projection: some_metric.tag_0, some_metric.timestamp, CAST(some_metric.field_0 != Float64(1.2345) AS Float64) AS field_0 != Float64(1.2345) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0 != Float64(1.2345):Float64;N]\
3993 \n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3994 \n PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3995 \n Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3996 \n Filter: some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3997 \n TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
3998 );
3999
4000 indie_query_plan_compare(query, expected).await;
4001 }
4002
    #[tokio::test]
    async fn bool_with_additional_arithmetic() {
        let query = "some_metric + (1 == bool 2)";
        let expected = String::from(
            "Projection: some_metric.tag_0, some_metric.timestamp, some_metric.field_0 + CAST(Float64(1) = Float64(2) AS Float64) AS field_0 + Float64(1) = Float64(2) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0 + Float64(1) = Float64(2):Float64;N]\
            \n  PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n    PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n      Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n        Filter: some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n          TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
        );

        indie_query_plan_compare(query, expected).await;
    }

    #[tokio::test]
    async fn simple_unary() {
        let query = "-some_metric";
        let expected = String::from(
            "Projection: some_metric.tag_0, some_metric.timestamp, (- some_metric.field_0) AS (- field_0) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), (- field_0):Float64;N]\
            \n  PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n    PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n      Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n        Filter: some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n          TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
        );

        indie_query_plan_compare(query, expected).await;
    }

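    // A range selector plans through PromRangeManipulate; the scan window is widened
    // by the 5m range plus the 1s lookback, hence the -301000 ms lower bound.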
    #[tokio::test]
    async fn increase_aggr() {
        let query = "increase(some_metric[5m])";
        let expected = String::from(
            "Filter: prom_increase(timestamp_range,field_0,timestamp,Int64(300000)) IS NOT NULL [timestamp:Timestamp(Millisecond, None), prom_increase(timestamp_range,field_0,timestamp,Int64(300000)):Float64;N, tag_0:Utf8]\
            \n  Projection: some_metric.timestamp, prom_increase(timestamp_range, field_0, some_metric.timestamp, Int64(300000)) AS prom_increase(timestamp_range,field_0,timestamp,Int64(300000)), some_metric.tag_0 [timestamp:Timestamp(Millisecond, None), prom_increase(timestamp_range,field_0,timestamp,Int64(300000)):Float64;N, tag_0:Utf8]\
            \n    PromRangeManipulate: req range=[0..100000000], interval=[5000], eval range=[300000], time index=[timestamp], values=[\"field_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Dictionary(Int64, Float64);N, timestamp_range:Dictionary(Int64, Timestamp(Millisecond, None))]\
            \n      PromSeriesNormalize: offset=[0], time index=[timestamp], filter NaN: [true] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n        PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n          Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n            Filter: some_metric.timestamp >= TimestampMillisecond(-301000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n              TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
        );

        indie_query_plan_compare(query, expected).await;
    }

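    // Without the `bool` modifier a comparison acts as a row filter, not a projection.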
    #[tokio::test]
    async fn less_filter_on_value() {
        let query = "some_metric < 1.2345";
        let expected = String::from(
            "Filter: some_metric.field_0 < Float64(1.2345) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n  PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n    PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n      Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n        Filter: some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n          TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
        );

        indie_query_plan_compare(query, expected).await;
    }

    #[tokio::test]
    async fn count_over_time() {
        let query = "count_over_time(some_metric[5m])";
        let expected = String::from(
            "Filter: prom_count_over_time(timestamp_range,field_0) IS NOT NULL [timestamp:Timestamp(Millisecond, None), prom_count_over_time(timestamp_range,field_0):Float64;N, tag_0:Utf8]\
            \n  Projection: some_metric.timestamp, prom_count_over_time(timestamp_range, field_0) AS prom_count_over_time(timestamp_range,field_0), some_metric.tag_0 [timestamp:Timestamp(Millisecond, None), prom_count_over_time(timestamp_range,field_0):Float64;N, tag_0:Utf8]\
            \n    PromRangeManipulate: req range=[0..100000000], interval=[5000], eval range=[300000], time index=[timestamp], values=[\"field_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Dictionary(Int64, Float64);N, timestamp_range:Dictionary(Int64, Timestamp(Millisecond, None))]\
            \n      PromSeriesNormalize: offset=[0], time index=[timestamp], filter NaN: [true] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n        PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n          Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n            Filter: some_metric.timestamp >= TimestampMillisecond(-301000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n              TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
        );

        indie_query_plan_compare(query, expected).await;
    }

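    // `ignoring(...)` drops the listed labels from the join key, so the division
    // becomes an inner join on the remaining tag (`uri`) plus the time index.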
    #[tokio::test]
    async fn test_hash_join() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let case = r#"http_server_requests_seconds_sum{uri="/accounts/login"} / ignoring(kubernetes_pod_name,kubernetes_namespace) http_server_requests_seconds_count{uri="/accounts/login"}"#;

        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;
        let table_provider = build_test_table_provider_with_fields(
            &[
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "http_server_requests_seconds_sum".to_string(),
                ),
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "http_server_requests_seconds_count".to_string(),
                ),
            ],
            &["uri", "kubernetes_namespace", "kubernetes_pod_name"],
        )
        .await;
        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();
        let expected = "Projection: http_server_requests_seconds_count.uri, http_server_requests_seconds_count.kubernetes_namespace, http_server_requests_seconds_count.kubernetes_pod_name, http_server_requests_seconds_count.greptime_timestamp, http_server_requests_seconds_sum.greptime_value / http_server_requests_seconds_count.greptime_value AS http_server_requests_seconds_sum.greptime_value / http_server_requests_seconds_count.greptime_value\
        \n  Inner Join: http_server_requests_seconds_sum.greptime_timestamp = http_server_requests_seconds_count.greptime_timestamp, http_server_requests_seconds_sum.uri = http_server_requests_seconds_count.uri\
        \n    SubqueryAlias: http_server_requests_seconds_sum\
        \n      PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp]\
        \n        PromSeriesDivide: tags=[\"uri\", \"kubernetes_namespace\", \"kubernetes_pod_name\"]\
        \n          Sort: http_server_requests_seconds_sum.uri ASC NULLS FIRST, http_server_requests_seconds_sum.kubernetes_namespace ASC NULLS FIRST, http_server_requests_seconds_sum.kubernetes_pod_name ASC NULLS FIRST, http_server_requests_seconds_sum.greptime_timestamp ASC NULLS FIRST\
        \n            Filter: http_server_requests_seconds_sum.uri = Utf8(\"/accounts/login\") AND http_server_requests_seconds_sum.greptime_timestamp >= TimestampMillisecond(-1000, None) AND http_server_requests_seconds_sum.greptime_timestamp <= TimestampMillisecond(100001000, None)\
        \n              TableScan: http_server_requests_seconds_sum\
        \n    SubqueryAlias: http_server_requests_seconds_count\
        \n      PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp]\
        \n        PromSeriesDivide: tags=[\"uri\", \"kubernetes_namespace\", \"kubernetes_pod_name\"]\
        \n          Sort: http_server_requests_seconds_count.uri ASC NULLS FIRST, http_server_requests_seconds_count.kubernetes_namespace ASC NULLS FIRST, http_server_requests_seconds_count.kubernetes_pod_name ASC NULLS FIRST, http_server_requests_seconds_count.greptime_timestamp ASC NULLS FIRST\
        \n            Filter: http_server_requests_seconds_count.uri = Utf8(\"/accounts/login\") AND http_server_requests_seconds_count.greptime_timestamp >= TimestampMillisecond(-1000, None) AND http_server_requests_seconds_count.greptime_timestamp <= TimestampMillisecond(100001000, None)\
        \n              TableScan: http_server_requests_seconds_count";
        assert_eq!(plan.to_string(), expected);
    }

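    // A label_replace wrapped around histogram_quantile/sum/rate should plan cleanly;
    // only the absence of errors is asserted here.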
    #[tokio::test]
    async fn test_nested_histogram_quantile() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let case = r#"label_replace(histogram_quantile(0.99, sum by(pod, le, path, code) (rate(greptime_servers_grpc_requests_elapsed_bucket{container="frontend"}[1m0s]))), "pod_new", "$1", "pod", "greptimedb-frontend-[0-9a-z]*-(.*)")"#;

        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;
        let table_provider = build_test_table_provider_with_fields(
            &[(
                DEFAULT_SCHEMA_NAME.to_string(),
                "greptime_servers_grpc_requests_elapsed_bucket".to_string(),
            )],
            &["pod", "le", "path", "code", "container"],
        )
        .await;
        let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
            .await
            .unwrap();
    }

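    // `and`/`unless` set operations combined with arithmetic and a vector(0)
    // fallback should plan without error.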
    #[tokio::test]
    async fn test_parse_and_operator() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let cases = [
            r#"count (max by (persistentvolumeclaim,namespace) (kubelet_volume_stats_used_bytes{namespace=~".+"} ) and (max by (persistentvolumeclaim,namespace) (kubelet_volume_stats_used_bytes{namespace=~".+"} )) / (max by (persistentvolumeclaim,namespace) (kubelet_volume_stats_capacity_bytes{namespace=~".+"} )) >= (80 / 100)) or vector (0)"#,
            r#"count (max by (persistentvolumeclaim,namespace) (kubelet_volume_stats_used_bytes{namespace=~".+"} ) unless (max by (persistentvolumeclaim,namespace) (kubelet_volume_stats_used_bytes{namespace=~".+"} )) / (max by (persistentvolumeclaim,namespace) (kubelet_volume_stats_capacity_bytes{namespace=~".+"} )) >= (80 / 100)) or vector (0)"#,
        ];

        for case in cases {
            let prom_expr = parser::parse(case).unwrap();
            eval_stmt.expr = prom_expr;
            let table_provider = build_test_table_provider_with_fields(
                &[
                    (
                        DEFAULT_SCHEMA_NAME.to_string(),
                        "kubelet_volume_stats_used_bytes".to_string(),
                    ),
                    (
                        DEFAULT_SCHEMA_NAME.to_string(),
                        "kubelet_volume_stats_capacity_bytes".to_string(),
                    ),
                ],
                &["namespace", "persistentvolumeclaim"],
            )
            .await;
            let _ =
                PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                    .await
                    .unwrap();
        }
    }

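    // A vector(0) fallback nested inside a subtraction must still be plannable.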
    #[tokio::test]
    async fn test_nested_binary_op() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let case = r#"sum(rate(nginx_ingress_controller_requests{job=~".*"}[2m])) -
        (
            sum(rate(nginx_ingress_controller_requests{namespace=~".*"}[2m]))
            or
            vector(0)
        )"#;

        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;
        let table_provider = build_test_table_provider_with_fields(
            &[(
                DEFAULT_SCHEMA_NAME.to_string(),
                "nginx_ingress_controller_requests".to_string(),
            )],
            &["namespace", "job"],
        )
        .await;
        let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
            .await
            .unwrap();
    }

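    // Regression cases: chains of `or` between aggregations over one or more metrics.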
    #[tokio::test]
    async fn test_parse_or_operator() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let case = r#"
        sum(rate(sysstat{tenant_name=~"tenant1",cluster_name=~"cluster1"}[120s])) by (cluster_name,tenant_name) /
        (sum(sysstat{tenant_name=~"tenant1",cluster_name=~"cluster1"}) by (cluster_name,tenant_name) * 100)
        or
        200 * sum(sysstat{tenant_name=~"tenant1",cluster_name=~"cluster1"}) by (cluster_name,tenant_name) /
        sum(sysstat{tenant_name=~"tenant1",cluster_name=~"cluster1"}) by (cluster_name,tenant_name)"#;

        let table_provider = build_test_table_provider_with_fields(
            &[(DEFAULT_SCHEMA_NAME.to_string(), "sysstat".to_string())],
            &["tenant_name", "cluster_name"],
        )
        .await;
        eval_stmt.expr = parser::parse(case).unwrap();
        let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
            .await
            .unwrap();

        let case = r#"sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) /
        (sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) *1000) +
        sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) /
        (sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) *1000) >= 0
        or
        sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) /
        (sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) *1000) >= 0
        or
        sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) /
        (sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) *1000) >= 0"#;
        let table_provider = build_test_table_provider_with_fields(
            &[(DEFAULT_SCHEMA_NAME.to_string(), "sysstat".to_string())],
            &["tenant_name", "cluster_name"],
        )
        .await;
        eval_stmt.expr = parser::parse(case).unwrap();
        let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
            .await
            .unwrap();

        let case = r#"(sum(background_waitevent_cnt{tenant_name=~"sys",cluster_name=~"cluster1"}) by (cluster_name,tenant_name) +
        sum(foreground_waitevent_cnt{tenant_name=~"sys",cluster_name=~"cluster1"}) by (cluster_name,tenant_name)) or
        (sum(background_waitevent_cnt{tenant_name=~"sys",cluster_name=~"cluster1"}) by (cluster_name,tenant_name)) or
        (sum(foreground_waitevent_cnt{tenant_name=~"sys",cluster_name=~"cluster1"}) by (cluster_name,tenant_name))"#;
        let table_provider = build_test_table_provider_with_fields(
            &[
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "background_waitevent_cnt".to_string(),
                ),
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "foreground_waitevent_cnt".to_string(),
                ),
            ],
            &["tenant_name", "cluster_name"],
        )
        .await;
        eval_stmt.expr = parser::parse(case).unwrap();
        let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
            .await
            .unwrap();

        let case = r#"avg(node_load1{cluster_name=~"cluster1"}) by (cluster_name,host_name) or max(container_cpu_load_average_10s{cluster_name=~"cluster1"}) by (cluster_name,host_name) * 100 / max(container_spec_cpu_quota{cluster_name=~"cluster1"}) by (cluster_name,host_name)"#;
        let table_provider = build_test_table_provider_with_fields(
            &[
                (DEFAULT_SCHEMA_NAME.to_string(), "node_load1".to_string()),
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "container_cpu_load_average_10s".to_string(),
                ),
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "container_spec_cpu_quota".to_string(),
                ),
            ],
            &["cluster_name", "host_name"],
        )
        .await;
        eval_stmt.expr = parser::parse(case).unwrap();
        let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
            .await
            .unwrap();
    }

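    // The special `__field__` matcher selects value columns; each case lists the
    // columns expected to survive in the output schema.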
    #[tokio::test]
    async fn value_matcher() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let cases = [
            (
                r#"some_metric{__field__="field_1"}"#,
                vec![
                    "some_metric.field_1",
                    "some_metric.tag_0",
                    "some_metric.tag_1",
                    "some_metric.tag_2",
                    "some_metric.timestamp",
                ],
            ),
            (
                r#"some_metric{__field__="field_1", __field__="field_0"}"#,
                vec![
                    "some_metric.field_0",
                    "some_metric.field_1",
                    "some_metric.tag_0",
                    "some_metric.tag_1",
                    "some_metric.tag_2",
                    "some_metric.timestamp",
                ],
            ),
            (
                r#"some_metric{__field__!="field_1"}"#,
                vec![
                    "some_metric.field_0",
                    "some_metric.field_2",
                    "some_metric.tag_0",
                    "some_metric.tag_1",
                    "some_metric.tag_2",
                    "some_metric.timestamp",
                ],
            ),
            (
                r#"some_metric{__field__!="field_1", __field__!="field_2"}"#,
                vec![
                    "some_metric.field_0",
                    "some_metric.tag_0",
                    "some_metric.tag_1",
                    "some_metric.tag_2",
                    "some_metric.timestamp",
                ],
            ),
            (
                r#"some_metric{__field__="field_1", __field__!="field_0"}"#,
                vec![
                    "some_metric.field_1",
                    "some_metric.tag_0",
                    "some_metric.tag_1",
                    "some_metric.tag_2",
                    "some_metric.timestamp",
                ],
            ),
            (
                r#"some_metric{__field__="field_2", __field__!="field_2"}"#,
                vec![
                    "some_metric.tag_0",
                    "some_metric.tag_1",
                    "some_metric.tag_2",
                    "some_metric.timestamp",
                ],
            ),
            (
                r#"some_metric{__field__=~"field_1|field_2"}"#,
                vec![
                    "some_metric.field_1",
                    "some_metric.field_2",
                    "some_metric.tag_0",
                    "some_metric.tag_1",
                    "some_metric.tag_2",
                    "some_metric.timestamp",
                ],
            ),
            (
                r#"some_metric{__field__!~"field_1|field_2"}"#,
                vec![
                    "some_metric.field_0",
                    "some_metric.tag_0",
                    "some_metric.tag_1",
                    "some_metric.tag_2",
                    "some_metric.timestamp",
                ],
            ),
        ];

        for case in cases {
            let prom_expr = parser::parse(case.0).unwrap();
            eval_stmt.expr = prom_expr;
            let table_provider = build_test_table_provider(
                &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
                3,
                3,
            )
            .await;
            let plan =
                PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                    .await
                    .unwrap();
            let mut fields = plan.schema().field_names();
            let mut expected = case.1.into_iter().map(String::from).collect::<Vec<_>>();
            fields.sort();
            expected.sort();
            assert_eq!(fields, expected, "case: {:?}", case.0);
        }

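        // Matchers that name a nonexistent field column must fail at planning time.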
        let bad_cases = [
            r#"some_metric{__field__="nonexistent"}"#,
            r#"some_metric{__field__!="nonexistent"}"#,
        ];

        for case in bad_cases {
            let prom_expr = parser::parse(case).unwrap();
            eval_stmt.expr = prom_expr;
            let table_provider = build_test_table_provider(
                &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
                3,
                3,
            )
            .await;
            let plan =
                PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                    .await;
            assert!(plan.is_err(), "case: {:?}", case);
        }
    }

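    // `__schema__` (alias `__database__`) routes a selector to another schema.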
    #[tokio::test]
    async fn custom_schema() {
        let query = "some_alt_metric{__schema__=\"greptime_private\"}";
        let expected = String::from(
            "PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n  PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n    Sort: greptime_private.some_alt_metric.tag_0 ASC NULLS FIRST, greptime_private.some_alt_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n      Filter: greptime_private.some_alt_metric.timestamp >= TimestampMillisecond(-1000, None) AND greptime_private.some_alt_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n        TableScan: greptime_private.some_alt_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
        );

        indie_query_plan_compare(query, expected).await;

        let query = "some_alt_metric{__database__=\"greptime_private\"}";
        let expected = String::from(
            "PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n  PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n    Sort: greptime_private.some_alt_metric.tag_0 ASC NULLS FIRST, greptime_private.some_alt_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n      Filter: greptime_private.some_alt_metric.timestamp >= TimestampMillisecond(-1000, None) AND greptime_private.some_alt_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n        TableScan: greptime_private.some_alt_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
        );

        indie_query_plan_compare(query, expected).await;

        let query = "some_alt_metric{__schema__=\"greptime_private\"} / some_metric";
        let expected = String::from("Projection: some_metric.tag_0, some_metric.timestamp, greptime_private.some_alt_metric.field_0 / some_metric.field_0 AS greptime_private.some_alt_metric.field_0 / some_metric.field_0 [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), greptime_private.some_alt_metric.field_0 / some_metric.field_0:Float64;N]\
        \n  Inner Join: greptime_private.some_alt_metric.tag_0 = some_metric.tag_0, greptime_private.some_alt_metric.timestamp = some_metric.timestamp [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n    SubqueryAlias: greptime_private.some_alt_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n      PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n        PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n          Sort: greptime_private.some_alt_metric.tag_0 ASC NULLS FIRST, greptime_private.some_alt_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n            Filter: greptime_private.some_alt_metric.timestamp >= TimestampMillisecond(-1000, None) AND greptime_private.some_alt_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n              TableScan: greptime_private.some_alt_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n    SubqueryAlias: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n      PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n        PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n          Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n            Filter: some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n              TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]");

        indie_query_plan_compare(query, expected).await;
    }

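    // Only `=` is meaningful for `__schema__`/`__database__`; other match ops error out.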
    #[tokio::test]
    async fn only_equals_is_supported_for_special_matcher() {
        let queries = &[
            "some_alt_metric{__schema__!=\"greptime_private\"}",
            "some_alt_metric{__schema__=~\"lalala\"}",
            "some_alt_metric{__database__!=\"greptime_private\"}",
            "some_alt_metric{__database__=~\"lalala\"}",
        ];

        for query in queries {
            let prom_expr = parser::parse(query).unwrap();
            let eval_stmt = EvalStmt {
                expr: prom_expr,
                start: UNIX_EPOCH,
                end: UNIX_EPOCH
                    .checked_add(Duration::from_secs(100_000))
                    .unwrap(),
                interval: Duration::from_secs(5),
                lookback_delta: Duration::from_secs(1),
            };

            let table_provider = build_test_table_provider(
                &[
                    (DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string()),
                    (
                        "greptime_private".to_string(),
                        "some_alt_metric".to_string(),
                    ),
                ],
                1,
                1,
            )
            .await;

            let plan =
                PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                    .await;
            assert!(plan.is_err(), "query: {:?}", query);
        }
    }

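    // A nanosecond time index is cast to millisecond precision in a projection
    // directly above the scan, so the rest of the plan sees ms timestamps.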
    #[tokio::test]
    async fn test_non_ms_precision() {
        let catalog_list = MemoryCatalogManager::with_default_setup();
        let columns = vec![
            ColumnSchema::new(
                "tag".to_string(),
                ConcreteDataType::string_datatype(),
                false,
            ),
            ColumnSchema::new(
                "timestamp".to_string(),
                ConcreteDataType::timestamp_nanosecond_datatype(),
                false,
            )
            .with_time_index(true),
            ColumnSchema::new(
                "field".to_string(),
                ConcreteDataType::float64_datatype(),
                true,
            ),
        ];
        let schema = Arc::new(Schema::new(columns));
        let table_meta = TableMetaBuilder::empty()
            .schema(schema)
            .primary_key_indices(vec![0])
            .value_indices(vec![2])
            .next_column_id(1024)
            .build()
            .unwrap();
        let table_info = TableInfoBuilder::default()
            .name("metrics".to_string())
            .meta(table_meta)
            .build()
            .unwrap();
        let table = EmptyTable::from_table_info(&table_info);
        assert!(catalog_list
            .register_table_sync(RegisterTableRequest {
                catalog: DEFAULT_CATALOG_NAME.to_string(),
                schema: DEFAULT_SCHEMA_NAME.to_string(),
                table_name: "metrics".to_string(),
                table_id: 1024,
                table,
            })
            .is_ok());

        let plan = PromPlanner::stmt_to_plan(
            DfTableSourceProvider::new(
                catalog_list.clone(),
                false,
                QueryContext::arc(),
                DummyDecoder::arc(),
                true,
            ),
            &EvalStmt {
                expr: parser::parse("metrics{tag = \"1\"}").unwrap(),
                start: UNIX_EPOCH,
                end: UNIX_EPOCH
                    .checked_add(Duration::from_secs(100_000))
                    .unwrap(),
                interval: Duration::from_secs(5),
                lookback_delta: Duration::from_secs(1),
            },
            &build_query_engine_state(),
        )
        .await
        .unwrap();
        assert_eq!(plan.display_indent_schema().to_string(),
        "PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
        \n  PromSeriesDivide: tags=[\"tag\"] [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
        \n    Sort: metrics.tag ASC NULLS FIRST, metrics.timestamp ASC NULLS FIRST [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
        \n      Filter: metrics.tag = Utf8(\"1\") AND metrics.timestamp >= TimestampMillisecond(-1000, None) AND metrics.timestamp <= TimestampMillisecond(100001000, None) [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
        \n        Projection: metrics.field, metrics.tag, CAST(metrics.timestamp AS Timestamp(Millisecond, None)) AS timestamp [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
        \n          TableScan: metrics [tag:Utf8, timestamp:Timestamp(Nanosecond, None), field:Float64;N]"
        );
        let plan = PromPlanner::stmt_to_plan(
            DfTableSourceProvider::new(
                catalog_list.clone(),
                false,
                QueryContext::arc(),
                DummyDecoder::arc(),
                true,
            ),
            &EvalStmt {
                expr: parser::parse("avg_over_time(metrics{tag = \"1\"}[5s])").unwrap(),
                start: UNIX_EPOCH,
                end: UNIX_EPOCH
                    .checked_add(Duration::from_secs(100_000))
                    .unwrap(),
                interval: Duration::from_secs(5),
                lookback_delta: Duration::from_secs(1),
            },
            &build_query_engine_state(),
        )
        .await
        .unwrap();
        assert_eq!(plan.display_indent_schema().to_string(),
        "Filter: prom_avg_over_time(timestamp_range,field) IS NOT NULL [timestamp:Timestamp(Millisecond, None), prom_avg_over_time(timestamp_range,field):Float64;N, tag:Utf8]\
        \n  Projection: metrics.timestamp, prom_avg_over_time(timestamp_range, field) AS prom_avg_over_time(timestamp_range,field), metrics.tag [timestamp:Timestamp(Millisecond, None), prom_avg_over_time(timestamp_range,field):Float64;N, tag:Utf8]\
        \n    PromRangeManipulate: req range=[0..100000000], interval=[5000], eval range=[5000], time index=[timestamp], values=[\"field\"] [field:Dictionary(Int64, Float64);N, tag:Utf8, timestamp:Timestamp(Millisecond, None), timestamp_range:Dictionary(Int64, Timestamp(Millisecond, None))]\
        \n      PromSeriesNormalize: offset=[0], time index=[timestamp], filter NaN: [true] [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
        \n        PromSeriesDivide: tags=[\"tag\"] [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
        \n          Sort: metrics.tag ASC NULLS FIRST, metrics.timestamp ASC NULLS FIRST [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
        \n            Filter: metrics.tag = Utf8(\"1\") AND metrics.timestamp >= TimestampMillisecond(-6000, None) AND metrics.timestamp <= TimestampMillisecond(100001000, None) [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
        \n              Projection: metrics.field, metrics.tag, CAST(metrics.timestamp AS Timestamp(Millisecond, None)) AS timestamp [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
        \n                TableScan: metrics [tag:Utf8, timestamp:Timestamp(Nanosecond, None), field:Float64;N]"
        );
    }

    #[tokio::test]
    async fn test_nonexistent_label() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let case = r#"some_metric{nonexistent="hi"}"#;
        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;
        let table_provider = build_test_table_provider(
            &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
            3,
            3,
        )
        .await;
        let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
            .await
            .unwrap();
    }

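    // label_join is planned as a concat_ws projection over the source labels.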
    #[tokio::test]
    async fn test_label_join() {
        let prom_expr = parser::parse(
            "label_join(up{tag_0='api-server'}, 'foo', ',', 'tag_1', 'tag_2', 'tag_3')",
        )
        .unwrap();
        let eval_stmt = EvalStmt {
            expr: prom_expr,
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let table_provider =
            build_test_table_provider(&[(DEFAULT_SCHEMA_NAME.to_string(), "up".to_string())], 4, 1)
                .await;
        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();

        let expected = r#"
Filter: up.field_0 IS NOT NULL [timestamp:Timestamp(Millisecond, None), field_0:Float64;N, foo:Utf8;N, tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8]
  Projection: up.timestamp, up.field_0, concat_ws(Utf8(","), up.tag_1, up.tag_2, up.tag_3) AS foo, up.tag_0, up.tag_1, up.tag_2, up.tag_3 [timestamp:Timestamp(Millisecond, None), field_0:Float64;N, foo:Utf8;N, tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8]
    PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
      PromSeriesDivide: tags=["tag_0", "tag_1", "tag_2", "tag_3"] [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
        Sort: up.tag_0 ASC NULLS FIRST, up.tag_1 ASC NULLS FIRST, up.tag_2 ASC NULLS FIRST, up.tag_3 ASC NULLS FIRST, up.timestamp ASC NULLS FIRST [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
          Filter: up.tag_0 = Utf8("api-server") AND up.timestamp >= TimestampMillisecond(-1000, None) AND up.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
            TableScan: up [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"#;

        let ret = plan.display_indent_schema().to_string();
        assert_eq!(format!("\n{ret}"), expected, "\n{}", ret);
    }

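    // label_replace compiles to regexp_replace with a fully anchored pattern.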
    #[tokio::test]
    async fn test_label_replace() {
        let prom_expr = parser::parse(
            "label_replace(up{tag_0=\"a:c\"}, \"foo\", \"$1\", \"tag_0\", \"(.*):.*\")",
        )
        .unwrap();
        let eval_stmt = EvalStmt {
            expr: prom_expr,
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let table_provider =
            build_test_table_provider(&[(DEFAULT_SCHEMA_NAME.to_string(), "up".to_string())], 1, 1)
                .await;
        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();

        let expected = r#"
Filter: up.field_0 IS NOT NULL [timestamp:Timestamp(Millisecond, None), field_0:Float64;N, foo:Utf8;N, tag_0:Utf8]
  Projection: up.timestamp, up.field_0, regexp_replace(up.tag_0, Utf8("^(?s:(.*):.*)$"), Utf8("$1")) AS foo, up.tag_0 [timestamp:Timestamp(Millisecond, None), field_0:Float64;N, foo:Utf8;N, tag_0:Utf8]
    PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
      PromSeriesDivide: tags=["tag_0"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
        Sort: up.tag_0 ASC NULLS FIRST, up.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
          Filter: up.tag_0 = Utf8("a:c") AND up.timestamp >= TimestampMillisecond(-1000, None) AND up.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
            TableScan: up [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"#;

        let ret = plan.display_indent_schema().to_string();
        assert_eq!(format!("\n{ret}"), expected, "\n{}", ret);
    }

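    // Regex label matchers are anchored as ^(?:...)$ and become `~` filter exprs.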
    #[tokio::test]
    async fn test_matchers_to_expr() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };
        let case =
            r#"sum(prometheus_tsdb_head_series{tag_1=~"(10.0.160.237:8080|10.0.160.237:9090)"})"#;

        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;
        let table_provider = build_test_table_provider(
            &[(
                DEFAULT_SCHEMA_NAME.to_string(),
                "prometheus_tsdb_head_series".to_string(),
            )],
            3,
            3,
        )
        .await;
        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();
        let expected = "Sort: prometheus_tsdb_head_series.timestamp ASC NULLS LAST [timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.field_0):Float64;N, sum(prometheus_tsdb_head_series.field_1):Float64;N, sum(prometheus_tsdb_head_series.field_2):Float64;N]\
        \n  Aggregate: groupBy=[[prometheus_tsdb_head_series.timestamp]], aggr=[[sum(prometheus_tsdb_head_series.field_0), sum(prometheus_tsdb_head_series.field_1), sum(prometheus_tsdb_head_series.field_2)]] [timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.field_0):Float64;N, sum(prometheus_tsdb_head_series.field_1):Float64;N, sum(prometheus_tsdb_head_series.field_2):Float64;N]\
        \n    PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N]\
        \n      PromSeriesDivide: tags=[\"tag_0\", \"tag_1\", \"tag_2\"] [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N]\
        \n        Sort: prometheus_tsdb_head_series.tag_0 ASC NULLS FIRST, prometheus_tsdb_head_series.tag_1 ASC NULLS FIRST, prometheus_tsdb_head_series.tag_2 ASC NULLS FIRST, prometheus_tsdb_head_series.timestamp ASC NULLS FIRST [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N]\
        \n          Filter: prometheus_tsdb_head_series.tag_1 ~ Utf8(\"^(?:(10.0.160.237:8080|10.0.160.237:9090))$\") AND prometheus_tsdb_head_series.timestamp >= TimestampMillisecond(-1000, None) AND prometheus_tsdb_head_series.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N]\
        \n            TableScan: prometheus_tsdb_head_series [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N]";
        assert_eq!(plan.display_indent_schema().to_string(), expected);
    }

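    // topk is planned as a row_number window per timestamp partition, followed by a
    // filter on the row number and a final re-sort.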
    #[tokio::test]
    async fn test_topk_expr() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };
        let case = r#"topk(10, sum(prometheus_tsdb_head_series{ip=~"(10.0.160.237:8080|10.0.160.237:9090)"}) by (ip))"#;

        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;
        let table_provider = build_test_table_provider_with_fields(
            &[
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "prometheus_tsdb_head_series".to_string(),
                ),
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "http_server_requests_seconds_count".to_string(),
                ),
            ],
            &["ip"],
        )
        .await;

        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();
        let expected = "Projection: sum(prometheus_tsdb_head_series.greptime_value), prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp [sum(prometheus_tsdb_head_series.greptime_value):Float64;N, ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None)]\
        \n  Sort: prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST, row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS FIRST, prometheus_tsdb_head_series.ip DESC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW ASC NULLS LAST [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N, row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS FIRST, prometheus_tsdb_head_series.ip DESC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:UInt64]\
        \n    Filter: row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS FIRST, prometheus_tsdb_head_series.ip DESC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW <= Float64(10) [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N, row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS FIRST, prometheus_tsdb_head_series.ip DESC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:UInt64]\
        \n      WindowAggr: windowExpr=[[row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS FIRST, prometheus_tsdb_head_series.ip DESC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N, row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS FIRST, prometheus_tsdb_head_series.ip DESC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:UInt64]\
        \n        Sort: prometheus_tsdb_head_series.ip ASC NULLS LAST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N]\
        \n          Aggregate: groupBy=[[prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp]], aggr=[[sum(prometheus_tsdb_head_series.greptime_value)]] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N]\
        \n            PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n              PromSeriesDivide: tags=[\"ip\"] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n                Sort: prometheus_tsdb_head_series.ip ASC NULLS FIRST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS FIRST [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n                  Filter: prometheus_tsdb_head_series.ip ~ Utf8(\"^(?:(10.0.160.237:8080|10.0.160.237:9090))$\") AND prometheus_tsdb_head_series.greptime_timestamp >= TimestampMillisecond(-1000, None) AND prometheus_tsdb_head_series.greptime_timestamp <= TimestampMillisecond(100001000, None) [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n                    TableScan: prometheus_tsdb_head_series [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]";

        assert_eq!(plan.display_indent_schema().to_string(), expected);
    }

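    // count_values groups by the value column itself and re-exposes it under the
    // user-provided label (`series`).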
    #[tokio::test]
    async fn test_count_values_expr() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };
        let case = r#"count_values('series', prometheus_tsdb_head_series{ip=~"(10.0.160.237:8080|10.0.160.237:9090)"}) by (ip)"#;

        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;
        let table_provider = build_test_table_provider_with_fields(
            &[
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "prometheus_tsdb_head_series".to_string(),
                ),
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "http_server_requests_seconds_count".to_string(),
                ),
            ],
            &["ip"],
        )
        .await;

        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();
        let expected = "Projection: count(prometheus_tsdb_head_series.greptime_value), prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp, series [count(prometheus_tsdb_head_series.greptime_value):Int64, ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), series:Float64;N]\
        \n  Sort: prometheus_tsdb_head_series.ip ASC NULLS LAST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST, prometheus_tsdb_head_series.greptime_value ASC NULLS LAST [count(prometheus_tsdb_head_series.greptime_value):Int64, ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), series:Float64;N, greptime_value:Float64;N]\
        \n    Projection: count(prometheus_tsdb_head_series.greptime_value), prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp, prometheus_tsdb_head_series.greptime_value AS series, prometheus_tsdb_head_series.greptime_value [count(prometheus_tsdb_head_series.greptime_value):Int64, ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), series:Float64;N, greptime_value:Float64;N]\
        \n      Aggregate: groupBy=[[prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp, prometheus_tsdb_head_series.greptime_value]], aggr=[[count(prometheus_tsdb_head_series.greptime_value)]] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N, count(prometheus_tsdb_head_series.greptime_value):Int64]\
        \n        PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n          PromSeriesDivide: tags=[\"ip\"] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n            Sort: prometheus_tsdb_head_series.ip ASC NULLS FIRST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS FIRST [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n              Filter: prometheus_tsdb_head_series.ip ~ Utf8(\"^(?:(10.0.160.237:8080|10.0.160.237:9090))$\") AND prometheus_tsdb_head_series.greptime_timestamp >= TimestampMillisecond(-1000, None) AND prometheus_tsdb_head_series.greptime_timestamp <= TimestampMillisecond(100001000, None) [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n                TableScan: prometheus_tsdb_head_series [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]";

        assert_eq!(plan.display_indent_schema().to_string(), expected);
    }

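    // quantile() aggregates the inner sum() results with the quantile aggregate function.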
    #[tokio::test]
    async fn test_quantile_expr() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };
        let case = r#"quantile(0.3, sum(prometheus_tsdb_head_series{ip=~"(10.0.160.237:8080|10.0.160.237:9090)"}) by (ip))"#;

        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;
        let table_provider = build_test_table_provider_with_fields(
            &[
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "prometheus_tsdb_head_series".to_string(),
                ),
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "http_server_requests_seconds_count".to_string(),
                ),
            ],
            &["ip"],
        )
        .await;

        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();
        let expected = "Sort: prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST [greptime_timestamp:Timestamp(Millisecond, None), quantile(Float64(0.3),sum(prometheus_tsdb_head_series.greptime_value)):Float64;N]\
        \n  Aggregate: groupBy=[[prometheus_tsdb_head_series.greptime_timestamp]], aggr=[[quantile(Float64(0.3), sum(prometheus_tsdb_head_series.greptime_value))]] [greptime_timestamp:Timestamp(Millisecond, None), quantile(Float64(0.3),sum(prometheus_tsdb_head_series.greptime_value)):Float64;N]\
        \n    Sort: prometheus_tsdb_head_series.ip ASC NULLS LAST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N]\
        \n      Aggregate: groupBy=[[prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp]], aggr=[[sum(prometheus_tsdb_head_series.greptime_value)]] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N]\
        \n        PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n          PromSeriesDivide: tags=[\"ip\"] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n            Sort: prometheus_tsdb_head_series.ip ASC NULLS FIRST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS FIRST [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n              Filter: prometheus_tsdb_head_series.ip ~ Utf8(\"^(?:(10.0.160.237:8080|10.0.160.237:9090))$\") AND prometheus_tsdb_head_series.greptime_timestamp >= TimestampMillisecond(-1000, None) AND prometheus_tsdb_head_series.greptime_timestamp <= TimestampMillisecond(100001000, None) [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n                TableScan: prometheus_tsdb_head_series [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]";

        assert_eq!(plan.display_indent_schema().to_string(), expected);
    }

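    // When one side of `or` names a metric that doesn't exist, that side is planned
    // as an EmptyMetric and the two sides are combined via UnionDistinctOn.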
    #[tokio::test]
    async fn test_or_not_exists_table_label() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };
        let case = r#"sum by (job, tag0, tag2) (metric_exists) or sum by (job, tag0, tag2) (metric_not_exists)"#;

        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;
        let table_provider = build_test_table_provider_with_fields(
            &[(DEFAULT_SCHEMA_NAME.to_string(), "metric_exists".to_string())],
            &["job"],
        )
        .await;

        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();
        let expected = r#"UnionDistinctOn: on col=[["job"]], ts_col=[greptime_timestamp] [greptime_timestamp:Timestamp(Millisecond, None), job:Utf8, sum(metric_exists.greptime_value):Float64;N]
  SubqueryAlias: metric_exists [greptime_timestamp:Timestamp(Millisecond, None), job:Utf8, sum(metric_exists.greptime_value):Float64;N]
    Projection: metric_exists.greptime_timestamp, metric_exists.job, sum(metric_exists.greptime_value) [greptime_timestamp:Timestamp(Millisecond, None), job:Utf8, sum(metric_exists.greptime_value):Float64;N]
      Sort: metric_exists.job ASC NULLS LAST, metric_exists.greptime_timestamp ASC NULLS LAST [job:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(metric_exists.greptime_value):Float64;N]
        Aggregate: groupBy=[[metric_exists.job, metric_exists.greptime_timestamp]], aggr=[[sum(metric_exists.greptime_value)]] [job:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(metric_exists.greptime_value):Float64;N]
          PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp] [job:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]
            PromSeriesDivide: tags=["job"] [job:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]
              Sort: metric_exists.job ASC NULLS FIRST, metric_exists.greptime_timestamp ASC NULLS FIRST [job:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]
                Filter: metric_exists.greptime_timestamp >= TimestampMillisecond(-1000, None) AND metric_exists.greptime_timestamp <= TimestampMillisecond(100001000, None) [job:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]
                  TableScan: metric_exists [job:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]
  SubqueryAlias: [greptime_timestamp:Timestamp(Millisecond, None), job:Utf8;N, sum(.value):Float64;N]
    Projection: .time AS greptime_timestamp, Utf8(NULL) AS job, sum(.value) [greptime_timestamp:Timestamp(Millisecond, None), job:Utf8;N, sum(.value):Float64;N]
      Sort: .time ASC NULLS LAST [time:Timestamp(Millisecond, None), sum(.value):Float64;N]
        Aggregate: groupBy=[[.time]], aggr=[[sum(.value)]] [time:Timestamp(Millisecond, None), sum(.value):Float64;N]
          EmptyMetric: range=[0..-1], interval=[5000] [time:Timestamp(Millisecond, None), value:Float64;N]
            TableScan: dummy [time:Timestamp(Millisecond, None), value:Float64;N]"#;

        assert_eq!(plan.display_indent_schema().to_string(), expected);
    }

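    // histogram_quantile over a table without an `le` column should degrade to an
    // empty result instead of a planning error.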
    #[tokio::test]
    async fn test_histogram_quantile_missing_le_column() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let case = r#"histogram_quantile(0.99, sum by(pod,instance,le) (rate(non_existent_histogram_bucket{instance=~"xxx"}[1m])))"#;

        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;

        let table_provider = build_test_table_provider_with_fields(
            &[(
                DEFAULT_SCHEMA_NAME.to_string(),
                "non_existent_histogram_bucket".to_string(),
            )],
            &["pod", "instance"],
        )
        .await;

        let result =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await;

        assert!(
            result.is_ok(),
            "Expected successful plan creation with empty result, but got error: {:?}",
            result.err()
        );

        let plan = result.unwrap();
        match plan {
            LogicalPlan::EmptyRelation(_) => {}
            _ => panic!("Expected EmptyRelation, but got: {:?}", plan),
        }
    }
}