1use std::collections::{BTreeSet, HashSet, VecDeque};
16use std::sync::Arc;
17use std::time::UNIX_EPOCH;
18
19use arrow::datatypes::IntervalDayTime;
20use async_recursion::async_recursion;
21use catalog::table_source::DfTableSourceProvider;
22use common_error::ext::ErrorExt;
23use common_error::status_code::StatusCode;
24use common_function::function::FunctionContext;
25use common_query::prelude::greptime_value;
26use datafusion::common::DFSchemaRef;
27use datafusion::datasource::DefaultTableSource;
28use datafusion::functions_aggregate::average::avg_udaf;
29use datafusion::functions_aggregate::count::count_udaf;
30use datafusion::functions_aggregate::expr_fn::first_value;
31use datafusion::functions_aggregate::min_max::{max_udaf, min_udaf};
32use datafusion::functions_aggregate::stddev::stddev_pop_udaf;
33use datafusion::functions_aggregate::sum::sum_udaf;
34use datafusion::functions_aggregate::variance::var_pop_udaf;
35use datafusion::functions_window::row_number::RowNumber;
36use datafusion::logical_expr::expr::{Alias, ScalarFunction, WindowFunction};
37use datafusion::logical_expr::expr_rewriter::normalize_cols;
38use datafusion::logical_expr::{
39 BinaryExpr, Cast, Extension, LogicalPlan, LogicalPlanBuilder, Operator,
40 ScalarUDF as ScalarUdfDef, WindowFrame, WindowFunctionDefinition,
41};
42use datafusion::prelude as df_prelude;
43use datafusion::prelude::{Column, Expr as DfExpr, JoinType};
44use datafusion::scalar::ScalarValue;
45use datafusion::sql::TableReference;
46use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRewriter};
47use datafusion_common::{DFSchema, NullEquality};
48use datafusion_expr::expr::WindowFunctionParams;
49use datafusion_expr::utils::conjunction;
50use datafusion_expr::{
51 ExprSchemable, Literal, Projection, SortExpr, TableScan, TableSource, col, lit,
52};
53use datatypes::arrow::datatypes::{DataType as ArrowDataType, TimeUnit as ArrowTimeUnit};
54use datatypes::data_type::ConcreteDataType;
55use itertools::Itertools;
56use once_cell::sync::Lazy;
57use promql::extension_plan::{
58 Absent, EmptyMetric, HistogramFold, InstantManipulate, Millisecond, RangeManipulate,
59 ScalarCalculate, SeriesDivide, SeriesNormalize, UnionDistinctOn, build_special_time_expr,
60};
61use promql::functions::{
62 AbsentOverTime, AvgOverTime, Changes, CountOverTime, Delta, Deriv, DoubleExponentialSmoothing,
63 IDelta, Increase, LastOverTime, MaxOverTime, MinOverTime, PredictLinear, PresentOverTime,
64 QuantileOverTime, Rate, Resets, Round, StddevOverTime, StdvarOverTime, SumOverTime,
65 quantile_udaf,
66};
67use promql_parser::label::{METRIC_NAME, MatchOp, Matcher, Matchers};
68use promql_parser::parser::token::TokenType;
69use promql_parser::parser::value::ValueType;
70use promql_parser::parser::{
71 AggregateExpr, BinModifier, BinaryExpr as PromBinaryExpr, Call, EvalStmt, Expr as PromExpr,
72 Function, FunctionArgs as PromFunctionArgs, LabelModifier, MatrixSelector, NumberLiteral,
73 Offset, ParenExpr, StringLiteral, SubqueryExpr, UnaryExpr, VectorMatchCardinality,
74 VectorSelector, token,
75};
76use regex::{self, Regex};
77use snafu::{OptionExt, ResultExt, ensure};
78use store_api::metric_engine_consts::{
79 DATA_SCHEMA_TABLE_ID_COLUMN_NAME, DATA_SCHEMA_TSID_COLUMN_NAME, LOGICAL_TABLE_METADATA_KEY,
80 METRIC_ENGINE_NAME, is_metric_engine_internal_column,
81};
82use table::table::adapter::DfTableProviderAdapter;
83
84use crate::parser::{
85 ALIAS_NODE_NAME, ANALYZE_NODE_NAME, ANALYZE_VERBOSE_NODE_NAME, AliasExpr, EXPLAIN_NODE_NAME,
86 EXPLAIN_VERBOSE_NODE_NAME,
87};
88use crate::promql::error::{
89 CatalogSnafu, ColumnNotFoundSnafu, CombineTableColumnMismatchSnafu, DataFusionPlanningSnafu,
90 ExpectRangeSelectorSnafu, FunctionInvalidArgumentSnafu, InvalidDestinationLabelNameSnafu,
91 InvalidRegularExpressionSnafu, InvalidTimeRangeSnafu, MultiFieldsNotSupportedSnafu,
92 MultipleMetricMatchersSnafu, MultipleVectorSnafu, NoMetricMatcherSnafu, PromqlPlanNodeSnafu,
93 Result, SameLabelSetSnafu, TableNameNotFoundSnafu, TimeIndexNotFoundSnafu,
94 UnexpectedPlanExprSnafu, UnexpectedTokenSnafu, UnknownTableSnafu, UnsupportedExprSnafu,
95 UnsupportedMatcherOpSnafu, UnsupportedVectorMatchSnafu, ValueNotFoundSnafu,
96 ZeroRangeSelectorSnafu,
97};
98use crate::query_engine::QueryEngineState;
99
/// PromQL function name handled specially by the planner: `time()`.
const SPECIAL_TIME_FUNCTION: &str = "time";
/// PromQL function name handled specially by the planner: `scalar()`.
const SCALAR_FUNCTION: &str = "scalar";
/// PromQL function name handled specially by the planner: `absent()`.
const SPECIAL_ABSENT_FUNCTION: &str = "absent";
/// PromQL function name handled specially by the planner: `histogram_quantile()`.
const SPECIAL_HISTOGRAM_QUANTILE: &str = "histogram_quantile";
/// PromQL function name handled specially by the planner: `vector()`.
const SPECIAL_VECTOR_FUNCTION: &str = "vector";
/// The `le` label that carries the bucket upper bound of classic histograms.
const LE_COLUMN_NAME: &str = "le";

/// Matches valid Prometheus label names: `[a-zA-Z_][a-zA-Z0-9_]*`.
static LABEL_NAME_REGEX: Lazy<Regex> =
    Lazy::new(|| Regex::new(r"^[a-zA-Z_][a-zA-Z0-9_]*$").unwrap());

/// Fallback time index column name, used when a plan has no backing table
/// (e.g. pure literal expressions planned through `EmptyMetric`).
const DEFAULT_TIME_INDEX_COLUMN: &str = "time";

/// Fallback value column name for table-less plans.
const DEFAULT_FIELD_COLUMN: &str = "value";

/// Pseudo label matcher (`__field__`) used to select specific value columns.
const FIELD_COLUMN_MATCHER: &str = "__field__";

/// Pseudo label matchers (`__schema__` / `__database__`) used to pick the
/// schema/database to query.
const SCHEMA_COLUMN_MATCHER: &str = "__schema__";
const DB_COLUMN_MATCHER: &str = "__database__";

// NOTE(review): the meanings of the two limits below are inferred from their
// names only — their uses are outside this chunk; confirm against callers.
const MAX_SCATTER_POINTS: i64 = 400;

/// One hour, in milliseconds.
const INTERVAL_1H: i64 = 60 * 60 * 1000;
/// Mutable planning state threaded through [`PromPlanner`].
///
/// The evaluation-range fields come from the `EvalStmt` and stay fixed for
/// the whole query (temporarily adjusted only while planning subqueries),
/// while the table/column fields are rebuilt per selector via [`Self::reset`].
#[derive(Default, Debug, Clone)]
struct PromPlannerContext {
    // Query evaluation range, all in milliseconds.
    start: Millisecond,
    end: Millisecond,
    interval: Millisecond,
    lookback_delta: Millisecond,

    // Identity of the table currently being planned.
    table_name: Option<String>,
    time_index_column: Option<String>,
    field_columns: Vec<String>,
    tag_columns: Vec<String>,
    // Whether the plan currently carries the metric engine's internal tsid
    // column (`DATA_SCHEMA_TSID_COLUMN_NAME`, UInt64) through operators.
    use_tsid: bool,
    // Matchers on the `__field__` pseudo label, if any were given.
    field_column_matcher: Option<Vec<Matcher>>,
    // All ordinary (non-pseudo) label matchers of the current selector.
    selector_matcher: Vec<Matcher>,
    // Schema/database chosen via `__schema__` / `__database__` matchers.
    schema_name: Option<String>,
    // The range of the active range selector, in milliseconds; `None` when
    // there is no range selector.
    range: Option<Millisecond>,
}
163
164impl PromPlannerContext {
165 fn from_eval_stmt(stmt: &EvalStmt) -> Self {
166 Self {
167 start: stmt.start.duration_since(UNIX_EPOCH).unwrap().as_millis() as _,
168 end: stmt.end.duration_since(UNIX_EPOCH).unwrap().as_millis() as _,
169 interval: stmt.interval.as_millis() as _,
170 lookback_delta: stmt.lookback_delta.as_millis() as _,
171 ..Default::default()
172 }
173 }
174
175 fn reset(&mut self) {
177 self.table_name = None;
178 self.time_index_column = None;
179 self.field_columns = vec![];
180 self.tag_columns = vec![];
181 self.use_tsid = false;
182 self.field_column_matcher = None;
183 self.selector_matcher.clear();
184 self.schema_name = None;
185 self.range = None;
186 }
187
188 fn reset_table_name_and_schema(&mut self) {
190 self.table_name = Some(String::new());
191 self.schema_name = None;
192 self.use_tsid = false;
193 }
194
195 fn has_le_tag(&self) -> bool {
197 self.tag_columns.iter().any(|c| c.eq(&LE_COLUMN_NAME))
198 }
199}
200
/// Planner that lowers a PromQL evaluation statement into a DataFusion
/// [`LogicalPlan`].
pub struct PromPlanner {
    // Resolves metric (table) names into DataFusion table sources.
    table_provider: DfTableSourceProvider,
    // Mutable per-query planning state.
    ctx: PromPlannerContext,
}
205
206impl PromPlanner {
    /// Plan a whole PromQL evaluation statement.
    ///
    /// Seeds a fresh planner with the statement's time range, plans the root
    /// expression, then strips the metric engine's internal tsid column (via
    /// `strip_tsid_column`) so it does not leak into the query output.
    pub async fn stmt_to_plan(
        table_provider: DfTableSourceProvider,
        stmt: &EvalStmt,
        query_engine_state: &QueryEngineState,
    ) -> Result<LogicalPlan> {
        let mut planner = Self {
            table_provider,
            ctx: PromPlannerContext::from_eval_stmt(stmt),
        };

        let plan = planner
            .prom_expr_to_plan(&stmt.expr, query_engine_state)
            .await?;

        // Final cleanup: remove the internal tsid column if still present.
        planner.strip_tsid_column(plan)
    }
224
    /// Plan a PromQL expression.
    ///
    /// Thin wrapper over [`Self::prom_expr_to_plan_inner`] with
    /// `timestamp_fn` disabled — only the `timestamp()` function path
    /// (see `prom_call_expr_to_plan`) enables that flag.
    pub async fn prom_expr_to_plan(
        &mut self,
        prom_expr: &PromExpr,
        query_engine_state: &QueryEngineState,
    ) -> Result<LogicalPlan> {
        self.prom_expr_to_plan_inner(prom_expr, false, query_engine_state)
            .await
    }
233
    /// Recursive dispatcher: lower a single PromQL AST node to a plan.
    ///
    /// `timestamp_fn` is set only while planning the argument of the
    /// `timestamp()` function; it is forwarded through transparent nodes
    /// (parentheses) down to the vector selector, which projects the row
    /// timestamp as the value column.
    #[async_recursion]
    async fn prom_expr_to_plan_inner(
        &mut self,
        prom_expr: &PromExpr,
        timestamp_fn: bool,
        query_engine_state: &QueryEngineState,
    ) -> Result<LogicalPlan> {
        let res = match prom_expr {
            PromExpr::Aggregate(expr) => {
                self.prom_aggr_expr_to_plan(query_engine_state, expr)
                    .await?
            }
            PromExpr::Unary(expr) => {
                self.prom_unary_expr_to_plan(query_engine_state, expr)
                    .await?
            }
            PromExpr::Binary(expr) => {
                self.prom_binary_expr_to_plan(query_engine_state, expr)
                    .await?
            }
            // Parentheses are transparent: recurse, keep forwarding the flag.
            PromExpr::Paren(ParenExpr { expr }) => {
                self.prom_expr_to_plan_inner(expr, timestamp_fn, query_engine_state)
                    .await?
            }
            PromExpr::Subquery(expr) => {
                self.prom_subquery_expr_to_plan(query_engine_state, expr)
                    .await?
            }
            PromExpr::NumberLiteral(lit) => self.prom_number_lit_to_plan(lit)?,
            PromExpr::StringLiteral(lit) => self.prom_string_lit_to_plan(lit)?,
            PromExpr::VectorSelector(selector) => {
                self.prom_vector_selector_to_plan(selector, timestamp_fn)
                    .await?
            }
            PromExpr::MatrixSelector(selector) => {
                self.prom_matrix_selector_to_plan(selector).await?
            }
            PromExpr::Call(expr) => {
                self.prom_call_expr_to_plan(query_engine_state, expr)
                    .await?
            }
            PromExpr::Extension(expr) => {
                self.prom_ext_expr_to_plan(query_engine_state, expr).await?
            }
        };

        Ok(res)
    }
291
    /// Plan a PromQL subquery (`expr[range:step]`).
    ///
    /// The inner expression is evaluated at the subquery's own resolution:
    /// `step` (if given) temporarily replaces the query interval, and `start`
    /// is extended backwards so the first output point sees a full `range` of
    /// input. Both fields are restored before wrapping the result in a
    /// `RangeManipulate` node.
    async fn prom_subquery_expr_to_plan(
        &mut self,
        query_engine_state: &QueryEngineState,
        subquery_expr: &SubqueryExpr,
    ) -> Result<LogicalPlan> {
        let SubqueryExpr {
            expr, range, step, ..
        } = subquery_expr;

        // Save, override, plan, restore — the override must only affect the
        // inner expression.
        let current_interval = self.ctx.interval;
        if let Some(step) = step {
            self.ctx.interval = step.as_millis() as _;
        }
        let current_start = self.ctx.start;
        self.ctx.start -= range.as_millis() as i64 - self.ctx.interval;
        let input = self.prom_expr_to_plan(expr, query_engine_state).await?;
        self.ctx.interval = current_interval;
        self.ctx.start = current_start;

        // Zero-length ranges are invalid, same as for range selectors.
        ensure!(!range.is_zero(), ZeroRangeSelectorSnafu);
        let range_ms = range.as_millis() as _;
        self.ctx.range = Some(range_ms);

        let manipulate = RangeManipulate::new(
            self.ctx.start,
            self.ctx.end,
            self.ctx.interval,
            range_ms,
            self.ctx
                .time_index_column
                .clone()
                .expect("time index should be set in `setup_context`"),
            self.ctx.field_columns.clone(),
            input,
        )
        .context(DataFusionPlanningSnafu)?;

        Ok(LogicalPlan::Extension(Extension {
            node: Arc::new(manipulate),
        }))
    }
333
    /// Plan a PromQL aggregate expression (`sum by (...) (expr)`, etc.).
    ///
    /// `topk`/`bottomk` are delegated to a window-function based planner;
    /// every other operator becomes a DataFusion aggregate over the columns
    /// derived from the `by`/`without` modifier, followed by a sort on the
    /// group keys.
    async fn prom_aggr_expr_to_plan(
        &mut self,
        query_engine_state: &QueryEngineState,
        aggr_expr: &AggregateExpr,
    ) -> Result<LogicalPlan> {
        let AggregateExpr {
            op,
            expr,
            modifier,
            param,
        } = aggr_expr;

        let mut input = self.prom_expr_to_plan(expr, query_engine_state).await?;
        // Whether the input still carries the metric engine's internal tsid
        // column (UInt64 series identifier).
        let input_has_tsid = input.schema().fields().iter().any(|field| {
            field.name() == DATA_SCHEMA_TSID_COLUMN_NAME
                && field.data_type() == &ArrowDataType::UInt64
        });

        // Tags the grouping requires: `by (...)` lists them explicitly
        // (internal columns filtered out); `without (...)` means "all
        // row-key tags minus the listed ones"; no modifier needs none.
        let required_group_tags = match modifier {
            None => BTreeSet::new(),
            Some(LabelModifier::Include(labels)) => labels
                .labels
                .iter()
                .filter(|label| !is_metric_engine_internal_column(label.as_str()))
                .cloned()
                .collect(),
            Some(LabelModifier::Exclude(labels)) => {
                let mut all_tags = self.collect_row_key_tag_columns_from_plan(&input)?;
                for label in &labels.labels {
                    let _ = all_tags.remove(label);
                }
                all_tags
            }
        };

        // If any required tag is missing from the input schema (e.g. it was
        // projected away earlier), re-expose it before aggregating and
        // refresh the context's tag list from the new schema.
        if !required_group_tags.is_empty()
            && required_group_tags
                .iter()
                .any(|tag| Self::find_case_sensitive_column(input.schema(), tag.as_str()).is_none())
        {
            input = self.ensure_tag_columns_available(input, &required_group_tags)?;
            self.refresh_tag_columns_from_schema(input.schema());
        }

        match (*op).id() {
            // topk/bottomk keep individual rows, so they are planned with
            // window functions rather than a plain aggregation.
            token::T_TOPK | token::T_BOTTOMK => {
                self.prom_topk_bottomk_to_plan(aggr_expr, input).await
            }
            _ => {
                let input_tag_columns = if input_has_tsid {
                    self.collect_row_key_tag_columns_from_plan(&input)?
                        .into_iter()
                        .collect::<Vec<_>>()
                } else {
                    self.ctx.tag_columns.clone()
                };
                // Group-by columns derived from the modifier (see
                // `agg_modifier_to_col`).
                let mut group_exprs = self.agg_modifier_to_col(input.schema(), modifier, true)?;
                let (mut aggr_exprs, prev_field_exprs) =
                    self.create_aggregate_exprs(*op, param, &input)?;

                // The tsid may survive the aggregation only when grouping by
                // the complete row-key tag set, i.e. each group is exactly
                // one series (never for count_values, which regroups by
                // value).
                let keep_tsid = op.id() != token::T_COUNT_VALUES
                    && input_has_tsid
                    && input_tag_columns.iter().collect::<HashSet<_>>()
                        == self.ctx.tag_columns.iter().collect::<HashSet<_>>();

                if keep_tsid {
                    // Any tsid in the group works — take the first one.
                    aggr_exprs.push(
                        first_value(
                            DfExpr::Column(Column::from_name(DATA_SCHEMA_TSID_COLUMN_NAME)),
                            vec![],
                        )
                        .alias(DATA_SCHEMA_TSID_COLUMN_NAME),
                    );
                }
                self.ctx.use_tsid = keep_tsid;

                let builder = LogicalPlanBuilder::from(input);
                let builder = if op.id() == token::T_COUNT_VALUES {
                    // count_values additionally groups by the previous value
                    // columns and re-exposes them under the user's label.
                    let label = Self::get_param_value_as_str(*op, param)?;
                    group_exprs.extend(prev_field_exprs.clone());
                    let project_fields = self
                        .create_field_column_exprs()?
                        .into_iter()
                        .chain(self.create_tag_column_exprs()?)
                        .chain(Some(self.create_time_index_column_expr()?))
                        .chain(prev_field_exprs.into_iter().map(|expr| expr.alias(label)));

                    builder
                        .aggregate(group_exprs.clone(), aggr_exprs)
                        .context(DataFusionPlanningSnafu)?
                        .project(project_fields)
                        .context(DataFusionPlanningSnafu)?
                } else {
                    builder
                        .aggregate(group_exprs.clone(), aggr_exprs)
                        .context(DataFusionPlanningSnafu)?
                };

                // Sort on the group keys for deterministic output order.
                let sort_expr = group_exprs.into_iter().map(|expr| expr.sort(true, false));

                builder
                    .sort(sort_expr)
                    .context(DataFusionPlanningSnafu)?
                    .build()
                    .context(DataFusionPlanningSnafu)
            }
        }
    }
453
    /// Plan `topk`/`bottomk` using window functions: rank rows within each
    /// group, keep the rows whose rank is within `k`, then project the rank
    /// columns away again.
    async fn prom_topk_bottomk_to_plan(
        &mut self,
        aggr_expr: &AggregateExpr,
        input: LogicalPlan,
    ) -> Result<LogicalPlan> {
        let AggregateExpr {
            op,
            param,
            modifier,
            ..
        } = aggr_expr;

        let input_has_tsid = input.schema().fields().iter().any(|field| {
            field.name() == DATA_SCHEMA_TSID_COLUMN_NAME
                && field.data_type() == &ArrowDataType::UInt64
        });
        // topk/bottomk keep whole input rows, so an existing tsid survives.
        self.ctx.use_tsid = input_has_tsid;

        let group_exprs = self.agg_modifier_to_col(input.schema(), modifier, false)?;

        // `k`, coerced to Float64 so it can be compared against the rank.
        let val = Self::get_param_as_literal_expr(param, Some(*op), Some(ArrowDataType::Float64))?;

        let window_exprs = self.create_window_exprs(*op, group_exprs.clone(), &input)?;

        let rank_columns: Vec<_> = window_exprs
            .iter()
            .map(|expr| expr.schema_name().to_string())
            .collect();

        // Keep a row if ANY rank column is <= k (OR-fold over the ranks).
        // NOTE(review): the final `.unwrap()` assumes `create_window_exprs`
        // returned at least one expression — confirm that invariant.
        let filter: DfExpr = rank_columns
            .iter()
            .fold(None, |expr, rank| {
                let predicate = DfExpr::BinaryExpr(BinaryExpr {
                    left: Box::new(col(rank)),
                    op: Operator::LtEq,
                    right: Box::new(val.clone()),
                });

                match expr {
                    None => Some(predicate),
                    Some(expr) => Some(DfExpr::BinaryExpr(BinaryExpr {
                        left: Box::new(expr),
                        op: Operator::Or,
                        right: Box::new(predicate),
                    })),
                }
            })
            .unwrap();

        let rank_columns: Vec<_> = rank_columns.into_iter().map(col).collect();

        // Sort by the group keys first, then by the rank columns.
        let mut new_group_exprs = group_exprs.clone();
        new_group_exprs.extend(rank_columns);

        let group_sort_expr = new_group_exprs
            .into_iter()
            .map(|expr| expr.sort(true, false));

        // Final projection: fields, tags, (optionally) the tsid, and the
        // time index — the temporary rank columns are dropped here.
        let project_fields = self
            .create_field_column_exprs()?
            .into_iter()
            .chain(self.create_tag_column_exprs()?)
            .chain(
                self.ctx
                    .use_tsid
                    .then_some(DfExpr::Column(Column::from_name(
                        DATA_SCHEMA_TSID_COLUMN_NAME,
                    ))),
            )
            .chain(Some(self.create_time_index_column_expr()?));

        LogicalPlanBuilder::from(input)
            .window(window_exprs)
            .context(DataFusionPlanningSnafu)?
            .filter(filter)
            .context(DataFusionPlanningSnafu)?
            .sort(group_sort_expr)
            .context(DataFusionPlanningSnafu)?
            .project(project_fields)
            .context(DataFusionPlanningSnafu)?
            .build()
            .context(DataFusionPlanningSnafu)
    }
542
543 async fn prom_unary_expr_to_plan(
544 &mut self,
545 query_engine_state: &QueryEngineState,
546 unary_expr: &UnaryExpr,
547 ) -> Result<LogicalPlan> {
548 let UnaryExpr { expr } = unary_expr;
549 let input = self.prom_expr_to_plan(expr, query_engine_state).await?;
551 self.projection_for_each_field_column(input, |col| {
552 Ok(DfExpr::Negative(Box::new(DfExpr::Column(col.into()))))
553 })
554 }
555
    /// Plan a PromQL binary expression.
    ///
    /// Four shapes are handled, dispatched on whether each operand folds to
    /// a literal:
    /// - literal OP literal: an `EmptyMetric` that evaluates the folded
    ///   expression per step;
    /// - literal OP vector / vector OP literal: a projection over the vector
    ///   side (or a filter, for comparisons without the `bool` modifier);
    /// - vector OP vector: a set operation (`and`/`or`/`unless`) or a join
    ///   on the non-field columns with the operator applied to paired field
    ///   columns.
    async fn prom_binary_expr_to_plan(
        &mut self,
        query_engine_state: &QueryEngineState,
        binary_expr: &PromBinaryExpr,
    ) -> Result<LogicalPlan> {
        let PromBinaryExpr {
            lhs,
            rhs,
            op,
            modifier,
        } = binary_expr;

        // With the `bool` modifier, comparisons yield 0/1 values instead of
        // filtering rows out.
        let should_return_bool = if let Some(m) = modifier {
            m.return_bool
        } else {
            false
        };
        let is_comparison_op = Self::is_token_a_comparison_op(*op);

        match (
            Self::try_build_literal_expr(lhs),
            Self::try_build_literal_expr(rhs),
        ) {
            (Some(lhs), Some(rhs)) => {
                // Pure literal arithmetic — no table involved; synthesize
                // default column names and clear the table identity.
                self.ctx.time_index_column = Some(DEFAULT_TIME_INDEX_COLUMN.to_string());
                self.ctx.field_columns = vec![DEFAULT_FIELD_COLUMN.to_string()];
                self.ctx.reset_table_name_and_schema();
                let field_expr_builder = Self::prom_token_to_binary_expr_builder(*op)?;
                let mut field_expr = field_expr_builder(lhs, rhs)?;

                if is_comparison_op && should_return_bool {
                    // `bool` comparison: cast the boolean result to 0.0/1.0.
                    field_expr = DfExpr::Cast(Cast {
                        expr: Box::new(field_expr),
                        data_type: ArrowDataType::Float64,
                    });
                }

                Ok(LogicalPlan::Extension(Extension {
                    node: Arc::new(
                        EmptyMetric::new(
                            self.ctx.start,
                            self.ctx.end,
                            self.ctx.interval,
                            SPECIAL_TIME_FUNCTION.to_string(),
                            DEFAULT_FIELD_COLUMN.to_string(),
                            Some(field_expr),
                        )
                        .context(DataFusionPlanningSnafu)?,
                    ),
                }))
            }
            (Some(mut expr), None) => {
                // literal OP vector.
                let input = self.prom_expr_to_plan(rhs, query_engine_state).await?;
                // A `time()` call on the literal side must be evaluated
                // against the vector's time index, not as a plain literal.
                if let Some(time_expr) = self.try_build_special_time_expr_with_context(lhs) {
                    expr = time_expr
                }
                let bin_expr_builder = |col: &String| {
                    let binary_expr_builder = Self::prom_token_to_binary_expr_builder(*op)?;
                    let mut binary_expr =
                        binary_expr_builder(expr.clone(), DfExpr::Column(col.into()))?;

                    if is_comparison_op && should_return_bool {
                        binary_expr = DfExpr::Cast(Cast {
                            expr: Box::new(binary_expr),
                            data_type: ArrowDataType::Float64,
                        });
                    }
                    Ok(binary_expr)
                };
                if is_comparison_op && !should_return_bool {
                    // A plain comparison filters rows rather than projecting.
                    self.filter_on_field_column(input, bin_expr_builder)
                } else {
                    self.projection_for_each_field_column(input, bin_expr_builder)
                }
            }
            (None, Some(mut expr)) => {
                // vector OP literal — mirror of the case above.
                let input = self.prom_expr_to_plan(lhs, query_engine_state).await?;
                if let Some(time_expr) = self.try_build_special_time_expr_with_context(rhs) {
                    expr = time_expr
                }
                let bin_expr_builder = |col: &String| {
                    let binary_expr_builder = Self::prom_token_to_binary_expr_builder(*op)?;
                    let mut binary_expr =
                        binary_expr_builder(DfExpr::Column(col.into()), expr.clone())?;

                    if is_comparison_op && should_return_bool {
                        binary_expr = DfExpr::Cast(Cast {
                            expr: Box::new(binary_expr),
                            data_type: ArrowDataType::Float64,
                        });
                    }
                    Ok(binary_expr)
                };
                if is_comparison_op && !should_return_bool {
                    self.filter_on_field_column(input, bin_expr_builder)
                } else {
                    self.projection_for_each_field_column(input, bin_expr_builder)
                }
            }
            (None, None) => {
                // vector OP vector. Plan both sides and snapshot each side's
                // context — planning the right side overwrites `self.ctx`.
                let left_input = self.prom_expr_to_plan(lhs, query_engine_state).await?;
                let left_field_columns = self.ctx.field_columns.clone();
                let left_time_index_column = self.ctx.time_index_column.clone();
                let mut left_table_ref = self
                    .table_ref()
                    .unwrap_or_else(|_| TableReference::bare(""));
                let left_context = self.ctx.clone();

                let right_input = self.prom_expr_to_plan(rhs, query_engine_state).await?;
                let right_field_columns = self.ctx.field_columns.clone();
                let right_time_index_column = self.ctx.time_index_column.clone();
                let mut right_table_ref = self
                    .table_ref()
                    .unwrap_or_else(|_| TableReference::bare(""));
                let right_context = self.ctx.clone();

                // `and`/`or`/`unless` become set operations, not joins.
                if Self::is_token_a_set_op(*op) {
                    return self.set_op_on_non_field_columns(
                        left_input,
                        right_input,
                        left_context,
                        right_context,
                        *op,
                        modifier,
                    );
                }

                // Self-join: disambiguate the two sides with synthetic
                // `lhs`/`rhs` table references.
                if left_table_ref == right_table_ref {
                    left_table_ref = TableReference::bare("lhs");
                    right_table_ref = TableReference::bare("rhs");
                    if self.ctx.tag_columns.is_empty() {
                        self.ctx = left_context.clone();
                        self.ctx.table_name = Some("lhs".to_string());
                    } else {
                        self.ctx.table_name = Some("rhs".to_string());
                    }
                }
                // Align the two sides' field columns into output names and
                // (left, right) pairs (see `align_binary_field_columns`).
                let (output_field_columns, field_columns) =
                    Self::align_binary_field_columns(&left_field_columns, &right_field_columns);
                self.ctx.field_columns = output_field_columns;
                let mut field_columns = field_columns.into_iter();

                let join_plan = self.join_on_non_field_columns(
                    left_input,
                    right_input,
                    left_table_ref.clone(),
                    right_table_ref.clone(),
                    left_time_index_column,
                    right_time_index_column,
                    // whether either side lacks tag columns
                    left_context.tag_columns.is_empty() || right_context.tag_columns.is_empty(),
                    modifier,
                )?;
                let join_plan_schema = join_plan.schema().clone();

                // Each call consumes the next (left, right) field pair and
                // applies the operator to the two qualified columns.
                let bin_expr_builder = |_: &String| {
                    let (left_col_name, right_col_name) = field_columns.next().unwrap();
                    let left_col = join_plan_schema
                        .qualified_field_with_name(Some(&left_table_ref), left_col_name)
                        .context(DataFusionPlanningSnafu)?
                        .into();
                    let right_col = join_plan_schema
                        .qualified_field_with_name(Some(&right_table_ref), right_col_name)
                        .context(DataFusionPlanningSnafu)?
                        .into();

                    let binary_expr_builder = Self::prom_token_to_binary_expr_builder(*op)?;
                    let mut binary_expr =
                        binary_expr_builder(DfExpr::Column(left_col), DfExpr::Column(right_col))?;
                    if is_comparison_op && should_return_bool {
                        binary_expr = DfExpr::Cast(Cast {
                            expr: Box::new(binary_expr),
                            data_type: ArrowDataType::Float64,
                        });
                    }
                    Ok(binary_expr)
                };
                if is_comparison_op && !should_return_bool {
                    // Filtering comparison: keep passing rows, then project
                    // one side's columns — the vector side when the other
                    // side is a scalar, otherwise the left side.
                    let filtered = self.filter_on_field_column(join_plan, bin_expr_builder)?;
                    let (project_table_ref, project_context) =
                        match (lhs.value_type(), rhs.value_type()) {
                            (ValueType::Scalar, ValueType::Vector) => {
                                (&right_table_ref, &right_context)
                            }
                            _ => (&left_table_ref, &left_context),
                        };
                    self.project_binary_join_side(filtered, project_table_ref, project_context)
                } else {
                    self.projection_for_each_field_column(join_plan, bin_expr_builder)
                }
            }
        }
    }
778
    /// After a filtering comparison over a join, project only one side's
    /// columns so the output matches that side's schema, then adopt that
    /// side's planner context.
    fn project_binary_join_side(
        &mut self,
        input: LogicalPlan,
        table_ref: &TableReference,
        context: &PromPlannerContext,
    ) -> Result<LogicalPlan> {
        let schema = input.schema();

        // time index + fields + tags (+ possibly the tsid column).
        let mut project_exprs =
            Vec::with_capacity(context.tag_columns.len() + context.field_columns.len() + 2);

        // Time index first, if that side has one.
        if let Some(time_index_column) = &context.time_index_column {
            let time_index_col = schema
                .qualified_field_with_name(Some(table_ref), time_index_column)
                .context(DataFusionPlanningSnafu)?
                .into();
            project_exprs.push(DfExpr::Column(time_index_col));
        }

        // Then the value columns...
        for field_column in &context.field_columns {
            let field_col = schema
                .qualified_field_with_name(Some(table_ref), field_column)
                .context(DataFusionPlanningSnafu)?
                .into();
            project_exprs.push(DfExpr::Column(field_col));
        }

        // ...and the tag columns of the chosen side.
        for tag_column in &context.tag_columns {
            let tag_col = schema
                .qualified_field_with_name(Some(table_ref), tag_column)
                .context(DataFusionPlanningSnafu)?
                .into();
            project_exprs.push(DfExpr::Column(tag_col));
        }

        // Carry the internal tsid through when that side still uses it.
        if let Some(tsid_col) =
            Self::optional_tsid_projection(schema, Some(table_ref), context.use_tsid)
        {
            project_exprs.push(tsid_col);
        }

        let plan = LogicalPlanBuilder::from(input)
            .project(project_exprs)
            .context(DataFusionPlanningSnafu)?
            .build()
            .context(DataFusionPlanningSnafu)?;

        // The output now follows the projected side's shape, but the join
        // result is no longer a concrete table — clear the table identity.
        self.ctx = context.clone();
        self.ctx.table_name = None;
        self.ctx.schema_name = None;

        Ok(plan)
    }
839
840 fn prom_number_lit_to_plan(&mut self, number_literal: &NumberLiteral) -> Result<LogicalPlan> {
841 let NumberLiteral { val } = number_literal;
842 self.ctx.time_index_column = Some(DEFAULT_TIME_INDEX_COLUMN.to_string());
843 self.ctx.field_columns = vec![DEFAULT_FIELD_COLUMN.to_string()];
844 self.ctx.reset_table_name_and_schema();
845 let literal_expr = df_prelude::lit(*val);
846
847 let plan = LogicalPlan::Extension(Extension {
848 node: Arc::new(
849 EmptyMetric::new(
850 self.ctx.start,
851 self.ctx.end,
852 self.ctx.interval,
853 SPECIAL_TIME_FUNCTION.to_string(),
854 DEFAULT_FIELD_COLUMN.to_string(),
855 Some(literal_expr),
856 )
857 .context(DataFusionPlanningSnafu)?,
858 ),
859 });
860 Ok(plan)
861 }
862
863 fn prom_string_lit_to_plan(&mut self, string_literal: &StringLiteral) -> Result<LogicalPlan> {
864 let StringLiteral { val } = string_literal;
865 self.ctx.time_index_column = Some(DEFAULT_TIME_INDEX_COLUMN.to_string());
866 self.ctx.field_columns = vec![DEFAULT_FIELD_COLUMN.to_string()];
867 self.ctx.reset_table_name_and_schema();
868 let literal_expr = df_prelude::lit(val.clone());
869
870 let plan = LogicalPlan::Extension(Extension {
871 node: Arc::new(
872 EmptyMetric::new(
873 self.ctx.start,
874 self.ctx.end,
875 self.ctx.interval,
876 SPECIAL_TIME_FUNCTION.to_string(),
877 DEFAULT_FIELD_COLUMN.to_string(),
878 Some(literal_expr),
879 )
880 .context(DataFusionPlanningSnafu)?,
881 ),
882 });
883 Ok(plan)
884 }
885
    /// Plan an instant vector selector (`metric{label="x"}`).
    ///
    /// `timestamp_fn` is true when this selector is the argument of
    /// `timestamp()`, in which case the row timestamp is projected as the
    /// value column before instant manipulation.
    async fn prom_vector_selector_to_plan(
        &mut self,
        vector_selector: &VectorSelector,
        timestamp_fn: bool,
    ) -> Result<LogicalPlan> {
        let VectorSelector {
            name,
            offset,
            matchers,
            at: _,
        } = vector_selector;
        let matchers = self.preprocess_label_matchers(matchers, name)?;
        // `setup_context` may short-circuit with a ready-made empty plan
        // (e.g. when no real table scan is needed — see `setup_context`).
        if let Some(empty_plan) = self.setup_context().await? {
            return Ok(empty_plan);
        }
        let normalize = self
            .selector_to_series_normalize_plan(offset, matchers, false)
            .await?;

        // For `timestamp(selector)`: replace the value column with the row
        // timestamp.
        let normalize = if timestamp_fn {
            self.create_timestamp_func_plan(normalize)?
        } else {
            normalize
        };

        // Align samples onto the query step grid using the lookback delta.
        let manipulate = InstantManipulate::new(
            self.ctx.start,
            self.ctx.end,
            self.ctx.lookback_delta,
            self.ctx.interval,
            self.ctx
                .time_index_column
                .clone()
                .expect("time index should be set in `setup_context`"),
            self.ctx.field_columns.first().cloned(),
            normalize,
        );
        Ok(LogicalPlan::Extension(Extension {
            node: Arc::new(manipulate),
        }))
    }
929
    /// Build the projection used by the `timestamp()` function: expose the
    /// time index (converted by `build_special_time_expr`) as the value
    /// column, keeping the time index and tag columns alongside it.
    fn create_timestamp_func_plan(&mut self, normalize: LogicalPlan) -> Result<LogicalPlan> {
        let time_expr = build_special_time_expr(self.ctx.time_index_column.as_ref().unwrap())
            .alias(DEFAULT_FIELD_COLUMN);
        // The timestamp expression becomes the only field column.
        self.ctx.field_columns = vec![time_expr.schema_name().to_string()];
        let mut project_exprs = Vec::with_capacity(self.ctx.tag_columns.len() + 2);
        project_exprs.push(self.create_time_index_column_expr()?);
        project_exprs.push(time_expr);
        project_exprs.extend(self.create_tag_column_exprs()?);

        LogicalPlanBuilder::from(normalize)
            .project(project_exprs)
            .context(DataFusionPlanningSnafu)?
            .build()
            .context(DataFusionPlanningSnafu)
    }
966
    /// Plan a range vector selector (`metric[5m]`).
    async fn prom_matrix_selector_to_plan(
        &mut self,
        matrix_selector: &MatrixSelector,
    ) -> Result<LogicalPlan> {
        let MatrixSelector { vs, range } = matrix_selector;
        let VectorSelector {
            name,
            offset,
            matchers,
            ..
        } = vs;
        let matchers = self.preprocess_label_matchers(matchers, name)?;
        // PromQL forbids zero-length ranges.
        ensure!(!range.is_zero(), ZeroRangeSelectorSnafu);
        let range_ms = range.as_millis() as _;
        self.ctx.range = Some(range_ms);

        // `setup_context` may short-circuit with a ready-made empty plan;
        // otherwise build the normalized series scan.
        let normalize = match self.setup_context().await? {
            Some(empty_plan) => empty_plan,
            None => {
                self.selector_to_series_normalize_plan(offset, matchers, true)
                    .await?
            }
        };
        // Wrap in RangeManipulate to evaluate the range window per step.
        let manipulate = RangeManipulate::new(
            self.ctx.start,
            self.ctx.end,
            self.ctx.interval,
            range_ms,
            self.ctx
                .time_index_column
                .clone()
                .expect("time index should be set in `setup_context`"),
            self.ctx.field_columns.clone(),
            normalize,
        )
        .context(DataFusionPlanningSnafu)?;

        Ok(LogicalPlan::Extension(Extension {
            node: Arc::new(manipulate),
        }))
    }
1011
1012 async fn prom_call_expr_to_plan(
1013 &mut self,
1014 query_engine_state: &QueryEngineState,
1015 call_expr: &Call,
1016 ) -> Result<LogicalPlan> {
1017 let Call { func, args } = call_expr;
1018 match func.name {
1020 SPECIAL_HISTOGRAM_QUANTILE => {
1021 return self.create_histogram_plan(args, query_engine_state).await;
1022 }
1023 SPECIAL_VECTOR_FUNCTION => return self.create_vector_plan(args).await,
1024 SCALAR_FUNCTION => return self.create_scalar_plan(args, query_engine_state).await,
1025 SPECIAL_ABSENT_FUNCTION => {
1026 return self.create_absent_plan(args, query_engine_state).await;
1027 }
1028 _ => {}
1029 }
1030
1031 let args = self.create_function_args(&args.args)?;
1033 let input = if let Some(prom_expr) = &args.input {
1034 self.prom_expr_to_plan_inner(prom_expr, func.name == "timestamp", query_engine_state)
1035 .await?
1036 } else {
1037 self.ctx.time_index_column = Some(SPECIAL_TIME_FUNCTION.to_string());
1038 self.ctx.reset_table_name_and_schema();
1039 self.ctx.tag_columns = vec![];
1040 self.ctx.field_columns = vec![DEFAULT_FIELD_COLUMN.to_string()];
1041 LogicalPlan::Extension(Extension {
1042 node: Arc::new(
1043 EmptyMetric::new(
1044 self.ctx.start,
1045 self.ctx.end,
1046 self.ctx.interval,
1047 SPECIAL_TIME_FUNCTION.to_string(),
1048 DEFAULT_FIELD_COLUMN.to_string(),
1049 None,
1050 )
1051 .context(DataFusionPlanningSnafu)?,
1052 ),
1053 })
1054 };
1055 let (mut func_exprs, new_tags) =
1056 self.create_function_expr(func, args.literals.clone(), query_engine_state)?;
1057 func_exprs.insert(0, self.create_time_index_column_expr()?);
1058 func_exprs.extend_from_slice(&self.create_tag_column_exprs()?);
1059
1060 let builder = LogicalPlanBuilder::from(input)
1061 .project(func_exprs)
1062 .context(DataFusionPlanningSnafu)?
1063 .filter(self.create_empty_values_filter_expr()?)
1064 .context(DataFusionPlanningSnafu)?;
1065
1066 let builder = match func.name {
1067 "sort" => builder
1068 .sort(self.create_field_columns_sort_exprs(true))
1069 .context(DataFusionPlanningSnafu)?,
1070 "sort_desc" => builder
1071 .sort(self.create_field_columns_sort_exprs(false))
1072 .context(DataFusionPlanningSnafu)?,
1073 "sort_by_label" => builder
1074 .sort(Self::create_sort_exprs_by_tags(
1075 func.name,
1076 args.literals,
1077 true,
1078 )?)
1079 .context(DataFusionPlanningSnafu)?,
1080 "sort_by_label_desc" => builder
1081 .sort(Self::create_sort_exprs_by_tags(
1082 func.name,
1083 args.literals,
1084 false,
1085 )?)
1086 .context(DataFusionPlanningSnafu)?,
1087
1088 _ => builder,
1089 };
1090
1091 for tag in new_tags {
1094 self.ctx.tag_columns.push(tag);
1095 }
1096
1097 let plan = builder.build().context(DataFusionPlanningSnafu)?;
1098 common_telemetry::debug!("Created PromQL function plan: {plan:?} for {call_expr:?}");
1099
1100 Ok(plan)
1101 }
1102
    /// Plan one of our parser's extension nodes: EXPLAIN/ANALYZE wrappers
    /// (optionally VERBOSE) or an alias node.
    async fn prom_ext_expr_to_plan(
        &mut self,
        query_engine_state: &QueryEngineState,
        ext_expr: &promql_parser::parser::ast::Extension,
    ) -> Result<LogicalPlan> {
        let expr = &ext_expr.expr;
        // NOTE(review): `children[0]` assumes every extension node has at
        // least one child — confirm with the parser that builds these nodes.
        let children = expr.children();
        let plan = self
            .prom_expr_to_plan(&children[0], query_engine_state)
            .await?;
        // Builder signature: explain(verbose, analyze).
        // NOTE(review): the `.unwrap()`s assume `explain` cannot fail on
        // these plans — confirm.
        match expr.name() {
            ANALYZE_NODE_NAME => LogicalPlanBuilder::from(plan)
                .explain(false, true)
                .unwrap()
                .build()
                .context(DataFusionPlanningSnafu),
            ANALYZE_VERBOSE_NODE_NAME => LogicalPlanBuilder::from(plan)
                .explain(true, true)
                .unwrap()
                .build()
                .context(DataFusionPlanningSnafu),
            EXPLAIN_NODE_NAME => LogicalPlanBuilder::from(plan)
                .explain(false, false)
                .unwrap()
                .build()
                .context(DataFusionPlanningSnafu),
            EXPLAIN_VERBOSE_NODE_NAME => LogicalPlanBuilder::from(plan)
                .explain(true, false)
                .unwrap()
                .build()
                .context(DataFusionPlanningSnafu),
            ALIAS_NODE_NAME => {
                // Extract the alias string carried by the AliasExpr node.
                let alias = expr
                    .as_any()
                    .downcast_ref::<AliasExpr>()
                    .context(UnexpectedPlanExprSnafu {
                        desc: "Expected AliasExpr",
                    })?
                    .alias
                    .clone();
                self.apply_alias(plan, alias)
            }
            // Unknown extension names degrade to an empty relation.
            _ => LogicalPlanBuilder::empty(true)
                .build()
                .context(DataFusionPlanningSnafu),
        }
    }
1156
    /// Extract planner metadata from a vector selector's label matchers and
    /// return the remaining "plain" matchers (deduplicated).
    ///
    /// Side effects on `self.ctx`:
    /// - resets the context, then sets `table_name` from either the selector
    ///   `name` or a single `__name__` equality matcher;
    /// - collects `__field__` matchers into `field_column_matcher` for later
    ///   value-column filtering;
    /// - sets `schema_name` from `__schema__` / `__database__` matchers
    ///   (equality only);
    /// - records every ordinary matcher in `selector_matcher`.
    ///
    /// Errors when both `name` and `__name__` are present, when neither is,
    /// or when a reserved matcher uses an unsupported operator.
    #[allow(clippy::mutable_key_type)]
    fn preprocess_label_matchers(
        &mut self,
        label_matchers: &Matchers,
        name: &Option<String>,
    ) -> Result<Matchers> {
        self.ctx.reset();

        let metric_name;
        if let Some(name) = name.clone() {
            metric_name = Some(name);
            // A literal metric name and a `__name__` matcher are mutually
            // exclusive.
            ensure!(
                label_matchers.find_matchers(METRIC_NAME).is_empty(),
                MultipleMetricMatchersSnafu
            );
        } else {
            let mut matches = label_matchers.find_matchers(METRIC_NAME);
            ensure!(!matches.is_empty(), NoMetricMatcherSnafu);
            ensure!(matches.len() == 1, MultipleMetricMatchersSnafu);
            // Only `__name__="literal"` is supported; regex or negated
            // metric-name matchers are rejected here.
            ensure!(
                matches[0].op == MatchOp::Equal,
                UnsupportedMatcherOpSnafu {
                    matcher_op: matches[0].op.to_string(),
                    matcher: METRIC_NAME
                }
            );
            metric_name = matches.pop().map(|m| m.value);
        }

        self.ctx.table_name = metric_name;

        let mut matchers = HashSet::new();
        for matcher in &label_matchers.matchers {
            if matcher.name == FIELD_COLUMN_MATCHER {
                // `__field__` narrows the value columns; consumed later
                // during scan planning, not kept as a row filter.
                self.ctx
                    .field_column_matcher
                    .get_or_insert_default()
                    .push(matcher.clone());
            } else if matcher.name == SCHEMA_COLUMN_MATCHER || matcher.name == DB_COLUMN_MATCHER {
                ensure!(
                    matcher.op == MatchOp::Equal,
                    UnsupportedMatcherOpSnafu {
                        matcher: matcher.name.clone(),
                        matcher_op: matcher.op.to_string(),
                    }
                );
                self.ctx.schema_name = Some(matcher.value.clone());
            } else if matcher.name != METRIC_NAME {
                // Ordinary label matcher: keep it for the scan filter and
                // remember it for tag-pruning decisions.
                self.ctx.selector_matcher.push(matcher.clone());
                let _ = matchers.insert(matcher.clone());
            }
        }

        Ok(Matchers::new(matchers.into_iter().collect()))
    }
1222
1223 async fn selector_to_series_normalize_plan(
1224 &mut self,
1225 offset: &Option<Offset>,
1226 label_matchers: Matchers,
1227 is_range_selector: bool,
1228 ) -> Result<LogicalPlan> {
1229 let table_ref = self.table_ref()?;
1231 let mut table_scan = self.create_table_scan_plan(table_ref.clone()).await?;
1232 let table_schema = table_scan.schema();
1233
1234 let offset_duration = match offset {
1236 Some(Offset::Pos(duration)) => duration.as_millis() as Millisecond,
1237 Some(Offset::Neg(duration)) => -(duration.as_millis() as Millisecond),
1238 None => 0,
1239 };
1240 let mut scan_filters = Self::matchers_to_expr(label_matchers.clone(), table_schema)?;
1241 if let Some(time_index_filter) = self.build_time_index_filter(offset_duration)? {
1242 scan_filters.push(time_index_filter);
1243 }
1244 table_scan = LogicalPlanBuilder::from(table_scan)
1245 .filter(conjunction(scan_filters).unwrap()) .context(DataFusionPlanningSnafu)?
1247 .build()
1248 .context(DataFusionPlanningSnafu)?;
1249
1250 if let Some(field_matchers) = &self.ctx.field_column_matcher {
1252 let col_set = self.ctx.field_columns.iter().collect::<HashSet<_>>();
1253 let mut result_set = HashSet::new();
1255 let mut reverse_set = HashSet::new();
1257 for matcher in field_matchers {
1258 match &matcher.op {
1259 MatchOp::Equal => {
1260 if col_set.contains(&matcher.value) {
1261 let _ = result_set.insert(matcher.value.clone());
1262 } else {
1263 return Err(ColumnNotFoundSnafu {
1264 col: matcher.value.clone(),
1265 }
1266 .build());
1267 }
1268 }
1269 MatchOp::NotEqual => {
1270 if col_set.contains(&matcher.value) {
1271 let _ = reverse_set.insert(matcher.value.clone());
1272 } else {
1273 return Err(ColumnNotFoundSnafu {
1274 col: matcher.value.clone(),
1275 }
1276 .build());
1277 }
1278 }
1279 MatchOp::Re(regex) => {
1280 for col in &self.ctx.field_columns {
1281 if regex.is_match(col) {
1282 let _ = result_set.insert(col.clone());
1283 }
1284 }
1285 }
1286 MatchOp::NotRe(regex) => {
1287 for col in &self.ctx.field_columns {
1288 if regex.is_match(col) {
1289 let _ = reverse_set.insert(col.clone());
1290 }
1291 }
1292 }
1293 }
1294 }
1295 if result_set.is_empty() {
1297 result_set = col_set.into_iter().cloned().collect();
1298 }
1299 for col in reverse_set {
1300 let _ = result_set.remove(&col);
1301 }
1302
1303 self.ctx.field_columns = self
1305 .ctx
1306 .field_columns
1307 .drain(..)
1308 .filter(|col| result_set.contains(col))
1309 .collect();
1310
1311 let exprs = result_set
1312 .into_iter()
1313 .map(|col| DfExpr::Column(Column::new_unqualified(col)))
1314 .chain(self.create_tag_column_exprs()?)
1315 .chain(
1316 self.ctx
1317 .use_tsid
1318 .then_some(DfExpr::Column(Column::new_unqualified(
1319 DATA_SCHEMA_TSID_COLUMN_NAME,
1320 ))),
1321 )
1322 .chain(Some(self.create_time_index_column_expr()?))
1323 .collect::<Vec<_>>();
1324
1325 table_scan = LogicalPlanBuilder::from(table_scan)
1327 .project(exprs)
1328 .context(DataFusionPlanningSnafu)?
1329 .build()
1330 .context(DataFusionPlanningSnafu)?;
1331 }
1332
1333 let series_key_columns = if self.ctx.use_tsid {
1335 vec![DATA_SCHEMA_TSID_COLUMN_NAME.to_string()]
1336 } else {
1337 self.ctx.tag_columns.clone()
1338 };
1339
1340 let sort_exprs = if self.ctx.use_tsid {
1341 vec![
1342 DfExpr::Column(Column::from_name(DATA_SCHEMA_TSID_COLUMN_NAME)).sort(true, true),
1343 self.create_time_index_column_expr()?.sort(true, true),
1344 ]
1345 } else {
1346 self.create_tag_and_time_index_column_sort_exprs()?
1347 };
1348
1349 let sort_plan = LogicalPlanBuilder::from(table_scan)
1350 .sort(sort_exprs)
1351 .context(DataFusionPlanningSnafu)?
1352 .build()
1353 .context(DataFusionPlanningSnafu)?;
1354
1355 let time_index_column =
1357 self.ctx
1358 .time_index_column
1359 .clone()
1360 .with_context(|| TimeIndexNotFoundSnafu {
1361 table: table_ref.to_string(),
1362 })?;
1363 let divide_plan = LogicalPlan::Extension(Extension {
1364 node: Arc::new(SeriesDivide::new(
1365 series_key_columns.clone(),
1366 time_index_column,
1367 sort_plan,
1368 )),
1369 });
1370
1371 if !is_range_selector && offset_duration == 0 {
1373 return Ok(divide_plan);
1374 }
1375 let series_normalize = SeriesNormalize::new(
1376 offset_duration,
1377 self.ctx
1378 .time_index_column
1379 .clone()
1380 .with_context(|| TimeIndexNotFoundSnafu {
1381 table: table_ref.to_quoted_string(),
1382 })?,
1383 is_range_selector,
1384 series_key_columns,
1385 divide_plan,
1386 );
1387 let logical_plan = LogicalPlan::Extension(Extension {
1388 node: Arc::new(series_normalize),
1389 });
1390
1391 Ok(logical_plan)
1392 }
1393
    /// Convert an aggregate modifier (`by` / `without`) into group-by column
    /// expressions. The time index column is always appended so aggregation
    /// stays per-timestamp.
    ///
    /// When `update_ctx` is true, `self.ctx.tag_columns` is rewritten to the
    /// resulting label set so downstream planning sees the reduced tags.
    fn agg_modifier_to_col(
        &mut self,
        input_schema: &DFSchemaRef,
        modifier: &Option<LabelModifier>,
        update_ctx: bool,
    ) -> Result<Vec<DfExpr>> {
        match modifier {
            // No modifier: aggregate everything into one group per timestamp.
            None => {
                if update_ctx {
                    self.ctx.tag_columns.clear();
                }
                Ok(vec![self.create_time_index_column_expr()?])
            }
            // `by (labels)`: group only on the listed labels.
            Some(LabelModifier::Include(labels)) => {
                if update_ctx {
                    self.ctx.tag_columns.clear();
                }
                let mut exprs = Vec::with_capacity(labels.labels.len());
                for label in &labels.labels {
                    if is_metric_engine_internal_column(label) {
                        continue;
                    }
                    // Labels absent from the input schema are silently
                    // ignored rather than raising an error.
                    if let Some(column_name) = Self::find_case_sensitive_column(input_schema, label)
                    {
                        exprs.push(DfExpr::Column(Column::from_name(column_name.clone())));

                        if update_ctx {
                            self.ctx.tag_columns.push(column_name);
                        }
                    }
                }
                exprs.push(self.create_time_index_column_expr()?);

                Ok(exprs)
            }
            // `without (labels)`: group on every input column except the
            // listed labels, internal columns, the time index, and the value
            // columns.
            Some(LabelModifier::Exclude(labels)) => {
                let mut all_fields = input_schema
                    .fields()
                    .iter()
                    .map(|f| f.name())
                    .collect::<BTreeSet<_>>();

                all_fields.retain(|col| !is_metric_engine_internal_column(col.as_str()));

                // Remove the explicitly excluded labels.
                for label in &labels.labels {
                    let _ = all_fields.remove(label);
                }

                // Time index and value columns are never grouping tags; the
                // time index is re-added at the end.
                if let Some(time_index) = &self.ctx.time_index_column {
                    let _ = all_fields.remove(time_index);
                }
                for value in &self.ctx.field_columns {
                    let _ = all_fields.remove(value);
                }

                if update_ctx {
                    self.ctx.tag_columns = all_fields.iter().map(|col| (*col).clone()).collect();
                }

                let mut exprs = all_fields
                    .into_iter()
                    .map(|c| DfExpr::Column(Column::from(c)))
                    .collect::<Vec<_>>();

                exprs.push(self.create_time_index_column_expr()?);

                Ok(exprs)
            }
        }
    }
1481
    /// Convert label matchers into DataFusion filter expressions against
    /// `table_schema`.
    ///
    /// Reserved matchers (`__schema__`, `__database__`, `__field__`) are
    /// skipped here — they are consumed during preprocessing. A matcher on a
    /// column that does not exist is evaluated against an empty-string
    /// literal aliased to the label name, mirroring PromQL's treatment of an
    /// absent label as "".
    pub fn matchers_to_expr(
        label_matchers: Matchers,
        table_schema: &DFSchemaRef,
    ) -> Result<Vec<DfExpr>> {
        let mut exprs = Vec::with_capacity(label_matchers.matchers.len());
        for matcher in label_matchers.matchers {
            if matcher.name == SCHEMA_COLUMN_MATCHER
                || matcher.name == DB_COLUMN_MATCHER
                || matcher.name == FIELD_COLUMN_MATCHER
            {
                continue;
            }

            let column_name = Self::find_case_sensitive_column(table_schema, matcher.name.as_str());
            let col = if let Some(column_name) = column_name {
                DfExpr::Column(Column::from_name(column_name))
            } else {
                // Nonexistent label behaves as the empty string.
                DfExpr::Literal(ScalarValue::Utf8(Some(String::new())), None)
                    .alias(matcher.name.clone())
            };
            let lit = DfExpr::Literal(ScalarValue::Utf8(Some(matcher.value)), None);
            let expr = match matcher.op {
                MatchOp::Equal => col.eq(lit),
                MatchOp::NotEqual => col.not_eq(lit),
                MatchOp::Re(re) => {
                    // Regexes arrive anchored as `^(?:...)$` (hence the exact
                    // string comparisons). `=~ ".*"` matches everything, so
                    // the filter is dropped entirely.
                    if re.as_str() == "^(?:.*)$" {
                        continue;
                    }
                    // `=~ ".+"` is equivalent to "label not empty": use a
                    // cheap inequality instead of a regex match.
                    if re.as_str() == "^(?:.+)$" {
                        col.not_eq(DfExpr::Literal(
                            ScalarValue::Utf8(Some(String::new())),
                            None,
                        ))
                    } else {
                        DfExpr::BinaryExpr(BinaryExpr {
                            left: Box::new(col),
                            op: Operator::RegexMatch,
                            right: Box::new(DfExpr::Literal(
                                ScalarValue::Utf8(Some(re.as_str().to_string())),
                                None,
                            )),
                        })
                    }
                }
                MatchOp::NotRe(re) => {
                    // `!~ ".*"` can never match: constant-false filter.
                    if re.as_str() == "^(?:.*)$" {
                        DfExpr::Literal(ScalarValue::Boolean(Some(false)), None)
                    } else if re.as_str() == "^(?:.+)$" {
                        // `!~ ".+"` means the label must be empty.
                        col.eq(DfExpr::Literal(
                            ScalarValue::Utf8(Some(String::new())),
                            None,
                        ))
                    } else {
                        DfExpr::BinaryExpr(BinaryExpr {
                            left: Box::new(col),
                            op: Operator::RegexNotMatch,
                            right: Box::new(DfExpr::Literal(
                                ScalarValue::Utf8(Some(re.as_str().to_string())),
                                None,
                            )),
                        })
                    }
                }
            };
            exprs.push(expr);
        }

        Ok(exprs)
    }
1557
1558 fn find_case_sensitive_column(schema: &DFSchemaRef, column: &str) -> Option<String> {
1559 if is_metric_engine_internal_column(column) {
1560 return None;
1561 }
1562 schema
1563 .fields()
1564 .iter()
1565 .find(|field| field.name() == column)
1566 .map(|field| field.name().clone())
1567 }
1568
1569 fn table_from_source(&self, source: &Arc<dyn TableSource>) -> Result<table::TableRef> {
1570 Ok(source
1571 .as_any()
1572 .downcast_ref::<DefaultTableSource>()
1573 .context(UnknownTableSnafu)?
1574 .table_provider
1575 .as_any()
1576 .downcast_ref::<DfTableProviderAdapter>()
1577 .context(UnknownTableSnafu)?
1578 .table())
1579 }
1580
1581 fn table_ref(&self) -> Result<TableReference> {
1582 let table_name = self
1583 .ctx
1584 .table_name
1585 .clone()
1586 .context(TableNameNotFoundSnafu)?;
1587
1588 let table_ref = if let Some(schema_name) = &self.ctx.schema_name {
1590 TableReference::partial(schema_name.as_str(), table_name.as_str())
1591 } else {
1592 TableReference::bare(table_name.as_str())
1593 };
1594
1595 Ok(table_ref)
1596 }
1597
    /// Build a predicate on the time index covering every sample timestamp
    /// in `[start, end]`, widened backwards by the selector window (`range`
    /// for range selectors, otherwise `lookback_delta`) and shifted by
    /// `offset_duration`.
    ///
    /// Dense queries (too many points, or step interval <= 1h) get one
    /// contiguous range predicate; sparse queries get one per-point window
    /// OR-ed together so the scan can prune the gaps between steps.
    fn build_time_index_filter(&self, offset_duration: i64) -> Result<Option<DfExpr>> {
        let start = self.ctx.start;
        let end = self.ctx.end;
        if end < start {
            return InvalidTimeRangeSnafu { start, end }.fail();
        }
        let lookback_delta = self.ctx.lookback_delta;
        let range = self.ctx.range.unwrap_or_default();
        let interval = self.ctx.interval;
        let time_index_expr = self.create_time_index_column_expr()?;
        let num_points = (end - start) / interval;

        // `range == 0` indicates an instant selector, which looks back
        // `lookback_delta`; a range selector looks back `range`.
        let selector_window = if range == 0 { lookback_delta } else { range };
        // A positive window has an exclusive lower bound; shift by 1ms so it
        // can be expressed with an inclusive `>=`.
        let lower_exclusive_adjustment = if selector_window > 0 { 1 } else { 0 };

        if (end - start) / interval > MAX_SCATTER_POINTS || interval <= INTERVAL_1H {
            // Dense case: a single [start - window, end] range (both bounds
            // shifted by the offset).
            let single_time_range = time_index_expr
                .clone()
                .gt_eq(DfExpr::Literal(
                    ScalarValue::TimestampMillisecond(
                        Some(
                            self.ctx.start - offset_duration - selector_window
                                + lower_exclusive_adjustment,
                        ),
                        None,
                    ),
                    None,
                ))
                .and(time_index_expr.lt_eq(DfExpr::Literal(
                    ScalarValue::TimestampMillisecond(Some(self.ctx.end - offset_duration), None),
                    None,
                )));
            return Ok(Some(single_time_range));
        }

        // Sparse case: one window per sample timestamp. The range
        // `start..=end` always yields at least one timestamp (end >= start
        // is ensured above), so the reduce below returns `Some`.
        let mut filters = Vec::with_capacity(num_points as usize + 1);
        for timestamp in (start..=end).step_by(interval as usize) {
            filters.push(
                time_index_expr
                    .clone()
                    .gt_eq(DfExpr::Literal(
                        ScalarValue::TimestampMillisecond(
                            Some(
                                timestamp - offset_duration - selector_window
                                    + lower_exclusive_adjustment,
                            ),
                            None,
                        ),
                        None,
                    ))
                    .and(time_index_expr.clone().lt_eq(DfExpr::Literal(
                        ScalarValue::TimestampMillisecond(Some(timestamp - offset_duration), None),
                        None,
                    ))),
            )
        }

        Ok(filters.into_iter().reduce(DfExpr::or))
    }
1666
    /// Create the table scan plan (plus optional filter / projection nodes)
    /// for the current metric.
    ///
    /// For metric-engine logical tables, the scan is redirected to the
    /// backing physical table when that table exposes the internal
    /// `__table_id` / `__tsid` columns; a `__table_id = <id>` filter then
    /// restricts the shared physical scan to this logical metric. A final
    /// projection casts a non-millisecond time index to milliseconds since
    /// downstream PromQL nodes operate on ms timestamps.
    async fn create_table_scan_plan(&mut self, table_ref: TableReference) -> Result<LogicalPlan> {
        let provider = self
            .table_provider
            .resolve_table(table_ref.clone())
            .await
            .context(CatalogSnafu)?;

        let logical_table = self.table_from_source(&provider)?;

        // Defaults: scan the logical table itself, no table-id filtering.
        let mut maybe_phy_table_ref = table_ref.clone();
        let mut scan_provider = provider;
        let mut table_id_filter: Option<u32> = None;

        // Metric-engine tables record their physical table name in the
        // table options; try to redirect the scan there.
        if logical_table.table_info().meta.engine == METRIC_ENGINE_NAME
            && let Some(physical_table_name) = logical_table
                .table_info()
                .meta
                .options
                .extra_options
                .get(LOGICAL_TABLE_METADATA_KEY)
        {
            let physical_table_ref = if let Some(schema_name) = &self.ctx.schema_name {
                TableReference::partial(schema_name.as_str(), physical_table_name.as_str())
            } else {
                TableReference::bare(physical_table_name.as_str())
            };

            let physical_provider = match self
                .table_provider
                .resolve_table(physical_table_ref.clone())
                .await
            {
                Ok(provider) => provider,
                Err(e) if e.status_code() == StatusCode::TableNotFound => {
                    // Physical table missing: best-effort fallback to the
                    // logical table instead of failing the query.
                    scan_provider.clone()
                }
                Err(e) => return Err(e).context(CatalogSnafu),
            };

            if !Arc::ptr_eq(&physical_provider, &scan_provider) {
                let physical_table = self.table_from_source(&physical_provider)?;

                // Redirect only when the physical table has both internal
                // routing columns, with `__tsid` typed as u64.
                let has_table_id = physical_table
                    .schema()
                    .column_schema_by_name(DATA_SCHEMA_TABLE_ID_COLUMN_NAME)
                    .is_some();
                let has_tsid = physical_table
                    .schema()
                    .column_schema_by_name(DATA_SCHEMA_TSID_COLUMN_NAME)
                    .is_some_and(|col| matches!(col.data_type, ConcreteDataType::UInt64(_)));

                if has_table_id && has_tsid {
                    scan_provider = physical_provider;
                    maybe_phy_table_ref = physical_table_ref;
                    table_id_filter = Some(logical_table.table_info().ident.table_id);
                }
            }
        }

        let scan_table = self.table_from_source(&scan_provider)?;

        // `__tsid` can serve as the series key only on the physical-table
        // path and when it is typed u64 on the table actually scanned.
        let use_tsid = table_id_filter.is_some()
            && scan_table
                .schema()
                .column_schema_by_name(DATA_SCHEMA_TSID_COLUMN_NAME)
                .is_some_and(|col| matches!(col.data_type, ConcreteDataType::UInt64(_)));
        self.ctx.use_tsid = use_tsid;

        let all_table_tags = self.ctx.tag_columns.clone();

        let scan_tag_columns = if use_tsid {
            // NOTE(review): `scan_tags` starts from the full tag set, so the
            // loop below only re-adds entries that are already present (the
            // matched names are a subset of `all_table_tags`). Verify whether
            // this was meant to start from an empty vec to scan only the
            // matched tags.
            let mut scan_tags = self.ctx.tag_columns.clone();
            for matcher in &self.ctx.selector_matcher {
                if is_metric_engine_internal_column(&matcher.name) {
                    continue;
                }
                if all_table_tags.iter().any(|tag| tag == &matcher.name) {
                    scan_tags.push(matcher.name.clone());
                }
            }
            scan_tags.sort_unstable();
            scan_tags.dedup();
            scan_tags
        } else {
            self.ctx.tag_columns.clone()
        };

        // Downstream PromQL operators assume millisecond timestamps; detect
        // whether a cast projection is needed.
        let is_time_index_ms = scan_table
            .schema()
            .timestamp_column()
            .with_context(|| TimeIndexNotFoundSnafu {
                table: maybe_phy_table_ref.to_quoted_string(),
            })?
            .data_type
            == ConcreteDataType::timestamp_millisecond_datatype();

        // On the physical-table path, project down to only the columns this
        // query needs (shared physical tables can be very wide).
        let scan_projection = if table_id_filter.is_some() {
            let mut required_columns = HashSet::new();
            required_columns.insert(DATA_SCHEMA_TABLE_ID_COLUMN_NAME.to_string());
            required_columns.insert(self.ctx.time_index_column.clone().with_context(|| {
                TimeIndexNotFoundSnafu {
                    table: maybe_phy_table_ref.to_quoted_string(),
                }
            })?);
            for col in &scan_tag_columns {
                required_columns.insert(col.clone());
            }
            for col in &self.ctx.field_columns {
                required_columns.insert(col.clone());
            }
            if use_tsid {
                required_columns.insert(DATA_SCHEMA_TSID_COLUMN_NAME.to_string());
            }

            // Translate the required names into positional indices for the
            // scan node.
            let arrow_schema = scan_table.schema().arrow_schema().clone();
            Some(
                arrow_schema
                    .fields()
                    .iter()
                    .enumerate()
                    .filter(|(_, field)| required_columns.contains(field.name().as_str()))
                    .map(|(idx, _)| idx)
                    .collect::<Vec<_>>(),
            )
        } else {
            None
        };

        let mut scan_plan =
            LogicalPlanBuilder::scan(maybe_phy_table_ref.clone(), scan_provider, scan_projection)
                .context(DataFusionPlanningSnafu)?
                .build()
                .context(DataFusionPlanningSnafu)?;

        if let Some(table_id) = table_id_filter {
            // Restrict the shared physical scan to this logical metric and
            // re-alias the subtree under the logical table's name so later
            // column references resolve against it.
            scan_plan = LogicalPlanBuilder::from(scan_plan)
                .filter(
                    DfExpr::Column(Column::from_name(DATA_SCHEMA_TABLE_ID_COLUMN_NAME))
                        .eq(lit(table_id)),
                )
                .context(DataFusionPlanningSnafu)?
                .alias(table_ref.clone())
                .context(DataFusionPlanningSnafu)?
                .build()
                .context(DataFusionPlanningSnafu)?;
        }

        if !is_time_index_ms {
            // Cast the time index to millisecond precision, preserving its
            // original column name and qualifier via the explicit alias.
            let expr: Vec<_> = self
                .create_field_column_exprs()?
                .into_iter()
                .chain(
                    scan_tag_columns
                        .iter()
                        .map(|tag| DfExpr::Column(Column::from_name(tag))),
                )
                .chain(self.ctx.use_tsid.then_some(DfExpr::Column(Column::new(
                    Some(table_ref.clone()),
                    DATA_SCHEMA_TSID_COLUMN_NAME.to_string(),
                ))))
                .chain(Some(DfExpr::Alias(Alias {
                    expr: Box::new(DfExpr::Cast(Cast {
                        expr: Box::new(self.create_time_index_column_expr()?),
                        data_type: ArrowDataType::Timestamp(ArrowTimeUnit::Millisecond, None),
                    })),
                    relation: Some(table_ref.clone()),
                    name: self
                        .ctx
                        .time_index_column
                        .as_ref()
                        .with_context(|| TimeIndexNotFoundSnafu {
                            table: table_ref.to_quoted_string(),
                        })?
                        .clone(),
                    metadata: None,
                })))
                .collect::<Vec<_>>();
            scan_plan = LogicalPlanBuilder::from(scan_plan)
                .project(expr)
                .context(DataFusionPlanningSnafu)?
                .build()
                .context(DataFusionPlanningSnafu)?;
        } else if table_id_filter.is_some() {
            // Physical-table path with an ms time index: still project so
            // `__table_id` is dropped and the output column order is fixed
            // (fields, tags, optional tsid, time index).
            let project_exprs = self
                .create_field_column_exprs()?
                .into_iter()
                .chain(
                    scan_tag_columns
                        .iter()
                        .map(|tag| DfExpr::Column(Column::from_name(tag))),
                )
                .chain(
                    self.ctx
                        .use_tsid
                        .then_some(DfExpr::Column(Column::from_name(
                            DATA_SCHEMA_TSID_COLUMN_NAME,
                        ))),
                )
                .chain(Some(self.create_time_index_column_expr()?))
                .collect::<Vec<_>>();

            scan_plan = LogicalPlanBuilder::from(scan_plan)
                .project(project_exprs)
                .context(DataFusionPlanningSnafu)?
                .build()
                .context(DataFusionPlanningSnafu)?;
        }

        // NOTE(review): this from/build round-trip adds no plan nodes;
        // returning `Ok(scan_plan)` directly would be equivalent.
        let result = LogicalPlanBuilder::from(scan_plan)
            .build()
            .context(DataFusionPlanningSnafu)?;
        Ok(result)
    }
1892
1893 fn collect_row_key_tag_columns_from_plan(
1894 &self,
1895 plan: &LogicalPlan,
1896 ) -> Result<BTreeSet<String>> {
1897 fn walk(
1898 planner: &PromPlanner,
1899 plan: &LogicalPlan,
1900 out: &mut BTreeSet<String>,
1901 ) -> Result<()> {
1902 if let LogicalPlan::TableScan(scan) = plan {
1903 let table = planner.table_from_source(&scan.source)?;
1904 for col in table.table_info().meta.row_key_column_names() {
1905 if col != DATA_SCHEMA_TABLE_ID_COLUMN_NAME
1906 && col != DATA_SCHEMA_TSID_COLUMN_NAME
1907 && !is_metric_engine_internal_column(col)
1908 {
1909 out.insert(col.clone());
1910 }
1911 }
1912 }
1913
1914 for input in plan.inputs() {
1915 walk(planner, input, out)?;
1916 }
1917 Ok(())
1918 }
1919
1920 let mut out = BTreeSet::new();
1921 walk(self, plan, &mut out)?;
1922 Ok(out)
1923 }
1924
    /// Ensure that every tag in `required_tags` survives through the plan's
    /// scans and projections: `TableScan` projections are widened and
    /// `Projection` nodes get extra column expressions where needed.
    ///
    /// Returns the (possibly rewritten) plan. No-op when `required_tags` is
    /// empty.
    fn ensure_tag_columns_available(
        &self,
        plan: LogicalPlan,
        required_tags: &BTreeSet<String>,
    ) -> Result<LogicalPlan> {
        if required_tags.is_empty() {
            return Ok(plan);
        }

        // Bottom-up rewriter that injects the required columns into scans
        // and projections.
        struct Rewriter {
            required_tags: BTreeSet<String>,
        }

        impl TreeNodeRewriter for Rewriter {
            type Node = LogicalPlan;

            fn f_up(
                &mut self,
                node: Self::Node,
            ) -> datafusion_common::Result<Transformed<Self::Node>> {
                match node {
                    LogicalPlan::TableScan(scan) => {
                        let schema = scan.source.schema();
                        let mut projection = match scan.projection.clone() {
                            Some(p) => p,
                            None => {
                                // No projection: the scan already exposes
                                // every column, nothing to widen.
                                return Ok(Transformed::no(LogicalPlan::TableScan(scan)));
                            }
                        };

                        // Add the index of each required tag that the source
                        // schema has but the projection currently omits.
                        let mut changed = false;
                        for tag in &self.required_tags {
                            if let Some((idx, _)) = schema
                                .fields()
                                .iter()
                                .enumerate()
                                .find(|(_, field)| field.name() == tag)
                                && !projection.contains(&idx)
                            {
                                projection.push(idx);
                                changed = true;
                            }
                        }

                        if !changed {
                            return Ok(Transformed::no(LogicalPlan::TableScan(scan)));
                        }

                        // Keep indices ordered and unique before rebuilding
                        // the scan node.
                        projection.sort_unstable();
                        projection.dedup();

                        let new_scan = TableScan::try_new(
                            scan.table_name.clone(),
                            scan.source.clone(),
                            Some(projection),
                            scan.filters,
                            scan.fetch,
                        )?;
                        Ok(Transformed::yes(LogicalPlan::TableScan(new_scan)))
                    }
                    LogicalPlan::Projection(proj) => {
                        let input_schema = proj.input.schema();

                        // Columns the projection already outputs.
                        let existing = proj
                            .schema
                            .fields()
                            .iter()
                            .map(|f| f.name().as_str())
                            .collect::<HashSet<_>>();

                        let mut expr = proj.expr.clone();
                        let mut has_changed = false;
                        for tag in &self.required_tags {
                            if existing.contains(tag.as_str()) {
                                continue;
                            }

                            // Forward the tag only if the (already rewritten,
                            // since traversal is bottom-up) input provides it.
                            if let Some(idx) = input_schema.index_of_column_by_name(None, tag) {
                                expr.push(DfExpr::Column(Column::from(
                                    input_schema.qualified_field(idx),
                                )));
                                has_changed = true;
                            }
                        }

                        if !has_changed {
                            return Ok(Transformed::no(LogicalPlan::Projection(proj)));
                        }

                        let new_proj = Projection::try_new(expr, proj.input)?;
                        Ok(Transformed::yes(LogicalPlan::Projection(new_proj)))
                    }
                    other => Ok(Transformed::no(other)),
                }
            }
        }

        let mut rewriter = Rewriter {
            required_tags: required_tags.clone(),
        };
        let rewritten = plan
            .rewrite(&mut rewriter)
            .context(DataFusionPlanningSnafu)?;
        Ok(rewritten.data)
    }
2031
2032 fn refresh_tag_columns_from_schema(&mut self, schema: &DFSchemaRef) {
2033 let time_index = self.ctx.time_index_column.as_deref();
2034 let field_columns = self.ctx.field_columns.iter().collect::<HashSet<_>>();
2035
2036 let mut tags = schema
2037 .fields()
2038 .iter()
2039 .map(|f| f.name())
2040 .filter(|name| Some(name.as_str()) != time_index)
2041 .filter(|name| !field_columns.contains(name))
2042 .filter(|name| !is_metric_engine_internal_column(name))
2043 .cloned()
2044 .collect::<Vec<_>>();
2045 tags.sort_unstable();
2046 tags.dedup();
2047 self.ctx.tag_columns = tags;
2048 }
2049
2050 async fn setup_context(&mut self) -> Result<Option<LogicalPlan>> {
2054 let table_ref = self.table_ref()?;
2055 let source = match self.table_provider.resolve_table(table_ref.clone()).await {
2056 Err(e) if e.status_code() == StatusCode::TableNotFound => {
2057 let plan = self.setup_context_for_empty_metric()?;
2058 return Ok(Some(plan));
2059 }
2060 res => res.context(CatalogSnafu)?,
2061 };
2062 let table = self.table_from_source(&source)?;
2063
2064 let time_index = table
2066 .schema()
2067 .timestamp_column()
2068 .with_context(|| TimeIndexNotFoundSnafu {
2069 table: table_ref.to_quoted_string(),
2070 })?
2071 .name
2072 .clone();
2073 self.ctx.time_index_column = Some(time_index);
2074
2075 let values = table
2077 .table_info()
2078 .meta
2079 .field_column_names()
2080 .cloned()
2081 .collect();
2082 self.ctx.field_columns = values;
2083
2084 let tags = table
2086 .table_info()
2087 .meta
2088 .row_key_column_names()
2089 .filter(|col| {
2090 col != &DATA_SCHEMA_TABLE_ID_COLUMN_NAME && col != &DATA_SCHEMA_TSID_COLUMN_NAME
2092 })
2093 .cloned()
2094 .collect();
2095 self.ctx.tag_columns = tags;
2096
2097 self.ctx.use_tsid = false;
2098
2099 Ok(None)
2100 }
2101
2102 fn setup_context_for_empty_metric(&mut self) -> Result<LogicalPlan> {
2105 self.ctx.time_index_column = Some(SPECIAL_TIME_FUNCTION.to_string());
2106 self.ctx.reset_table_name_and_schema();
2107 self.ctx.tag_columns = vec![];
2108 self.ctx.field_columns = vec![DEFAULT_FIELD_COLUMN.to_string()];
2109 self.ctx.use_tsid = false;
2110
2111 let plan = LogicalPlan::Extension(Extension {
2113 node: Arc::new(
2114 EmptyMetric::new(
2115 0,
2116 -1,
2117 self.ctx.interval,
2118 SPECIAL_TIME_FUNCTION.to_string(),
2119 DEFAULT_FIELD_COLUMN.to_string(),
2120 Some(lit(0.0f64)),
2121 )
2122 .context(DataFusionPlanningSnafu)?,
2123 ),
2124 });
2125 Ok(plan)
2126 }
2127
2128 fn create_function_args(&self, args: &[Box<PromExpr>]) -> Result<FunctionArgs> {
2130 let mut result = FunctionArgs::default();
2131
2132 for arg in args {
2133 if let Some(expr) = Self::try_build_literal_expr(arg) {
2135 result.literals.push(expr);
2136 } else {
2137 match arg.as_ref() {
2139 PromExpr::Subquery(_)
2140 | PromExpr::VectorSelector(_)
2141 | PromExpr::MatrixSelector(_)
2142 | PromExpr::Extension(_)
2143 | PromExpr::Aggregate(_)
2144 | PromExpr::Paren(_)
2145 | PromExpr::Call(_)
2146 | PromExpr::Binary(_)
2147 | PromExpr::Unary(_) => {
2148 if result.input.replace(*arg.clone()).is_some() {
2149 MultipleVectorSnafu { expr: *arg.clone() }.fail()?;
2150 }
2151 }
2152
2153 _ => {
2154 let expr = Self::get_param_as_literal_expr(&Some(arg.clone()), None, None)?;
2155 result.literals.push(expr);
2156 }
2157 }
2158 }
2159 }
2160
2161 Ok(result)
2162 }
2163
2164 fn create_function_expr(
2170 &mut self,
2171 func: &Function,
2172 other_input_exprs: Vec<DfExpr>,
2173 query_engine_state: &QueryEngineState,
2174 ) -> Result<(Vec<DfExpr>, Vec<String>)> {
2175 let mut other_input_exprs: VecDeque<DfExpr> = other_input_exprs.into();
2177
2178 let field_column_pos = 0;
2180 let mut exprs = Vec::with_capacity(self.ctx.field_columns.len());
2181 let mut new_tags = vec![];
2183 let scalar_func = match func.name {
2184 "increase" => ScalarFunc::ExtrapolateUdf(
2185 Arc::new(Increase::scalar_udf()),
2186 self.ctx.range.context(ExpectRangeSelectorSnafu)?,
2187 ),
2188 "rate" => ScalarFunc::ExtrapolateUdf(
2189 Arc::new(Rate::scalar_udf()),
2190 self.ctx.range.context(ExpectRangeSelectorSnafu)?,
2191 ),
2192 "delta" => ScalarFunc::ExtrapolateUdf(
2193 Arc::new(Delta::scalar_udf()),
2194 self.ctx.range.context(ExpectRangeSelectorSnafu)?,
2195 ),
2196 "idelta" => ScalarFunc::Udf(Arc::new(IDelta::<false>::scalar_udf())),
2197 "irate" => ScalarFunc::Udf(Arc::new(IDelta::<true>::scalar_udf())),
2198 "resets" => ScalarFunc::Udf(Arc::new(Resets::scalar_udf())),
2199 "changes" => ScalarFunc::Udf(Arc::new(Changes::scalar_udf())),
2200 "deriv" => ScalarFunc::Udf(Arc::new(Deriv::scalar_udf())),
2201 "avg_over_time" => ScalarFunc::Udf(Arc::new(AvgOverTime::scalar_udf())),
2202 "min_over_time" => ScalarFunc::Udf(Arc::new(MinOverTime::scalar_udf())),
2203 "max_over_time" => ScalarFunc::Udf(Arc::new(MaxOverTime::scalar_udf())),
2204 "sum_over_time" => ScalarFunc::Udf(Arc::new(SumOverTime::scalar_udf())),
2205 "count_over_time" => ScalarFunc::Udf(Arc::new(CountOverTime::scalar_udf())),
2206 "last_over_time" => ScalarFunc::Udf(Arc::new(LastOverTime::scalar_udf())),
2207 "absent_over_time" => ScalarFunc::Udf(Arc::new(AbsentOverTime::scalar_udf())),
2208 "present_over_time" => ScalarFunc::Udf(Arc::new(PresentOverTime::scalar_udf())),
2209 "stddev_over_time" => ScalarFunc::Udf(Arc::new(StddevOverTime::scalar_udf())),
2210 "stdvar_over_time" => ScalarFunc::Udf(Arc::new(StdvarOverTime::scalar_udf())),
2211 "quantile_over_time" => ScalarFunc::Udf(Arc::new(QuantileOverTime::scalar_udf())),
2212 "predict_linear" => {
2213 other_input_exprs[0] = DfExpr::Cast(Cast {
2214 expr: Box::new(other_input_exprs[0].clone()),
2215 data_type: ArrowDataType::Int64,
2216 });
2217 ScalarFunc::Udf(Arc::new(PredictLinear::scalar_udf()))
2218 }
2219 "double_exponential_smoothing" | "holt_winters" => {
2220 ScalarFunc::Udf(Arc::new(DoubleExponentialSmoothing::scalar_udf()))
2221 }
2222 "time" => {
2223 exprs.push(build_special_time_expr(
2224 self.ctx.time_index_column.as_ref().unwrap(),
2225 ));
2226 ScalarFunc::GeneratedExpr
2227 }
2228 "minute" => {
2229 let expr = self.date_part_on_time_index("minute")?;
2231 exprs.push(expr);
2232 ScalarFunc::GeneratedExpr
2233 }
2234 "hour" => {
2235 let expr = self.date_part_on_time_index("hour")?;
2237 exprs.push(expr);
2238 ScalarFunc::GeneratedExpr
2239 }
2240 "month" => {
2241 let expr = self.date_part_on_time_index("month")?;
2243 exprs.push(expr);
2244 ScalarFunc::GeneratedExpr
2245 }
2246 "year" => {
2247 let expr = self.date_part_on_time_index("year")?;
2249 exprs.push(expr);
2250 ScalarFunc::GeneratedExpr
2251 }
2252 "day_of_month" => {
2253 let expr = self.date_part_on_time_index("day")?;
2255 exprs.push(expr);
2256 ScalarFunc::GeneratedExpr
2257 }
2258 "day_of_week" => {
2259 let expr = self.date_part_on_time_index("dow")?;
2261 exprs.push(expr);
2262 ScalarFunc::GeneratedExpr
2263 }
2264 "day_of_year" => {
2265 let expr = self.date_part_on_time_index("doy")?;
2267 exprs.push(expr);
2268 ScalarFunc::GeneratedExpr
2269 }
2270 "days_in_month" => {
2271 let day_lit_expr = "day".lit();
2276 let month_lit_expr = "month".lit();
2277 let interval_1month_lit_expr =
2278 DfExpr::Literal(ScalarValue::IntervalYearMonth(Some(1)), None);
2279 let interval_1day_lit_expr = DfExpr::Literal(
2280 ScalarValue::IntervalDayTime(Some(IntervalDayTime::new(1, 0))),
2281 None,
2282 );
2283 let the_1month_minus_1day_expr = DfExpr::BinaryExpr(BinaryExpr {
2284 left: Box::new(interval_1month_lit_expr),
2285 op: Operator::Minus,
2286 right: Box::new(interval_1day_lit_expr),
2287 });
2288 let date_trunc_expr = DfExpr::ScalarFunction(ScalarFunction {
2289 func: datafusion_functions::datetime::date_trunc(),
2290 args: vec![month_lit_expr, self.create_time_index_column_expr()?],
2291 });
2292 let date_trunc_plus_interval_expr = DfExpr::BinaryExpr(BinaryExpr {
2293 left: Box::new(date_trunc_expr),
2294 op: Operator::Plus,
2295 right: Box::new(the_1month_minus_1day_expr),
2296 });
2297 let date_part_expr = DfExpr::ScalarFunction(ScalarFunction {
2298 func: datafusion_functions::datetime::date_part(),
2299 args: vec![day_lit_expr, date_trunc_plus_interval_expr],
2300 });
2301
2302 exprs.push(date_part_expr);
2303 ScalarFunc::GeneratedExpr
2304 }
2305
2306 "label_join" => {
2307 let (concat_expr, dst_label) = Self::build_concat_labels_expr(
2308 &mut other_input_exprs,
2309 &self.ctx,
2310 query_engine_state,
2311 )?;
2312
2313 for value in &self.ctx.field_columns {
2315 if *value != dst_label {
2316 let expr = DfExpr::Column(Column::from_name(value));
2317 exprs.push(expr);
2318 }
2319 }
2320
2321 self.ctx.tag_columns.retain(|tag| *tag != dst_label);
2323 new_tags.push(dst_label);
2324 exprs.push(concat_expr);
2326
2327 ScalarFunc::GeneratedExpr
2328 }
2329 "label_replace" => {
2330 if let Some((replace_expr, dst_label)) = self
2331 .build_regexp_replace_label_expr(&mut other_input_exprs, query_engine_state)?
2332 {
2333 for value in &self.ctx.field_columns {
2335 if *value != dst_label {
2336 let expr = DfExpr::Column(Column::from_name(value));
2337 exprs.push(expr);
2338 }
2339 }
2340
2341 ensure!(
2342 !self.ctx.tag_columns.contains(&dst_label),
2343 SameLabelSetSnafu
2344 );
2345 new_tags.push(dst_label);
2346 exprs.push(replace_expr);
2348 } else {
2349 for value in &self.ctx.field_columns {
2351 let expr = DfExpr::Column(Column::from_name(value));
2352 exprs.push(expr);
2353 }
2354 }
2355
2356 ScalarFunc::GeneratedExpr
2357 }
2358 "sort" | "sort_desc" | "sort_by_label" | "sort_by_label_desc" | "timestamp" => {
2359 for value in &self.ctx.field_columns {
2362 let expr = DfExpr::Column(Column::from_name(value));
2363 exprs.push(expr);
2364 }
2365
2366 ScalarFunc::GeneratedExpr
2367 }
2368 "round" => {
2369 if other_input_exprs.is_empty() {
2370 other_input_exprs.push_front(0.0f64.lit());
2371 }
2372 ScalarFunc::DataFusionUdf(Arc::new(Round::scalar_udf()))
2373 }
2374 "rad" => ScalarFunc::DataFusionBuiltin(datafusion::functions::math::radians()),
2375 "deg" => ScalarFunc::DataFusionBuiltin(datafusion::functions::math::degrees()),
2376 "sgn" => ScalarFunc::DataFusionBuiltin(datafusion::functions::math::signum()),
2377 "pi" => {
2378 let fn_expr = DfExpr::ScalarFunction(ScalarFunction {
2380 func: datafusion::functions::math::pi(),
2381 args: vec![],
2382 });
2383 exprs.push(fn_expr);
2384
2385 ScalarFunc::GeneratedExpr
2386 }
2387 _ => {
2388 if let Some(f) = query_engine_state
2389 .session_state()
2390 .scalar_functions()
2391 .get(func.name)
2392 {
2393 ScalarFunc::DataFusionBuiltin(f.clone())
2394 } else if let Some(factory) = query_engine_state.scalar_function(func.name) {
2395 let func_state = query_engine_state.function_state();
2396 let query_ctx = self.table_provider.query_ctx();
2397
2398 ScalarFunc::DataFusionUdf(Arc::new(factory.provide(FunctionContext {
2399 state: func_state,
2400 query_ctx: query_ctx.clone(),
2401 })))
2402 } else if let Some(f) = datafusion_functions::math::functions()
2403 .iter()
2404 .find(|f| f.name() == func.name)
2405 {
2406 ScalarFunc::DataFusionUdf(f.clone())
2407 } else {
2408 return UnsupportedExprSnafu {
2409 name: func.name.to_string(),
2410 }
2411 .fail();
2412 }
2413 }
2414 };
2415
2416 for value in &self.ctx.field_columns {
2417 let col_expr = DfExpr::Column(Column::from_name(value));
2418
2419 match scalar_func.clone() {
2420 ScalarFunc::DataFusionBuiltin(func) => {
2421 other_input_exprs.insert(field_column_pos, col_expr);
2422 let fn_expr = DfExpr::ScalarFunction(ScalarFunction {
2423 func,
2424 args: other_input_exprs.clone().into(),
2425 });
2426 exprs.push(fn_expr);
2427 let _ = other_input_exprs.remove(field_column_pos);
2428 }
2429 ScalarFunc::DataFusionUdf(func) => {
2430 let args = itertools::chain!(
2431 other_input_exprs.iter().take(field_column_pos).cloned(),
2432 std::iter::once(col_expr),
2433 other_input_exprs.iter().skip(field_column_pos).cloned()
2434 )
2435 .collect_vec();
2436 exprs.push(DfExpr::ScalarFunction(ScalarFunction { func, args }))
2437 }
2438 ScalarFunc::Udf(func) => {
2439 let ts_range_expr = DfExpr::Column(Column::from_name(
2440 RangeManipulate::build_timestamp_range_name(
2441 self.ctx.time_index_column.as_ref().unwrap(),
2442 ),
2443 ));
2444 other_input_exprs.insert(field_column_pos, ts_range_expr);
2445 other_input_exprs.insert(field_column_pos + 1, col_expr);
2446 let fn_expr = DfExpr::ScalarFunction(ScalarFunction {
2447 func,
2448 args: other_input_exprs.clone().into(),
2449 });
2450 exprs.push(fn_expr);
2451 let _ = other_input_exprs.remove(field_column_pos + 1);
2452 let _ = other_input_exprs.remove(field_column_pos);
2453 }
2454 ScalarFunc::ExtrapolateUdf(func, range_length) => {
2455 let ts_range_expr = DfExpr::Column(Column::from_name(
2456 RangeManipulate::build_timestamp_range_name(
2457 self.ctx.time_index_column.as_ref().unwrap(),
2458 ),
2459 ));
2460 other_input_exprs.insert(field_column_pos, ts_range_expr);
2461 other_input_exprs.insert(field_column_pos + 1, col_expr);
2462 other_input_exprs
2463 .insert(field_column_pos + 2, self.create_time_index_column_expr()?);
2464 other_input_exprs.push_back(lit(range_length));
2465 let fn_expr = DfExpr::ScalarFunction(ScalarFunction {
2466 func,
2467 args: other_input_exprs.clone().into(),
2468 });
2469 exprs.push(fn_expr);
2470 let _ = other_input_exprs.pop_back();
2471 let _ = other_input_exprs.remove(field_column_pos + 2);
2472 let _ = other_input_exprs.remove(field_column_pos + 1);
2473 let _ = other_input_exprs.remove(field_column_pos);
2474 }
2475 ScalarFunc::GeneratedExpr => {}
2476 }
2477 }
2478
2479 if !matches!(func.name, "label_join" | "label_replace") {
2483 let mut new_field_columns = Vec::with_capacity(exprs.len());
2484
2485 exprs = exprs
2486 .into_iter()
2487 .map(|expr| {
2488 let display_name = expr.schema_name().to_string();
2489 new_field_columns.push(display_name.clone());
2490 Ok(expr.alias(display_name))
2491 })
2492 .collect::<std::result::Result<Vec<_>, _>>()
2493 .context(DataFusionPlanningSnafu)?;
2494
2495 self.ctx.field_columns = new_field_columns;
2496 }
2497
2498 Ok((exprs, new_tags))
2499 }
2500
2501 fn validate_label_name(label_name: &str) -> Result<()> {
2505 if label_name.starts_with("__") {
2507 return InvalidDestinationLabelNameSnafu { label_name }.fail();
2508 }
2509 if !LABEL_NAME_REGEX.is_match(label_name) {
2511 return InvalidDestinationLabelNameSnafu { label_name }.fail();
2512 }
2513
2514 Ok(())
2515 }
2516
2517 fn build_regexp_replace_label_expr(
2519 &self,
2520 other_input_exprs: &mut VecDeque<DfExpr>,
2521 query_engine_state: &QueryEngineState,
2522 ) -> Result<Option<(DfExpr, String)>> {
2523 let dst_label = match other_input_exprs.pop_front() {
2525 Some(DfExpr::Literal(ScalarValue::Utf8(Some(d)), _)) => d,
2526 other => UnexpectedPlanExprSnafu {
2527 desc: format!("expected dst_label string literal, but found {:?}", other),
2528 }
2529 .fail()?,
2530 };
2531
2532 Self::validate_label_name(&dst_label)?;
2534 let replacement = match other_input_exprs.pop_front() {
2535 Some(DfExpr::Literal(ScalarValue::Utf8(Some(r)), _)) => r,
2536 other => UnexpectedPlanExprSnafu {
2537 desc: format!("expected replacement string literal, but found {:?}", other),
2538 }
2539 .fail()?,
2540 };
2541 let src_label = match other_input_exprs.pop_front() {
2542 Some(DfExpr::Literal(ScalarValue::Utf8(Some(s)), None)) => s,
2543 other => UnexpectedPlanExprSnafu {
2544 desc: format!("expected src_label string literal, but found {:?}", other),
2545 }
2546 .fail()?,
2547 };
2548
2549 let regex = match other_input_exprs.pop_front() {
2550 Some(DfExpr::Literal(ScalarValue::Utf8(Some(r)), None)) => r,
2551 other => UnexpectedPlanExprSnafu {
2552 desc: format!("expected regex string literal, but found {:?}", other),
2553 }
2554 .fail()?,
2555 };
2556
2557 regex::Regex::new(®ex).map_err(|_| {
2560 InvalidRegularExpressionSnafu {
2561 regex: regex.clone(),
2562 }
2563 .build()
2564 })?;
2565
2566 if self.ctx.tag_columns.contains(&src_label) && regex.is_empty() {
2568 return Ok(None);
2569 }
2570
2571 if !self.ctx.tag_columns.contains(&src_label) {
2573 if replacement.is_empty() {
2574 return Ok(None);
2576 } else {
2577 return Ok(Some((
2579 lit(replacement).alias(&dst_label),
2581 dst_label,
2582 )));
2583 }
2584 }
2585
2586 let regex = format!("^(?s:{regex})$");
2589
2590 let session_state = query_engine_state.session_state();
2591 let func = session_state
2592 .scalar_functions()
2593 .get("regexp_replace")
2594 .context(UnsupportedExprSnafu {
2595 name: "regexp_replace",
2596 })?;
2597
2598 let args = vec![
2600 if src_label.is_empty() {
2601 DfExpr::Literal(ScalarValue::Utf8(Some(String::new())), None)
2602 } else {
2603 DfExpr::Column(Column::from_name(src_label))
2604 },
2605 DfExpr::Literal(ScalarValue::Utf8(Some(regex)), None),
2606 DfExpr::Literal(ScalarValue::Utf8(Some(replacement)), None),
2607 ];
2608
2609 Ok(Some((
2610 DfExpr::ScalarFunction(ScalarFunction {
2611 func: func.clone(),
2612 args,
2613 })
2614 .alias(&dst_label),
2615 dst_label,
2616 )))
2617 }
2618
2619 fn build_concat_labels_expr(
2621 other_input_exprs: &mut VecDeque<DfExpr>,
2622 ctx: &PromPlannerContext,
2623 query_engine_state: &QueryEngineState,
2624 ) -> Result<(DfExpr, String)> {
2625 let dst_label = match other_input_exprs.pop_front() {
2628 Some(DfExpr::Literal(ScalarValue::Utf8(Some(d)), _)) => d,
2629 other => UnexpectedPlanExprSnafu {
2630 desc: format!("expected dst_label string literal, but found {:?}", other),
2631 }
2632 .fail()?,
2633 };
2634 let separator = match other_input_exprs.pop_front() {
2635 Some(DfExpr::Literal(ScalarValue::Utf8(Some(d)), _)) => d,
2636 other => UnexpectedPlanExprSnafu {
2637 desc: format!("expected separator string literal, but found {:?}", other),
2638 }
2639 .fail()?,
2640 };
2641
2642 let available_columns: HashSet<&str> = ctx
2644 .tag_columns
2645 .iter()
2646 .chain(ctx.field_columns.iter())
2647 .chain(ctx.time_index_column.as_ref())
2648 .map(|s| s.as_str())
2649 .collect();
2650
2651 let src_labels = other_input_exprs
2652 .iter()
2653 .map(|expr| {
2654 match expr {
2656 DfExpr::Literal(ScalarValue::Utf8(Some(label)), None) => {
2657 if label.is_empty() {
2658 Ok(DfExpr::Literal(ScalarValue::Null, None))
2659 } else if available_columns.contains(label.as_str()) {
2660 Ok(DfExpr::Column(Column::from_name(label)))
2662 } else {
2663 Ok(DfExpr::Literal(ScalarValue::Null, None))
2665 }
2666 }
2667 other => UnexpectedPlanExprSnafu {
2668 desc: format!(
2669 "expected source label string literal, but found {:?}",
2670 other
2671 ),
2672 }
2673 .fail(),
2674 }
2675 })
2676 .collect::<Result<Vec<_>>>()?;
2677 ensure!(
2678 !src_labels.is_empty(),
2679 FunctionInvalidArgumentSnafu {
2680 fn_name: "label_join"
2681 }
2682 );
2683
2684 let session_state = query_engine_state.session_state();
2685 let func = session_state
2686 .scalar_functions()
2687 .get("concat_ws")
2688 .context(UnsupportedExprSnafu { name: "concat_ws" })?;
2689
2690 let mut args = Vec::with_capacity(1 + src_labels.len());
2692 args.push(DfExpr::Literal(ScalarValue::Utf8(Some(separator)), None));
2693 args.extend(src_labels);
2694
2695 Ok((
2696 DfExpr::ScalarFunction(ScalarFunction {
2697 func: func.clone(),
2698 args,
2699 })
2700 .alias(&dst_label),
2701 dst_label,
2702 ))
2703 }
2704
2705 fn create_time_index_column_expr(&self) -> Result<DfExpr> {
2706 Ok(DfExpr::Column(Column::from_name(
2707 self.ctx
2708 .time_index_column
2709 .clone()
2710 .with_context(|| TimeIndexNotFoundSnafu { table: "unknown" })?,
2711 )))
2712 }
2713
2714 fn create_tag_column_exprs(&self) -> Result<Vec<DfExpr>> {
2715 let mut result = Vec::with_capacity(self.ctx.tag_columns.len());
2716 for tag in &self.ctx.tag_columns {
2717 let expr = DfExpr::Column(Column::from_name(tag));
2718 result.push(expr);
2719 }
2720 Ok(result)
2721 }
2722
2723 fn create_field_column_exprs(&self) -> Result<Vec<DfExpr>> {
2724 let mut result = Vec::with_capacity(self.ctx.field_columns.len());
2725 for field in &self.ctx.field_columns {
2726 let expr = DfExpr::Column(Column::from_name(field));
2727 result.push(expr);
2728 }
2729 Ok(result)
2730 }
2731
2732 fn create_tag_and_time_index_column_sort_exprs(&self) -> Result<Vec<SortExpr>> {
2733 let mut result = self
2734 .ctx
2735 .tag_columns
2736 .iter()
2737 .map(|col| DfExpr::Column(Column::from_name(col)).sort(true, true))
2738 .collect::<Vec<_>>();
2739 result.push(self.create_time_index_column_expr()?.sort(true, true));
2740 Ok(result)
2741 }
2742
2743 fn create_field_columns_sort_exprs(&self, asc: bool) -> Vec<SortExpr> {
2744 self.ctx
2745 .field_columns
2746 .iter()
2747 .map(|col| DfExpr::Column(Column::from_name(col)).sort(asc, true))
2748 .collect::<Vec<_>>()
2749 }
2750
2751 fn create_sort_exprs_by_tags(
2752 func: &str,
2753 tags: Vec<DfExpr>,
2754 asc: bool,
2755 ) -> Result<Vec<SortExpr>> {
2756 ensure!(
2757 !tags.is_empty(),
2758 FunctionInvalidArgumentSnafu { fn_name: func }
2759 );
2760
2761 tags.iter()
2762 .map(|col| match col {
2763 DfExpr::Literal(ScalarValue::Utf8(Some(label)), _) => {
2764 Ok(DfExpr::Column(Column::from_name(label)).sort(asc, false))
2765 }
2766 other => UnexpectedPlanExprSnafu {
2767 desc: format!("expected label string literal, but found {:?}", other),
2768 }
2769 .fail(),
2770 })
2771 .collect::<Result<Vec<_>>>()
2772 }
2773
2774 fn create_empty_values_filter_expr(&self) -> Result<DfExpr> {
2775 let mut exprs = Vec::with_capacity(self.ctx.field_columns.len());
2776 for value in &self.ctx.field_columns {
2777 let expr = DfExpr::Column(Column::from_name(value)).is_not_null();
2778 exprs.push(expr);
2779 }
2780
2781 conjunction(exprs).with_context(|| ValueNotFoundSnafu {
2786 table: self
2787 .table_ref()
2788 .map(|t| t.to_quoted_string())
2789 .unwrap_or_else(|_| "unknown".to_string()),
2790 })
2791 }
2792
    /// Build aggregate expressions for a PromQL aggregation operator over
    /// every field column in the current context.
    ///
    /// Returns `(aggr_exprs, prev_field_exprs)`. `prev_field_exprs` is only
    /// non-empty for `count_values`, which must also keep the original value
    /// column. As a side effect, `self.ctx.field_columns` is replaced by the
    /// output (schema) names of the aggregate expressions.
    fn create_aggregate_exprs(
        &mut self,
        op: TokenType,
        param: &Option<Box<PromExpr>>,
        input_plan: &LogicalPlan,
    ) -> Result<(Vec<DfExpr>, Vec<DfExpr>)> {
        // Leading non-column arguments for the UDAF (e.g. quantile's phi).
        let mut non_col_args = Vec::new();
        let is_group_agg = op.id() == token::T_GROUP;
        if is_group_agg {
            ensure!(
                self.ctx.field_columns.len() == 1,
                MultiFieldsNotSupportedSnafu {
                    operator: "group()"
                }
            );
        }
        // Map the PromQL token to a DataFusion UDAF.
        let aggr = match op.id() {
            token::T_SUM => sum_udaf(),
            token::T_QUANTILE => {
                let q =
                    Self::get_param_as_literal_expr(param, Some(op), Some(ArrowDataType::Float64))?;
                non_col_args.push(q);
                quantile_udaf()
            }
            token::T_AVG => avg_udaf(),
            token::T_COUNT_VALUES | token::T_COUNT => count_udaf(),
            token::T_MIN => min_udaf(),
            token::T_MAX => max_udaf(),
            // group() aggregates a constant 1 per group (see below), so any
            // "pick one" aggregate works; max is used here.
            token::T_GROUP => max_udaf(),
            token::T_STDDEV => stddev_pop_udaf(),
            token::T_STDVAR => var_pop_udaf(),
            token::T_TOPK | token::T_BOTTOMK => UnsupportedExprSnafu {
                name: format!("{op:?}"),
            }
            .fail()?,
            _ => UnexpectedTokenSnafu { token: op }.fail()?,
        };

        // One aggregate expr per field column. The column is pushed onto
        // `non_col_args`, the call is built from a clone, then popped so the
        // same leading args are reused for the next field column.
        let exprs: Vec<DfExpr> = self
            .ctx
            .field_columns
            .iter()
            .map(|col| {
                if is_group_agg {
                    // group() ignores the input values entirely.
                    aggr.call(vec![lit(1_f64)])
                } else {
                    non_col_args.push(DfExpr::Column(Column::from_name(col)));
                    let expr = aggr.call(non_col_args.clone());
                    non_col_args.pop();
                    expr
                }
            })
            .collect::<Vec<_>>();

        // count_values additionally needs the original value column
        // preserved by the caller (it groups by value), and only supports a
        // single field column.
        let prev_field_exprs = if op.id() == token::T_COUNT_VALUES {
            let prev_field_exprs: Vec<_> = self
                .ctx
                .field_columns
                .iter()
                .map(|col| DfExpr::Column(Column::from_name(col)))
                .collect();

            ensure!(
                self.ctx.field_columns.len() == 1,
                UnsupportedExprSnafu {
                    name: "count_values on multi-value input"
                }
            );

            prev_field_exprs
        } else {
            vec![]
        };

        // Record the aggregates' normalized output names as the context's
        // new field columns.
        let mut new_field_columns = Vec::with_capacity(self.ctx.field_columns.len());

        let normalized_exprs =
            normalize_cols(exprs.iter().cloned(), input_plan).context(DataFusionPlanningSnafu)?;
        for expr in normalized_exprs {
            new_field_columns.push(expr.schema_name().to_string());
        }
        self.ctx.field_columns = new_field_columns;

        Ok((exprs, prev_field_exprs))
    }
2898
2899 fn get_param_value_as_str(op: TokenType, param: &Option<Box<PromExpr>>) -> Result<&str> {
2900 let param = param
2901 .as_deref()
2902 .with_context(|| FunctionInvalidArgumentSnafu {
2903 fn_name: op.to_string(),
2904 })?;
2905 let PromExpr::StringLiteral(StringLiteral { val }) = param else {
2906 return FunctionInvalidArgumentSnafu {
2907 fn_name: op.to_string(),
2908 }
2909 .fail();
2910 };
2911
2912 Ok(val)
2913 }
2914
2915 fn get_param_as_literal_expr(
2916 param: &Option<Box<PromExpr>>,
2917 op: Option<TokenType>,
2918 expected_type: Option<ArrowDataType>,
2919 ) -> Result<DfExpr> {
2920 let prom_param = param.as_deref().with_context(|| {
2921 if let Some(op) = op {
2922 FunctionInvalidArgumentSnafu {
2923 fn_name: op.to_string(),
2924 }
2925 } else {
2926 FunctionInvalidArgumentSnafu {
2927 fn_name: "unknown".to_string(),
2928 }
2929 }
2930 })?;
2931
2932 let expr = Self::try_build_literal_expr(prom_param).with_context(|| {
2933 if let Some(op) = op {
2934 FunctionInvalidArgumentSnafu {
2935 fn_name: op.to_string(),
2936 }
2937 } else {
2938 FunctionInvalidArgumentSnafu {
2939 fn_name: "unknown".to_string(),
2940 }
2941 }
2942 })?;
2943
2944 if let Some(expected_type) = expected_type {
2946 let expr_type = expr
2948 .get_type(&DFSchema::empty())
2949 .context(DataFusionPlanningSnafu)?;
2950 if expected_type != expr_type {
2951 return FunctionInvalidArgumentSnafu {
2952 fn_name: format!("expected {expected_type:?}, but found {expr_type:?}"),
2953 }
2954 .fail();
2955 }
2956 }
2957
2958 Ok(expr)
2959 }
2960
    /// Build `row_number()` window expressions used to implement `topk` /
    /// `bottomk`: rows are numbered within each partition, ordered by the
    /// field value with the tag columns as tie-breakers, so the caller can
    /// filter by rank.
    fn create_window_exprs(
        &mut self,
        op: TokenType,
        group_exprs: Vec<DfExpr>,
        input_plan: &LogicalPlan,
    ) -> Result<Vec<DfExpr>> {
        ensure!(
            self.ctx.field_columns.len() == 1,
            UnsupportedExprSnafu {
                name: "topk or bottomk on multi-value input"
            }
        );

        assert!(matches!(op.id(), token::T_TOPK | token::T_BOTTOMK));

        // bottomk ranks ascending, topk descending.
        let asc = matches!(op.id(), token::T_BOTTOMK);

        // Tag columns act as deterministic tie-breakers after the value.
        let tag_sort_exprs = self
            .create_tag_column_exprs()?
            .into_iter()
            .map(|expr| expr.sort(asc, true));

        let exprs: Vec<DfExpr> = self
            .ctx
            .field_columns
            .iter()
            .map(|col| {
                // Primary sort key: the field value itself, then the tags.
                let mut sort_exprs = Vec::with_capacity(self.ctx.tag_columns.len() + 1);
                sort_exprs.push(DfExpr::Column(Column::from(col)).sort(asc, true));
                sort_exprs.extend(tag_sort_exprs.clone());

                DfExpr::WindowFunction(Box::new(WindowFunction {
                    fun: WindowFunctionDefinition::WindowUDF(Arc::new(RowNumber::new().into())),
                    params: WindowFunctionParams {
                        args: vec![],
                        partition_by: group_exprs.clone(),
                        order_by: sort_exprs,
                        window_frame: WindowFrame::new(Some(true)),
                        null_treatment: None,
                        distinct: false,
                        filter: None,
                    },
                }))
            })
            .collect();

        let normalized_exprs =
            normalize_cols(exprs.iter().cloned(), input_plan).context(DataFusionPlanningSnafu)?;
        Ok(normalized_exprs)
    }
3017
3018 #[deprecated(
3020 note = "use `Self::get_param_as_literal_expr` instead. This is only for `create_histogram_plan`"
3021 )]
3022 fn try_build_float_literal(expr: &PromExpr) -> Option<f64> {
3023 match expr {
3024 PromExpr::NumberLiteral(NumberLiteral { val }) => Some(*val),
3025 PromExpr::Paren(ParenExpr { expr }) => Self::try_build_float_literal(expr),
3026 PromExpr::Unary(UnaryExpr { expr, .. }) => {
3027 Self::try_build_float_literal(expr).map(|f| -f)
3028 }
3029 PromExpr::StringLiteral(_)
3030 | PromExpr::Binary(_)
3031 | PromExpr::VectorSelector(_)
3032 | PromExpr::MatrixSelector(_)
3033 | PromExpr::Call(_)
3034 | PromExpr::Extension(_)
3035 | PromExpr::Aggregate(_)
3036 | PromExpr::Subquery(_) => None,
3037 }
3038 }
3039
    /// Plan `histogram_quantile(phi, buckets)`.
    ///
    /// Expects exactly two arguments: a float literal `phi` and the bucket
    /// series. Produces a `HistogramFold` plan node folding the `le` bucket
    /// tag into quantile values; returns an empty relation when the input
    /// carries no `le` tag.
    async fn create_histogram_plan(
        &mut self,
        args: &PromFunctionArgs,
        query_engine_state: &QueryEngineState,
    ) -> Result<LogicalPlan> {
        if args.args.len() != 2 {
            return FunctionInvalidArgumentSnafu {
                fn_name: SPECIAL_HISTOGRAM_QUANTILE.to_string(),
            }
            .fail();
        }
        #[allow(deprecated)]
        let phi = Self::try_build_float_literal(&args.args[0]).with_context(|| {
            FunctionInvalidArgumentSnafu {
                fn_name: SPECIAL_HISTOGRAM_QUANTILE.to_string(),
            }
        })?;

        let input = args.args[1].as_ref().clone();
        let input_plan = self.prom_expr_to_plan(&input, query_engine_state).await?;
        // The fold merges series across `le`, so the per-series tsid column
        // is stripped and disabled for the rest of this subtree.
        let input_plan = self.strip_tsid_column(input_plan)?;
        self.ctx.use_tsid = false;

        // Without an `le` tag there is nothing to fold: empty result.
        if !self.ctx.has_le_tag() {
            return Ok(LogicalPlan::EmptyRelation(
                datafusion::logical_expr::EmptyRelation {
                    produce_one_row: false,
                    schema: Arc::new(DFSchema::empty()),
                },
            ));
        }
        let time_index_column =
            self.ctx
                .time_index_column
                .clone()
                .with_context(|| TimeIndexNotFoundSnafu {
                    table: self.ctx.table_name.clone().unwrap_or_default(),
                })?;
        // Only the first field column participates in the fold.
        let field_column = self
            .ctx
            .field_columns
            .first()
            .with_context(|| FunctionInvalidArgumentSnafu {
                fn_name: SPECIAL_HISTOGRAM_QUANTILE.to_string(),
            })?
            .clone();
        // The `le` tag is consumed by the fold and removed from the context.
        self.ctx.tag_columns.retain(|col| col != LE_COLUMN_NAME);

        Ok(LogicalPlan::Extension(Extension {
            node: Arc::new(
                HistogramFold::new(
                    LE_COLUMN_NAME.to_string(),
                    field_column,
                    time_index_column,
                    phi,
                    input_plan,
                )
                .context(DataFusionPlanningSnafu)?,
            ),
        }))
    }
3109
3110 async fn create_vector_plan(&mut self, args: &PromFunctionArgs) -> Result<LogicalPlan> {
3112 if args.args.len() != 1 {
3113 return FunctionInvalidArgumentSnafu {
3114 fn_name: SPECIAL_VECTOR_FUNCTION.to_string(),
3115 }
3116 .fail();
3117 }
3118 let lit = Self::get_param_as_literal_expr(&Some(args.args[0].clone()), None, None)?;
3119
3120 self.ctx.time_index_column = Some(SPECIAL_TIME_FUNCTION.to_string());
3122 self.ctx.reset_table_name_and_schema();
3123 self.ctx.tag_columns = vec![];
3124 self.ctx.field_columns = vec![greptime_value().to_string()];
3125 Ok(LogicalPlan::Extension(Extension {
3126 node: Arc::new(
3127 EmptyMetric::new(
3128 self.ctx.start,
3129 self.ctx.end,
3130 self.ctx.interval,
3131 SPECIAL_TIME_FUNCTION.to_string(),
3132 greptime_value().to_string(),
3133 Some(lit),
3134 )
3135 .context(DataFusionPlanningSnafu)?,
3136 ),
3137 }))
3138 }
3139
    /// Plan the `scalar()` special function: convert a single-field vector
    /// into a scalar stream via the `ScalarCalculate` plan node. Clears the
    /// context's tag columns and replaces its field columns with the node's
    /// output value column.
    async fn create_scalar_plan(
        &mut self,
        args: &PromFunctionArgs,
        query_engine_state: &QueryEngineState,
    ) -> Result<LogicalPlan> {
        ensure!(
            args.len() == 1,
            FunctionInvalidArgumentSnafu {
                fn_name: SCALAR_FUNCTION
            }
        );
        let input = self
            .prom_expr_to_plan(&args.args[0], query_engine_state)
            .await?;
        // scalar() is only defined for a single value column.
        ensure!(
            self.ctx.field_columns.len() == 1,
            MultiFieldsNotSupportedSnafu {
                operator: SCALAR_FUNCTION
            },
        );
        let scalar_plan = LogicalPlan::Extension(Extension {
            node: Arc::new(
                ScalarCalculate::new(
                    self.ctx.start,
                    self.ctx.end,
                    self.ctx.interval,
                    input,
                    self.ctx.time_index_column.as_ref().unwrap(),
                    &self.ctx.tag_columns,
                    &self.ctx.field_columns[0],
                    self.ctx.table_name.as_deref(),
                )
                .context(PromqlPlanNodeSnafu)?,
            ),
        });
        // The scalar output has no tags; adopt the node's second schema
        // field as the sole field column.
        self.ctx.tag_columns.clear();
        self.ctx.field_columns.clear();
        self.ctx
            .field_columns
            .push(scalar_plan.schema().field(1).name().clone());
        Ok(scalar_plan)
    }
3184
    /// Plan the `absent()` special function.
    ///
    /// Aggregates the input to one row per timestamp, orders it by time, and
    /// wraps it in an `Absent` plan node. Plain equality matchers from the
    /// original selector are forwarded as fake labels for the synthetic
    /// output series, following PromQL's absent() semantics.
    async fn create_absent_plan(
        &mut self,
        args: &PromFunctionArgs,
        query_engine_state: &QueryEngineState,
    ) -> Result<LogicalPlan> {
        if args.args.len() != 1 {
            return FunctionInvalidArgumentSnafu {
                fn_name: SPECIAL_ABSENT_FUNCTION.to_string(),
            }
            .fail();
        }
        let input = self
            .prom_expr_to_plan(&args.args[0], query_engine_state)
            .await?;

        let time_index_expr = self.create_time_index_column_expr()?;
        // Any single field column serves as presence evidence; `pop()`
        // takes the last one.
        let first_field_expr =
            self.create_field_column_exprs()?
                .pop()
                .with_context(|| ValueNotFoundSnafu {
                    table: self.ctx.table_name.clone().unwrap_or_default(),
                })?;
        let first_value_expr = first_value(first_field_expr, vec![]);

        // Collapse all series into one row per timestamp, ordered by time,
        // before handing the data to the Absent node.
        let ordered_aggregated_input = LogicalPlanBuilder::from(input)
            .aggregate(
                vec![time_index_expr.clone()],
                vec![first_value_expr.clone()],
            )
            .context(DataFusionPlanningSnafu)?
            .sort(vec![time_index_expr.sort(true, false)])
            .context(DataFusionPlanningSnafu)?
            .build()
            .context(DataFusionPlanningSnafu)?;

        // Only `=` matchers become labels on the synthetic series.
        let fake_labels = self
            .ctx
            .selector_matcher
            .iter()
            .filter_map(|matcher| match matcher.op {
                MatchOp::Equal => Some((matcher.name.clone(), matcher.value.clone())),
                _ => None,
            })
            .collect::<Vec<_>>();

        let absent_plan = LogicalPlan::Extension(Extension {
            node: Arc::new(
                Absent::try_new(
                    self.ctx.start,
                    self.ctx.end,
                    self.ctx.interval,
                    self.ctx.time_index_column.as_ref().unwrap().clone(),
                    self.ctx.field_columns[0].clone(),
                    fake_labels,
                    ordered_aggregated_input,
                )
                .context(DataFusionPlanningSnafu)?,
            ),
        });

        Ok(absent_plan)
    }
3249
3250 fn try_build_literal_expr(expr: &PromExpr) -> Option<DfExpr> {
3253 match expr {
3254 PromExpr::NumberLiteral(NumberLiteral { val }) => Some(val.lit()),
3255 PromExpr::StringLiteral(StringLiteral { val }) => Some(val.lit()),
3256 PromExpr::VectorSelector(_)
3257 | PromExpr::MatrixSelector(_)
3258 | PromExpr::Extension(_)
3259 | PromExpr::Aggregate(_)
3260 | PromExpr::Subquery(_) => None,
3261 PromExpr::Call(Call { func, .. }) => {
3262 if func.name == SPECIAL_TIME_FUNCTION {
3263 None
3266 } else {
3267 None
3268 }
3269 }
3270 PromExpr::Paren(ParenExpr { expr }) => Self::try_build_literal_expr(expr),
3271 PromExpr::Unary(UnaryExpr { expr, .. }) => Self::try_build_literal_expr(expr),
3273 PromExpr::Binary(PromBinaryExpr {
3274 lhs,
3275 rhs,
3276 op,
3277 modifier,
3278 }) => {
3279 let lhs = Self::try_build_literal_expr(lhs)?;
3280 let rhs = Self::try_build_literal_expr(rhs)?;
3281 let is_comparison_op = Self::is_token_a_comparison_op(*op);
3282 let expr_builder = Self::prom_token_to_binary_expr_builder(*op).ok()?;
3283 let expr = expr_builder(lhs, rhs).ok()?;
3284
3285 let should_return_bool = if let Some(m) = modifier {
3286 m.return_bool
3287 } else {
3288 false
3289 };
3290 if is_comparison_op && should_return_bool {
3291 Some(DfExpr::Cast(Cast {
3292 expr: Box::new(expr),
3293 data_type: ArrowDataType::Float64,
3294 }))
3295 } else {
3296 Some(expr)
3297 }
3298 }
3299 }
3300 }
3301
3302 fn try_build_special_time_expr_with_context(&self, expr: &PromExpr) -> Option<DfExpr> {
3303 match expr {
3304 PromExpr::Call(Call { func, .. }) => {
3305 if func.name == SPECIAL_TIME_FUNCTION
3306 && let Some(time_index_col) = self.ctx.time_index_column.as_ref()
3307 {
3308 Some(build_special_time_expr(time_index_col))
3309 } else {
3310 None
3311 }
3312 }
3313 _ => None,
3314 }
3315 }
3316
    /// Map a PromQL binary-operator token to a builder closure that combines
    /// two DataFusion expressions.
    ///
    /// Arithmetic operators cast both operands to `Float64` first;
    /// comparison operators compare the operands as-is.
    #[allow(clippy::type_complexity)]
    fn prom_token_to_binary_expr_builder(
        token: TokenType,
    ) -> Result<Box<dyn Fn(DfExpr, DfExpr) -> Result<DfExpr>>> {
        // Wrap an expr in a Float64 cast, skipping exprs that are already
        // Float64 casts or Float64 literals to avoid stacking redundant
        // casts.
        let cast_float = |expr| {
            if matches!(
                &expr,
                DfExpr::Cast(Cast {
                    data_type: ArrowDataType::Float64,
                    ..
                })
            ) || matches!(&expr, DfExpr::Literal(ScalarValue::Float64(_), _))
            {
                expr
            } else {
                DfExpr::Cast(Cast {
                    expr: Box::new(expr),
                    data_type: ArrowDataType::Float64,
                })
            }
        };
        match token.id() {
            token::T_ADD => Ok(Box::new(move |lhs, rhs| {
                Ok(cast_float(lhs) + cast_float(rhs))
            })),
            token::T_SUB => Ok(Box::new(move |lhs, rhs| {
                Ok(cast_float(lhs) - cast_float(rhs))
            })),
            token::T_MUL => Ok(Box::new(move |lhs, rhs| {
                Ok(cast_float(lhs) * cast_float(rhs))
            })),
            token::T_DIV => Ok(Box::new(move |lhs, rhs| {
                Ok(cast_float(lhs) / cast_float(rhs))
            })),
            token::T_MOD => Ok(Box::new(move |lhs: DfExpr, rhs| {
                Ok(cast_float(lhs) % cast_float(rhs))
            })),
            token::T_EQLC => Ok(Box::new(|lhs, rhs| Ok(lhs.eq(rhs)))),
            token::T_NEQ => Ok(Box::new(|lhs, rhs| Ok(lhs.not_eq(rhs)))),
            token::T_GTR => Ok(Box::new(|lhs, rhs| Ok(lhs.gt(rhs)))),
            token::T_LSS => Ok(Box::new(|lhs, rhs| Ok(lhs.lt(rhs)))),
            token::T_GTE => Ok(Box::new(|lhs, rhs| Ok(lhs.gt_eq(rhs)))),
            token::T_LTE => Ok(Box::new(|lhs, rhs| Ok(lhs.lt_eq(rhs)))),
            // Power and atan2 are built via math scalar functions rather
            // than binary operators.
            token::T_POW => Ok(Box::new(move |lhs, rhs| {
                Ok(DfExpr::ScalarFunction(ScalarFunction {
                    func: datafusion_functions::math::power(),
                    args: vec![cast_float(lhs), cast_float(rhs)],
                }))
            })),
            token::T_ATAN2 => Ok(Box::new(move |lhs, rhs| {
                Ok(DfExpr::ScalarFunction(ScalarFunction {
                    func: datafusion_functions::math::atan2(),
                    args: vec![cast_float(lhs), cast_float(rhs)],
                }))
            })),
            _ => UnexpectedTokenSnafu { token }.fail(),
        }
    }
3377
3378 fn is_token_a_comparison_op(token: TokenType) -> bool {
3380 matches!(
3381 token.id(),
3382 token::T_EQLC
3383 | token::T_NEQ
3384 | token::T_GTR
3385 | token::T_LSS
3386 | token::T_GTE
3387 | token::T_LTE
3388 )
3389 }
3390
3391 fn is_token_a_set_op(token: TokenType) -> bool {
3393 matches!(
3394 token.id(),
3395 token::T_LAND | token::T_LOR | token::T_LUNLESS )
3399 }
3400
3401 fn align_binary_field_columns<'a>(
3402 left_field_columns: &'a [String],
3403 right_field_columns: &'a [String],
3404 ) -> (Vec<String>, Vec<(&'a String, &'a String)>) {
3405 let field_pairs = left_field_columns
3406 .iter()
3407 .zip(right_field_columns.iter())
3408 .collect::<Vec<_>>();
3409 let output_field_columns = field_pairs
3410 .iter()
3411 .map(|(left_col_name, _)| (*left_col_name).clone())
3412 .collect();
3413 (output_field_columns, field_pairs)
3414 }
3415
3416 fn plan_has_tsid_column(plan: &LogicalPlan) -> bool {
3417 plan.schema()
3418 .fields()
3419 .iter()
3420 .any(|field| field.name() == DATA_SCHEMA_TSID_COLUMN_NAME)
3421 }
3422
3423 fn optional_tsid_projection(
3424 schema: &DFSchemaRef,
3425 table_ref: Option<&TableReference>,
3426 keep_tsid: bool,
3427 ) -> Option<DfExpr> {
3428 keep_tsid.then_some(()).and_then(|_| {
3429 schema
3430 .qualified_field_with_name(table_ref, DATA_SCHEMA_TSID_COLUMN_NAME)
3431 .ok()
3432 .map(|field| DfExpr::Column(field.into()))
3433 })
3434 }
3435
    /// Decide the join-key column sets for both sides of a binary operation.
    ///
    /// Fast path: when there is no explicit on/ignoring matching, the
    /// cardinality is one-to-one, the join is not time-index-only, and both
    /// plans still carry the internal tsid column, join on tsid alone.
    /// Otherwise join on the context's tag columns, narrowed by `on`
    /// (intersection) or `ignoring` (removal) modifiers.
    fn binary_join_key_columns(
        &self,
        left: &LogicalPlan,
        right: &LogicalPlan,
        only_join_time_index: bool,
        modifier: &Option<BinModifier>,
    ) -> (BTreeSet<String>, BTreeSet<String>) {
        // tsid join is only valid for a plain one-to-one match with no
        // explicit label matching.
        let use_tsid_join = !only_join_time_index
            && modifier.as_ref().is_none_or(|modifier| {
                modifier.matching.is_none()
                    && matches!(modifier.card, VectorMatchCardinality::OneToOne)
            })
            && Self::plan_has_tsid_column(left)
            && Self::plan_has_tsid_column(right);

        let (mut left_tag_columns, mut right_tag_columns) = if use_tsid_join {
            (
                BTreeSet::from([DATA_SCHEMA_TSID_COLUMN_NAME.to_string()]),
                BTreeSet::from([DATA_SCHEMA_TSID_COLUMN_NAME.to_string()]),
            )
        } else {
            // Time-index-only joins start from empty key sets; otherwise
            // both sides use the context's tag columns.
            let left_tag_columns = if only_join_time_index {
                BTreeSet::new()
            } else {
                self.ctx
                    .tag_columns
                    .iter()
                    .cloned()
                    .collect::<BTreeSet<_>>()
            };
            let right_tag_columns = left_tag_columns.clone();
            (left_tag_columns, right_tag_columns)
        };

        // Apply on/ignoring only when joining on tags; the tsid fast path
        // above is taken only when no explicit matching exists.
        if !use_tsid_join
            && let Some(modifier) = modifier
            && let Some(matching) = &modifier.matching
        {
            match matching {
                LabelModifier::Include(on) => {
                    // on(...): keep only the listed labels.
                    let mask = on.labels.iter().cloned().collect::<BTreeSet<_>>();
                    left_tag_columns = left_tag_columns.intersection(&mask).cloned().collect();
                    right_tag_columns = right_tag_columns.intersection(&mask).cloned().collect();
                }
                LabelModifier::Exclude(ignoring) => {
                    // ignoring(...): drop the listed labels.
                    for label in &ignoring.labels {
                        let _ = left_tag_columns.remove(label);
                        let _ = right_tag_columns.remove(label);
                    }
                }
            }
        }

        (left_tag_columns, right_tag_columns)
    }
3491
    /// Inner-join two sub-plans on their non-field (join-key) columns for a
    /// PromQL binary operation.
    ///
    /// Join keys come from `binary_join_key_columns` (tsid or tag columns,
    /// subject to on/ignoring modifiers); the time index columns are added
    /// when both sides have one. Both sides are aliased with their table
    /// references so identically-named columns remain distinguishable, and
    /// NULL join keys compare equal.
    #[allow(clippy::too_many_arguments)]
    fn join_on_non_field_columns(
        &self,
        left: LogicalPlan,
        right: LogicalPlan,
        left_table_ref: TableReference,
        right_table_ref: TableReference,
        left_time_index_column: Option<String>,
        right_time_index_column: Option<String>,
        only_join_time_index: bool,
        modifier: &Option<BinModifier>,
    ) -> Result<LogicalPlan> {
        let (mut left_tag_columns, mut right_tag_columns) =
            self.binary_join_key_columns(&left, &right, only_join_time_index, modifier);

        // Join on the time index only when both sides have one.
        if let (Some(left_time_index_column), Some(right_time_index_column)) =
            (left_time_index_column, right_time_index_column)
        {
            left_tag_columns.insert(left_time_index_column);
            right_tag_columns.insert(right_time_index_column);
        }

        // Alias the right side before joining.
        let right = LogicalPlanBuilder::from(right)
            .alias(right_table_ref)
            .context(DataFusionPlanningSnafu)?
            .build()
            .context(DataFusionPlanningSnafu)?;

        LogicalPlanBuilder::from(left)
            .alias(left_table_ref)
            .context(DataFusionPlanningSnafu)?
            .join_detailed(
                right,
                JoinType::Inner,
                (
                    left_tag_columns
                        .into_iter()
                        .map(Column::from_name)
                        .collect::<Vec<_>>(),
                    right_tag_columns
                        .into_iter()
                        .map(Column::from_name)
                        .collect::<Vec<_>>(),
                ),
                None,
                // NULL == NULL: rows missing a tag on both sides still join.
                NullEquality::NullEqualsNull,
            )
            .context(DataFusionPlanningSnafu)?
            .build()
            .context(DataFusionPlanningSnafu)
    }
3547
    /// Plan a PromQL set operation (`and`, `unless`, `or`) between two
    /// sub-plans.
    ///
    /// `or` is delegated to `or_operator`. `and`/`unless` are planned as a
    /// `LeftSemi`/`LeftAnti` join respectively, keyed on the matching tag
    /// columns plus the left time index. Updates `self.ctx` (time index,
    /// tsid flag, field columns) to describe the resulting plan.
    fn set_op_on_non_field_columns(
        &mut self,
        left: LogicalPlan,
        mut right: LogicalPlan,
        left_context: PromPlannerContext,
        right_context: PromPlannerContext,
        op: TokenType,
        modifier: &Option<BinModifier>,
    ) -> Result<LogicalPlan> {
        let mut left_tag_col_set = left_context
            .tag_columns
            .iter()
            .cloned()
            .collect::<HashSet<_>>();
        let mut right_tag_col_set = right_context
            .tag_columns
            .iter()
            .cloned()
            .collect::<HashSet<_>>();

        // `or` has union semantics and is handled by a dedicated code path.
        if matches!(op.id(), token::T_LOR) {
            return self.or_operator(
                left,
                right,
                left_tag_col_set,
                right_tag_col_set,
                left_context,
                right_context,
                modifier,
            );
        }

        if let Some(modifier) = modifier {
            // Only 1:1 and many:many cardinalities are accepted for set ops.
            ensure!(
                matches!(
                    modifier.card,
                    VectorMatchCardinality::OneToOne | VectorMatchCardinality::ManyToMany
                ),
                UnsupportedVectorMatchSnafu {
                    name: modifier.card.clone(),
                },
            );
            // Narrow the join keys according to on()/ignoring().
            if let Some(matching) = &modifier.matching {
                match matching {
                    LabelModifier::Include(on) => {
                        let mask = on.labels.iter().cloned().collect::<HashSet<_>>();
                        left_tag_col_set = left_tag_col_set.intersection(&mask).cloned().collect();
                        right_tag_col_set =
                            right_tag_col_set.intersection(&mask).cloned().collect();
                    }
                    LabelModifier::Exclude(ignoring) => {
                        for label in &ignoring.labels {
                            let _ = left_tag_col_set.remove(label);
                            let _ = right_tag_col_set.remove(label);
                        }
                    }
                }
            }
        }
        // NOTE(review): this condition is always true here — the `or` case
        // already returned above, so the guard is redundant (but harmless).
        if !matches!(op.id(), token::T_LOR) {
            ensure!(
                left_tag_col_set == right_tag_col_set,
                CombineTableColumnMismatchSnafu {
                    left: left_tag_col_set.into_iter().collect::<Vec<_>>(),
                    right: right_tag_col_set.into_iter().collect::<Vec<_>>(),
                }
            )
        };
        // NOTE(review): these unwraps panic if either side lacks a time
        // index; presumably guaranteed by earlier planning — confirm.
        let left_time_index = left_context.time_index_column.clone().unwrap();
        let right_time_index = right_context.time_index_column.clone().unwrap();
        // Join on the (narrowed) tags plus the left time index.
        let join_keys = left_tag_col_set
            .iter()
            .cloned()
            .chain([left_time_index.clone()])
            .collect::<Vec<_>>();
        self.ctx.time_index_column = Some(left_time_index.clone());
        self.ctx.use_tsid = left_context.use_tsid;

        // Rename the right time index column to match the left one so the
        // join keys line up by name.
        if left_context.time_index_column != right_context.time_index_column {
            let right_project_exprs = right
                .schema()
                .fields()
                .iter()
                .map(|field| {
                    if field.name() == &right_time_index {
                        DfExpr::Column(Column::from_name(&right_time_index)).alias(&left_time_index)
                    } else {
                        DfExpr::Column(Column::from_name(field.name()))
                    }
                })
                .collect::<Vec<_>>();

            right = LogicalPlanBuilder::from(right)
                .project(right_project_exprs)
                .context(DataFusionPlanningSnafu)?
                .build()
                .context(DataFusionPlanningSnafu)?;
        }

        // Multi-field inputs are not supported for set operations.
        // NOTE(review): the message says "AND operator" but this path also
        // covers `unless`.
        ensure!(
            left_context.field_columns.len() == 1,
            MultiFieldsNotSupportedSnafu {
                operator: "AND operator"
            }
        );
        let left_field_col = left_context.field_columns.first().unwrap();
        self.ctx.field_columns = vec![left_field_col.clone()];

        // Deduplicate the left side, then keep rows with (`and` => semi) or
        // without (`unless` => anti) a match on the right.
        match op.id() {
            token::T_LAND => LogicalPlanBuilder::from(left)
                .distinct()
                .context(DataFusionPlanningSnafu)?
                .join_detailed(
                    right,
                    JoinType::LeftSemi,
                    (join_keys.clone(), join_keys),
                    None,
                    NullEquality::NullEqualsNull,
                )
                .context(DataFusionPlanningSnafu)?
                .build()
                .context(DataFusionPlanningSnafu),
            token::T_LUNLESS => LogicalPlanBuilder::from(left)
                .distinct()
                .context(DataFusionPlanningSnafu)?
                .join_detailed(
                    right,
                    JoinType::LeftAnti,
                    (join_keys.clone(), join_keys),
                    None,
                    NullEquality::NullEqualsNull,
                )
                .context(DataFusionPlanningSnafu)?
                .build()
                .context(DataFusionPlanningSnafu),
            token::T_LOR => {
                // Handled by the early return at the top of this function.
                unreachable!()
            }
            _ => UnexpectedTokenSnafu { token: op }.fail(),
        }
    }
3704
    /// Plan the PromQL `or` operator as a `UnionDistinctOn` extension node.
    ///
    /// Both sides are projected to a common column layout (union of all tag
    /// columns, with tags missing on one side padded with NULL string
    /// literals; the right time index and field column renamed to the left's
    /// names), and the custom union node then deduplicates on the matching
    /// columns. Updates `self.ctx` with the resulting time index, tags,
    /// field column, and tsid flag.
    #[allow(clippy::too_many_arguments)]
    fn or_operator(
        &mut self,
        left: LogicalPlan,
        right: LogicalPlan,
        left_tag_cols_set: HashSet<String>,
        right_tag_cols_set: HashSet<String>,
        left_context: PromPlannerContext,
        right_context: PromPlannerContext,
        modifier: &Option<BinModifier>,
    ) -> Result<LogicalPlan> {
        // Both sides must carry exactly one (and the same number of) fields.
        ensure!(
            left_context.field_columns.len() == right_context.field_columns.len(),
            CombineTableColumnMismatchSnafu {
                left: left_context.field_columns.clone(),
                right: right_context.field_columns.clone()
            }
        );
        ensure!(
            left_context.field_columns.len() == 1,
            MultiFieldsNotSupportedSnafu {
                operator: "OR operator"
            }
        );

        // Union of tag columns, and the tags each side is missing.
        let all_tags = left_tag_cols_set
            .union(&right_tag_cols_set)
            .cloned()
            .collect::<HashSet<_>>();
        let tags_not_in_left = all_tags
            .difference(&left_tag_cols_set)
            .cloned()
            .collect::<Vec<_>>();
        let tags_not_in_right = all_tags
            .difference(&right_tag_cols_set)
            .cloned()
            .collect::<Vec<_>>();
        // The qualifier of the first column is used as each side's table
        // qualifier. NOTE(review): assumes a single qualifier per side.
        let left_qualifier = left.schema().qualified_field(0).0.cloned();
        let right_qualifier = right.schema().qualified_field(0).0.cloned();
        let left_qualifier_string = left_qualifier
            .as_ref()
            .map(|l| l.to_string())
            .unwrap_or_default();
        let right_qualifier_string = right_qualifier
            .as_ref()
            .map(|r| r.to_string())
            .unwrap_or_default();
        let left_time_index_column =
            left_context
                .time_index_column
                .clone()
                .with_context(|| TimeIndexNotFoundSnafu {
                    table: left_qualifier_string.clone(),
                })?;
        let right_time_index_column =
            right_context
                .time_index_column
                .clone()
                .with_context(|| TimeIndexNotFoundSnafu {
                    table: right_qualifier_string.clone(),
                })?;
        // Safe: the single-field invariant is ensured above.
        let left_field_col = left_context.field_columns.first().unwrap();
        let right_field_col = right_context.field_columns.first().unwrap();
        let left_has_tsid = left
            .schema()
            .fields()
            .iter()
            .any(|field| field.name() == DATA_SCHEMA_TSID_COLUMN_NAME);
        let right_has_tsid = right
            .schema()
            .fields()
            .iter()
            .any(|field| field.name() == DATA_SCHEMA_TSID_COLUMN_NAME);

        // Build the common output column set from both schemas.
        let mut all_columns_set = left
            .schema()
            .fields()
            .iter()
            .chain(right.schema().fields().iter())
            .map(|field| field.name().clone())
            .collect::<HashSet<_>>();
        // Keep the tsid column only when both sides carry it.
        if !(left_has_tsid && right_has_tsid) {
            all_columns_set.remove(DATA_SCHEMA_TSID_COLUMN_NAME);
        }
        // Time index and field columns are handled explicitly below.
        all_columns_set.remove(&left_time_index_column);
        all_columns_set.remove(&right_time_index_column);
        if left_field_col != right_field_col {
            all_columns_set.remove(right_field_col);
        }
        // Sort for a deterministic layout; the time index goes first.
        let mut all_columns = all_columns_set.into_iter().collect::<Vec<_>>();
        all_columns.sort_unstable();
        all_columns.insert(0, left_time_index_column.clone());

        // Left projection: pad tags that only exist on the right with NULLs.
        let left_proj_exprs = all_columns.iter().map(|col| {
            if tags_not_in_left.contains(col) {
                DfExpr::Literal(ScalarValue::Utf8(None), None).alias(col.clone())
            } else {
                DfExpr::Column(Column::new(None::<String>, col))
            }
        });
        // Rename the right time index to the left's name.
        let right_time_index_expr = DfExpr::Column(Column::new(
            right_qualifier.clone(),
            right_time_index_column,
        ))
        .alias(left_time_index_column.clone());
        // Locate the qualifier under which the right field column is exposed.
        let right_qualifier_for_field = right
            .schema()
            .iter()
            .find(|(_, f)| f.name() == right_field_col)
            .map(|(q, _)| q)
            .with_context(|| ColumnNotFoundSnafu {
                col: right_field_col.clone(),
            })?
            .cloned();

        // Right projection: map the left field name onto the right field
        // column, pad missing tags with NULLs, pass everything else through.
        let right_proj_exprs_without_time_index = all_columns.iter().skip(1).map(|col| {
            if col == left_field_col && left_field_col != right_field_col {
                DfExpr::Column(Column::new(
                    right_qualifier_for_field.clone(),
                    right_field_col,
                ))
            } else if tags_not_in_right.contains(col) {
                DfExpr::Literal(ScalarValue::Utf8(None), None).alias(col.clone())
            } else {
                DfExpr::Column(Column::new(None::<String>, col))
            }
        });
        let right_proj_exprs = [right_time_index_expr]
            .into_iter()
            .chain(right_proj_exprs_without_time_index);

        let left_projected = LogicalPlanBuilder::from(left)
            .project(left_proj_exprs)
            .context(DataFusionPlanningSnafu)?
            .alias(left_qualifier_string.clone())
            .context(DataFusionPlanningSnafu)?
            .build()
            .context(DataFusionPlanningSnafu)?;
        let right_projected = LogicalPlanBuilder::from(right)
            .project(right_proj_exprs)
            .context(DataFusionPlanningSnafu)?
            .alias(right_qualifier_string.clone())
            .context(DataFusionPlanningSnafu)?
            .build()
            .context(DataFusionPlanningSnafu)?;

        // Columns to deduplicate on: on() labels, all tags minus ignoring()
        // labels, or all tags when no matching modifier is given.
        let mut match_columns = if let Some(modifier) = modifier
            && let Some(matching) = &modifier.matching
        {
            match matching {
                LabelModifier::Include(on) => on.labels.clone(),
                LabelModifier::Exclude(ignoring) => {
                    let ignoring = ignoring.labels.iter().cloned().collect::<HashSet<_>>();
                    all_tags.difference(&ignoring).cloned().collect()
                }
            }
        } else {
            all_tags.iter().cloned().collect()
        };
        match_columns.sort_unstable();
        // Both sides now share the left's schema after projection.
        let schema = left_projected.schema().clone();
        let union_distinct_on = UnionDistinctOn::new(
            left_projected,
            right_projected,
            match_columns,
            left_time_index_column.clone(),
            schema,
        );
        let result = LogicalPlan::Extension(Extension {
            node: Arc::new(union_distinct_on),
        });

        // Reflect the union's output in the planner context.
        self.ctx.time_index_column = Some(left_time_index_column);
        self.ctx.tag_columns = all_tags.into_iter().collect();
        self.ctx.field_columns = vec![left_field_col.clone()];
        self.ctx.use_tsid = left_has_tsid && right_has_tsid;

        Ok(result)
    }
3907
3908 fn projection_for_each_field_column<F>(
3916 &mut self,
3917 input: LogicalPlan,
3918 name_to_expr: F,
3919 ) -> Result<LogicalPlan>
3920 where
3921 F: FnMut(&String) -> Result<DfExpr>,
3922 {
3923 let table_ref = self.ctx.table_name.clone().map(TableReference::bare);
3924 let non_field_columns_iter = self
3925 .ctx
3926 .tag_columns
3927 .iter()
3928 .chain(self.ctx.time_index_column.iter())
3929 .map(|col| Ok(DfExpr::Column(Column::new(table_ref.clone(), col))));
3930 let tsid_iter =
3931 Self::optional_tsid_projection(input.schema(), table_ref.as_ref(), self.ctx.use_tsid)
3932 .into_iter()
3933 .map(Ok);
3934
3935 let result_field_columns = self
3937 .ctx
3938 .field_columns
3939 .iter()
3940 .map(name_to_expr)
3941 .collect::<Result<Vec<_>>>()?;
3942
3943 self.ctx.field_columns = result_field_columns
3945 .iter()
3946 .map(|expr| expr.schema_name().to_string())
3947 .collect();
3948 let field_columns_iter = result_field_columns
3949 .into_iter()
3950 .zip(self.ctx.field_columns.iter())
3951 .map(|(expr, name)| Ok(DfExpr::Alias(Alias::new(expr, None::<String>, name))));
3952
3953 let project_fields = non_field_columns_iter
3955 .chain(tsid_iter)
3956 .chain(field_columns_iter)
3957 .collect::<Result<Vec<_>>>()?;
3958
3959 LogicalPlanBuilder::from(input)
3960 .project(project_fields)
3961 .context(DataFusionPlanningSnafu)?
3962 .build()
3963 .context(DataFusionPlanningSnafu)
3964 }
3965
3966 fn filter_on_field_column<F>(
3969 &self,
3970 input: LogicalPlan,
3971 mut name_to_expr: F,
3972 ) -> Result<LogicalPlan>
3973 where
3974 F: FnMut(&String) -> Result<DfExpr>,
3975 {
3976 ensure!(
3977 self.ctx.field_columns.len() == 1,
3978 UnsupportedExprSnafu {
3979 name: "filter on multi-value input"
3980 }
3981 );
3982
3983 let field_column_filter = name_to_expr(&self.ctx.field_columns[0])?;
3984
3985 LogicalPlanBuilder::from(input)
3986 .filter(field_column_filter)
3987 .context(DataFusionPlanningSnafu)?
3988 .build()
3989 .context(DataFusionPlanningSnafu)
3990 }
3991
3992 fn date_part_on_time_index(&self, date_part: &str) -> Result<DfExpr> {
3995 let input_expr = datafusion::logical_expr::col(
3996 self.ctx
3997 .time_index_column
3998 .as_ref()
3999 .with_context(|| TimeIndexNotFoundSnafu {
4001 table: "<doesn't matter>",
4002 })?,
4003 );
4004 let fn_expr = DfExpr::ScalarFunction(ScalarFunction {
4005 func: datafusion_functions::datetime::date_part(),
4006 args: vec![date_part.lit(), input_expr],
4007 });
4008 Ok(fn_expr)
4009 }
4010
4011 fn strip_tsid_column(&self, plan: LogicalPlan) -> Result<LogicalPlan> {
4012 let schema = plan.schema();
4013 if !schema
4014 .fields()
4015 .iter()
4016 .any(|field| field.name() == DATA_SCHEMA_TSID_COLUMN_NAME)
4017 {
4018 return Ok(plan);
4019 }
4020
4021 let project_exprs = schema
4022 .fields()
4023 .iter()
4024 .filter(|field| field.name() != DATA_SCHEMA_TSID_COLUMN_NAME)
4025 .map(|field| Ok(DfExpr::Column(Column::from_name(field.name().clone()))))
4026 .collect::<Result<Vec<_>>>()?;
4027
4028 LogicalPlanBuilder::from(plan)
4029 .project(project_exprs)
4030 .context(DataFusionPlanningSnafu)?
4031 .build()
4032 .context(DataFusionPlanningSnafu)
4033 }
4034
4035 fn apply_alias(&mut self, plan: LogicalPlan, alias_name: String) -> Result<LogicalPlan> {
4037 let fields_expr = self.create_field_column_exprs()?;
4038
4039 ensure!(
4041 fields_expr.len() == 1,
4042 UnsupportedExprSnafu {
4043 name: "alias on multi-value result"
4044 }
4045 );
4046
4047 let project_fields = fields_expr
4048 .into_iter()
4049 .map(|expr| expr.alias(&alias_name))
4050 .chain(self.create_tag_column_exprs()?)
4051 .chain(Some(self.create_time_index_column_expr()?));
4052
4053 LogicalPlanBuilder::from(plan)
4054 .project(project_fields)
4055 .context(DataFusionPlanningSnafu)?
4056 .build()
4057 .context(DataFusionPlanningSnafu)
4058 }
4059}
4060
/// Arguments of a PromQL function call, split into the (at most one)
/// expression-valued input and the remaining literal arguments.
#[derive(Default, Debug)]
struct FunctionArgs {
    // The PromQL expression argument, if the call has one.
    input: Option<PromExpr>,
    // The other arguments, converted to DataFusion expressions (named
    // "literals" — presumably constants; confirm at the population site).
    literals: Vec<DfExpr>,
}
4066
/// How a PromQL scalar function call is realized during planning.
#[derive(Debug, Clone)]
enum ScalarFunc {
    // Backed by a DataFusion built-in scalar function.
    DataFusionBuiltin(Arc<ScalarUdfDef>),
    // Backed by a DataFusion UDF. NOTE(review): the exact argument-wiring
    // difference from `DataFusionBuiltin` is decided at the call site.
    DataFusionUdf(Arc<ScalarUdfDef>),
    // A UDF defined on our side rather than by DataFusion.
    Udf(Arc<ScalarUdfDef>),
    // A UDF carrying an extra `i64` — presumably a range/lookback length for
    // extrapolation-style functions; confirm at the construction site.
    ExtrapolateUdf(Arc<ScalarUdfDef>, i64),
    // No UDF call: the function is expanded into a plain expression
    // elsewhere during planning.
    GeneratedExpr,
}
4096
4097#[cfg(test)]
4098mod test {
4099 use std::time::{Duration, UNIX_EPOCH};
4100
4101 use catalog::RegisterTableRequest;
4102 use catalog::memory::{MemoryCatalogManager, new_memory_catalog_manager};
4103 use common_base::Plugins;
4104 use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
4105 use common_query::prelude::greptime_timestamp;
4106 use common_query::test_util::DummyDecoder;
4107 use datatypes::prelude::ConcreteDataType;
4108 use datatypes::schema::{ColumnSchema, Schema};
4109 use promql_parser::label::Labels;
4110 use promql_parser::parser;
4111 use session::context::QueryContext;
4112 use table::metadata::{TableInfoBuilder, TableMetaBuilder};
4113 use table::test_util::EmptyTable;
4114
4115 use super::*;
4116 use crate::QueryEngineContext;
4117 use crate::options::QueryOptions;
4118 use crate::parser::QueryLanguageParser;
4119
    /// Build a minimal `QueryEngineState` over a fresh in-memory catalog
    /// manager; every optional component is left unset — just enough for
    /// planning tests.
    fn build_query_engine_state() -> QueryEngineState {
        QueryEngineState::new(
            new_memory_catalog_manager().unwrap(),
            None,
            None,
            None,
            None,
            None,
            false,
            Plugins::default(),
            QueryOptions::default(),
        )
    }
4133
4134 async fn build_optimized_promql_plan(
4135 table_provider: DfTableSourceProvider,
4136 eval_stmt: &EvalStmt,
4137 ) -> LogicalPlan {
4138 let state = build_query_engine_state();
4139 let raw_plan = PromPlanner::stmt_to_plan(table_provider, eval_stmt, &state)
4140 .await
4141 .unwrap();
4142 let context = QueryEngineContext::new(state.session_state(), QueryContext::arc());
4143 state
4144 .optimize_by_extension_rules(raw_plan, &context)
4145 .unwrap()
4146 }
4147
4148 async fn build_optimized_tsid_plan(
4149 query: &str,
4150 num_tag: usize,
4151 num_field: usize,
4152 end_secs: u64,
4153 lookback_secs: u64,
4154 ) -> String {
4155 let eval_stmt = EvalStmt {
4156 expr: parser::parse(query).unwrap(),
4157 start: UNIX_EPOCH,
4158 end: UNIX_EPOCH
4159 .checked_add(Duration::from_secs(end_secs))
4160 .unwrap(),
4161 interval: Duration::from_secs(5),
4162 lookback_delta: Duration::from_secs(lookback_secs),
4163 };
4164 let table_provider = build_test_table_provider_with_tsid(
4165 &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
4166 num_tag,
4167 num_field,
4168 )
4169 .await;
4170
4171 build_optimized_promql_plan(table_provider, &eval_stmt)
4172 .await
4173 .display_indent_schema()
4174 .to_string()
4175 }
4176
    /// Assert that the nested-count rewrite kicked in for `query`: the
    /// optimized plan divides series by the tsid column, contains a
    /// `Distinct` and the expected outer aggregation, and no longer divides
    /// series by `tag_0`.
    async fn assert_nested_count_rewrite_applies(query: &str, expected_outer_agg: &str) {
        let plan_str = build_optimized_tsid_plan(query, 2, 1, 100_000, 1).await;

        assert!(plan_str.contains("PromSeriesDivide: tags=[\"__tsid\"]"));
        assert!(plan_str.contains("Projection: some_metric.timestamp, some_metric.tag_0"));
        assert!(plan_str.contains("Distinct:"));
        assert!(plan_str.contains(expected_outer_agg), "{plan_str}");
        assert!(!plan_str.contains("PromSeriesDivide: tags=[\"tag_0\"]"));
    }
4186
    /// Assert that the nested-count rewrite did NOT apply: the optimized
    /// plan must not contain a `Distinct` node.
    async fn assert_nested_count_rewrite_missing(query: &str, num_tag: usize, lookback_secs: u64) {
        let plan_str = build_optimized_tsid_plan(query, num_tag, 1, 100_000, lookback_secs).await;
        assert!(!plan_str.contains("Distinct:"), "{plan_str}");
    }
4191
4192 fn build_eval_stmt(expr: &str) -> EvalStmt {
4193 EvalStmt {
4194 expr: parser::parse(expr).unwrap(),
4195 start: UNIX_EPOCH,
4196 end: UNIX_EPOCH
4197 .checked_add(Duration::from_secs(100_000))
4198 .unwrap(),
4199 interval: Duration::from_secs(5),
4200 lookback_delta: Duration::from_secs(1),
4201 }
4202 }
4203
4204 async fn build_test_table_provider(
4205 table_name_tuples: &[(String, String)],
4206 num_tag: usize,
4207 num_field: usize,
4208 ) -> DfTableSourceProvider {
4209 let catalog_list = MemoryCatalogManager::with_default_setup();
4210 for (schema_name, table_name) in table_name_tuples {
4211 let mut columns = vec![];
4212 for i in 0..num_tag {
4213 columns.push(ColumnSchema::new(
4214 format!("tag_{i}"),
4215 ConcreteDataType::string_datatype(),
4216 false,
4217 ));
4218 }
4219 columns.push(
4220 ColumnSchema::new(
4221 "timestamp".to_string(),
4222 ConcreteDataType::timestamp_millisecond_datatype(),
4223 false,
4224 )
4225 .with_time_index(true),
4226 );
4227 for i in 0..num_field {
4228 columns.push(ColumnSchema::new(
4229 format!("field_{i}"),
4230 ConcreteDataType::float64_datatype(),
4231 true,
4232 ));
4233 }
4234 let schema = Arc::new(Schema::new(columns));
4235 let table_meta = TableMetaBuilder::empty()
4236 .schema(schema)
4237 .primary_key_indices((0..num_tag).collect())
4238 .value_indices((num_tag + 1..num_tag + 1 + num_field).collect())
4239 .next_column_id(1024)
4240 .build()
4241 .unwrap();
4242 let table_info = TableInfoBuilder::default()
4243 .name(table_name.clone())
4244 .meta(table_meta)
4245 .build()
4246 .unwrap();
4247 let table = EmptyTable::from_table_info(&table_info);
4248
4249 assert!(
4250 catalog_list
4251 .register_table_sync(RegisterTableRequest {
4252 catalog: DEFAULT_CATALOG_NAME.to_string(),
4253 schema: schema_name.clone(),
4254 table_name: table_name.clone(),
4255 table_id: 1024,
4256 table,
4257 })
4258 .is_ok()
4259 );
4260 }
4261
4262 DfTableSourceProvider::new(
4263 catalog_list,
4264 false,
4265 QueryContext::arc(),
4266 DummyDecoder::arc(),
4267 false,
4268 )
4269 }
4270
4271 async fn build_test_table_provider_with_tsid(
4272 table_name_tuples: &[(String, String)],
4273 num_tag: usize,
4274 num_field: usize,
4275 ) -> DfTableSourceProvider {
4276 let table_specs = table_name_tuples
4277 .iter()
4278 .map(|(schema_name, table_name)| ((schema_name.clone(), table_name.clone()), num_field))
4279 .collect::<Vec<_>>();
4280 build_test_table_provider_with_tsid_fields(&table_specs, num_tag).await
4281 }
4282
    /// Build a provider that mimics the metric engine layout: one physical
    /// table ("phy") carrying the table-id and tsid columns plus the superset
    /// of fields, and one logical table per spec pointing back at it via
    /// `LOGICAL_TABLE_METADATA_KEY`.
    async fn build_test_table_provider_with_tsid_fields(
        table_specs: &[((String, String), usize)],
        num_tag: usize,
    ) -> DfTableSourceProvider {
        let catalog_list = MemoryCatalogManager::with_default_setup();

        let physical_table_name = "phy";
        let physical_table_id = 999u32;
        // The physical table must hold every logical table's fields, so its
        // field count is the maximum over all specs.
        let physical_num_field = table_specs
            .iter()
            .map(|(_, num_field)| *num_field)
            .max()
            .unwrap_or(0);

        // Register the physical metric-engine table.
        {
            // Internal columns (table id + tsid) come first, then tags,
            // time index, and fields.
            let mut columns = vec![
                ColumnSchema::new(
                    DATA_SCHEMA_TABLE_ID_COLUMN_NAME.to_string(),
                    ConcreteDataType::uint32_datatype(),
                    false,
                ),
                ColumnSchema::new(
                    DATA_SCHEMA_TSID_COLUMN_NAME.to_string(),
                    ConcreteDataType::uint64_datatype(),
                    false,
                ),
            ];
            for i in 0..num_tag {
                columns.push(ColumnSchema::new(
                    format!("tag_{i}"),
                    ConcreteDataType::string_datatype(),
                    false,
                ));
            }
            columns.push(
                ColumnSchema::new(
                    "timestamp".to_string(),
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                )
                .with_time_index(true),
            );
            for i in 0..physical_num_field {
                columns.push(ColumnSchema::new(
                    format!("field_{i}"),
                    ConcreteDataType::float64_datatype(),
                    true,
                ));
            }

            let schema = Arc::new(Schema::new(columns));
            // Primary key covers the two internal columns plus all tags.
            let primary_key_indices = (0..(2 + num_tag)).collect::<Vec<_>>();
            let table_meta = TableMetaBuilder::empty()
                .schema(schema)
                .primary_key_indices(primary_key_indices)
                .value_indices((2 + num_tag..2 + num_tag + 1 + physical_num_field).collect())
                .engine(METRIC_ENGINE_NAME.to_string())
                .next_column_id(1024)
                .build()
                .unwrap();
            let table_info = TableInfoBuilder::default()
                .table_id(physical_table_id)
                .name(physical_table_name)
                .meta(table_meta)
                .build()
                .unwrap();
            let table = EmptyTable::from_table_info(&table_info);

            assert!(
                catalog_list
                    .register_table_sync(RegisterTableRequest {
                        catalog: DEFAULT_CATALOG_NAME.to_string(),
                        schema: DEFAULT_SCHEMA_NAME.to_string(),
                        table_name: physical_table_name.to_string(),
                        table_id: physical_table_id,
                        table,
                    })
                    .is_ok()
            );
        }

        // Register one logical table per spec; ids start at 1024.
        for (idx, ((schema_name, table_name), num_field)) in table_specs.iter().enumerate() {
            let mut columns = vec![];
            for i in 0..num_tag {
                columns.push(ColumnSchema::new(
                    format!("tag_{i}"),
                    ConcreteDataType::string_datatype(),
                    false,
                ));
            }
            columns.push(
                ColumnSchema::new(
                    "timestamp".to_string(),
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                )
                .with_time_index(true),
            );
            for i in 0..*num_field {
                columns.push(ColumnSchema::new(
                    format!("field_{i}"),
                    ConcreteDataType::float64_datatype(),
                    true,
                ));
            }

            let schema = Arc::new(Schema::new(columns));
            // Link the logical table to its physical table via options.
            let mut options = table::requests::TableOptions::default();
            options.extra_options.insert(
                LOGICAL_TABLE_METADATA_KEY.to_string(),
                physical_table_name.to_string(),
            );
            let table_id = 1024u32 + idx as u32;
            let table_meta = TableMetaBuilder::empty()
                .schema(schema)
                .primary_key_indices((0..num_tag).collect())
                .value_indices((num_tag + 1..num_tag + 1 + *num_field).collect())
                .engine(METRIC_ENGINE_NAME.to_string())
                .options(options)
                .next_column_id(1024)
                .build()
                .unwrap();
            let table_info = TableInfoBuilder::default()
                .table_id(table_id)
                .name(table_name.clone())
                .meta(table_meta)
                .build()
                .unwrap();
            let table = EmptyTable::from_table_info(&table_info);

            assert!(
                catalog_list
                    .register_table_sync(RegisterTableRequest {
                        catalog: DEFAULT_CATALOG_NAME.to_string(),
                        schema: schema_name.clone(),
                        table_name: table_name.clone(),
                        table_id,
                        table,
                    })
                    .is_ok()
            );
        }

        DfTableSourceProvider::new(
            catalog_list,
            false,
            QueryContext::arc(),
            DummyDecoder::arc(),
            false,
        )
    }
4436
4437 async fn build_test_table_provider_with_fields(
4438 table_name_tuples: &[(String, String)],
4439 tags: &[&str],
4440 ) -> DfTableSourceProvider {
4441 let catalog_list = MemoryCatalogManager::with_default_setup();
4442 for (schema_name, table_name) in table_name_tuples {
4443 let mut columns = vec![];
4444 let num_tag = tags.len();
4445 for tag in tags {
4446 columns.push(ColumnSchema::new(
4447 tag.to_string(),
4448 ConcreteDataType::string_datatype(),
4449 false,
4450 ));
4451 }
4452 columns.push(
4453 ColumnSchema::new(
4454 greptime_timestamp().to_string(),
4455 ConcreteDataType::timestamp_millisecond_datatype(),
4456 false,
4457 )
4458 .with_time_index(true),
4459 );
4460 columns.push(ColumnSchema::new(
4461 greptime_value().to_string(),
4462 ConcreteDataType::float64_datatype(),
4463 true,
4464 ));
4465 let schema = Arc::new(Schema::new(columns));
4466 let table_meta = TableMetaBuilder::empty()
4467 .schema(schema)
4468 .primary_key_indices((0..num_tag).collect())
4469 .next_column_id(1024)
4470 .build()
4471 .unwrap();
4472 let table_info = TableInfoBuilder::default()
4473 .name(table_name.clone())
4474 .meta(table_meta)
4475 .build()
4476 .unwrap();
4477 let table = EmptyTable::from_table_info(&table_info);
4478
4479 assert!(
4480 catalog_list
4481 .register_table_sync(RegisterTableRequest {
4482 catalog: DEFAULT_CATALOG_NAME.to_string(),
4483 schema: schema_name.clone(),
4484 table_name: table_name.clone(),
4485 table_id: 1024,
4486 table,
4487 })
4488 .is_ok()
4489 );
4490 }
4491
4492 DfTableSourceProvider::new(
4493 catalog_list,
4494 false,
4495 QueryContext::arc(),
4496 DummyDecoder::arc(),
4497 false,
4498 )
4499 }
4500
    /// Plan `fn_name(some_metric{tag_0!="bar"})` against a one-tag/one-field
    /// test table and compare the rendered plan with a template in which
    /// `TEMPLATE` is substituted by `plan_name`. Panics (via `unwrap`) when
    /// planning fails — used by the `#[should_panic]` tests below.
    async fn do_single_instant_function_call(fn_name: &'static str, plan_name: &str) {
        let prom_expr =
            parser::parse(&format!("{fn_name}(some_metric{{tag_0!=\"bar\"}})")).unwrap();
        let eval_stmt = EvalStmt {
            expr: prom_expr,
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let table_provider = build_test_table_provider(
            &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
            1,
            1,
        )
        .await;
        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();

        // Expected plan with the function name left as a placeholder.
        let expected = String::from(
            "Filter: TEMPLATE(field_0) IS NOT NULL [timestamp:Timestamp(ms), TEMPLATE(field_0):Float64;N, tag_0:Utf8]\
            \n  Projection: some_metric.timestamp, TEMPLATE(some_metric.field_0) AS TEMPLATE(field_0), some_metric.tag_0 [timestamp:Timestamp(ms), TEMPLATE(field_0):Float64;N, tag_0:Utf8]\
            \n    PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n      PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n        Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n          Filter: some_metric.tag_0 != Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(-999, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n            TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]"
        ).replace("TEMPLATE", plan_name);

        assert_eq!(plan.display_indent_schema().to_string(), expected);
    }
4552
    // One test per PromQL instant function, all driven by the shared
    // `do_single_instant_function_call` helper. Functions that currently
    // fail planning (the helper unwraps the planner result) are marked
    // `#[should_panic]`.
    #[tokio::test]
    async fn single_abs() {
        do_single_instant_function_call("abs", "abs").await;
    }

    #[tokio::test]
    #[should_panic]
    async fn single_absent() {
        do_single_instant_function_call("absent", "").await;
    }

    #[tokio::test]
    async fn single_ceil() {
        do_single_instant_function_call("ceil", "ceil").await;
    }

    #[tokio::test]
    async fn single_exp() {
        do_single_instant_function_call("exp", "exp").await;
    }

    #[tokio::test]
    async fn single_ln() {
        do_single_instant_function_call("ln", "ln").await;
    }

    #[tokio::test]
    async fn single_log2() {
        do_single_instant_function_call("log2", "log2").await;
    }

    #[tokio::test]
    async fn single_log10() {
        do_single_instant_function_call("log10", "log10").await;
    }

    #[tokio::test]
    #[should_panic]
    async fn single_scalar() {
        do_single_instant_function_call("scalar", "").await;
    }

    #[tokio::test]
    #[should_panic]
    async fn single_sgn() {
        do_single_instant_function_call("sgn", "").await;
    }

    #[tokio::test]
    #[should_panic]
    async fn single_sort() {
        do_single_instant_function_call("sort", "").await;
    }

    #[tokio::test]
    #[should_panic]
    async fn single_sort_desc() {
        do_single_instant_function_call("sort_desc", "").await;
    }

    #[tokio::test]
    async fn single_sqrt() {
        do_single_instant_function_call("sqrt", "sqrt").await;
    }

    #[tokio::test]
    #[should_panic]
    async fn single_timestamp() {
        do_single_instant_function_call("timestamp", "").await;
    }

    // Trigonometric functions.
    #[tokio::test]
    async fn single_acos() {
        do_single_instant_function_call("acos", "acos").await;
    }

    #[tokio::test]
    #[should_panic]
    async fn single_acosh() {
        do_single_instant_function_call("acosh", "").await;
    }

    #[tokio::test]
    async fn single_asin() {
        do_single_instant_function_call("asin", "asin").await;
    }

    #[tokio::test]
    #[should_panic]
    async fn single_asinh() {
        do_single_instant_function_call("asinh", "").await;
    }

    #[tokio::test]
    async fn single_atan() {
        do_single_instant_function_call("atan", "atan").await;
    }

    #[tokio::test]
    #[should_panic]
    async fn single_atanh() {
        do_single_instant_function_call("atanh", "").await;
    }

    #[tokio::test]
    async fn single_cos() {
        do_single_instant_function_call("cos", "cos").await;
    }

    #[tokio::test]
    #[should_panic]
    async fn single_cosh() {
        do_single_instant_function_call("cosh", "").await;
    }

    #[tokio::test]
    async fn single_sin() {
        do_single_instant_function_call("sin", "sin").await;
    }

    #[tokio::test]
    #[should_panic]
    async fn single_sinh() {
        do_single_instant_function_call("sinh", "").await;
    }

    #[tokio::test]
    async fn single_tan() {
        do_single_instant_function_call("tan", "tan").await;
    }

    #[tokio::test]
    #[should_panic]
    async fn single_tanh() {
        do_single_instant_function_call("tanh", "").await;
    }

    #[tokio::test]
    #[should_panic]
    async fn single_deg() {
        do_single_instant_function_call("deg", "").await;
    }

    #[tokio::test]
    #[should_panic]
    async fn single_rad() {
        do_single_instant_function_call("rad", "").await;
    }
4701
4702 async fn do_aggregate_expr_plan(fn_name: &str, plan_name: &str) {
4724 let prom_expr = parser::parse(&format!(
4725 "{fn_name} by (tag_1)(some_metric{{tag_0!=\"bar\"}})",
4726 ))
4727 .unwrap();
4728 let mut eval_stmt = EvalStmt {
4729 expr: prom_expr,
4730 start: UNIX_EPOCH,
4731 end: UNIX_EPOCH
4732 .checked_add(Duration::from_secs(100_000))
4733 .unwrap(),
4734 interval: Duration::from_secs(5),
4735 lookback_delta: Duration::from_secs(1),
4736 };
4737
4738 let table_provider = build_test_table_provider(
4740 &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
4741 2,
4742 2,
4743 )
4744 .await;
4745 let plan =
4746 PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
4747 .await
4748 .unwrap();
4749 let expected_no_without = String::from(
4750 "Sort: some_metric.tag_1 ASC NULLS LAST, some_metric.timestamp ASC NULLS LAST [tag_1:Utf8, timestamp:Timestamp(ms), TEMPLATE(some_metric.field_0):Float64;N, TEMPLATE(some_metric.field_1):Float64;N]\
4751 \n Aggregate: groupBy=[[some_metric.tag_1, some_metric.timestamp]], aggr=[[TEMPLATE(some_metric.field_0), TEMPLATE(some_metric.field_1)]] [tag_1:Utf8, timestamp:Timestamp(ms), TEMPLATE(some_metric.field_0):Float64;N, TEMPLATE(some_metric.field_1):Float64;N]\
4752 \n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(ms), field_0:Float64;N, field_1:Float64;N]\
4753 \n PromSeriesDivide: tags=[\"tag_0\", \"tag_1\"] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(ms), field_0:Float64;N, field_1:Float64;N]\
4754 \n Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.tag_1 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(ms), field_0:Float64;N, field_1:Float64;N]\
4755 \n Filter: some_metric.tag_0 != Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(-999, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(ms), field_0:Float64;N, field_1:Float64;N]\
4756 \n TableScan: some_metric [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(ms), field_0:Float64;N, field_1:Float64;N]"
4757 ).replace("TEMPLATE", plan_name);
4758 assert_eq!(
4759 plan.display_indent_schema().to_string(),
4760 expected_no_without
4761 );
4762
4763 if let PromExpr::Aggregate(AggregateExpr { modifier, .. }) = &mut eval_stmt.expr {
4765 *modifier = Some(LabelModifier::Exclude(Labels {
4766 labels: vec![String::from("tag_1")].into_iter().collect(),
4767 }));
4768 }
4769 let table_provider = build_test_table_provider(
4770 &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
4771 2,
4772 2,
4773 )
4774 .await;
4775 let plan =
4776 PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
4777 .await
4778 .unwrap();
4779 let expected_without = String::from(
4780 "Sort: some_metric.tag_0 ASC NULLS LAST, some_metric.timestamp ASC NULLS LAST [tag_0:Utf8, timestamp:Timestamp(ms), TEMPLATE(some_metric.field_0):Float64;N, TEMPLATE(some_metric.field_1):Float64;N]\
4781 \n Aggregate: groupBy=[[some_metric.tag_0, some_metric.timestamp]], aggr=[[TEMPLATE(some_metric.field_0), TEMPLATE(some_metric.field_1)]] [tag_0:Utf8, timestamp:Timestamp(ms), TEMPLATE(some_metric.field_0):Float64;N, TEMPLATE(some_metric.field_1):Float64;N]\
4782 \n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(ms), field_0:Float64;N, field_1:Float64;N]\
4783 \n PromSeriesDivide: tags=[\"tag_0\", \"tag_1\"] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(ms), field_0:Float64;N, field_1:Float64;N]\
4784 \n Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.tag_1 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(ms), field_0:Float64;N, field_1:Float64;N]\
4785 \n Filter: some_metric.tag_0 != Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(-999, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(ms), field_0:Float64;N, field_1:Float64;N]\
4786 \n TableScan: some_metric [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(ms), field_0:Float64;N, field_1:Float64;N]"
4787 ).replace("TEMPLATE", plan_name);
4788 assert_eq!(plan.display_indent_schema().to_string(), expected_without);
4789 }
4790
    #[tokio::test]
    async fn aggregate_sum() {
        // PromQL `sum` plans as the `sum` aggregate expression.
        do_aggregate_expr_plan("sum", "sum").await;
    }
4795
4796 #[tokio::test]
4797 async fn tsid_is_used_for_series_divide_when_available() {
4798 let prom_expr = parser::parse("some_metric").unwrap();
4799 let eval_stmt = EvalStmt {
4800 expr: prom_expr,
4801 start: UNIX_EPOCH,
4802 end: UNIX_EPOCH
4803 .checked_add(Duration::from_secs(100_000))
4804 .unwrap(),
4805 interval: Duration::from_secs(5),
4806 lookback_delta: Duration::from_secs(1),
4807 };
4808
4809 let table_provider = build_test_table_provider_with_tsid(
4810 &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
4811 1,
4812 1,
4813 )
4814 .await;
4815 let plan =
4816 PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
4817 .await
4818 .unwrap();
4819
4820 let plan_str = plan.display_indent_schema().to_string();
4821 assert!(plan_str.contains("PromSeriesDivide: tags=[\"__tsid\"]"));
4822 assert!(plan_str.contains("__tsid ASC NULLS FIRST"));
4823 assert!(
4824 !plan
4825 .schema()
4826 .fields()
4827 .iter()
4828 .any(|field| field.name() == DATA_SCHEMA_TSID_COLUMN_NAME)
4829 );
4830 }
4831
4832 #[tokio::test]
4833 async fn default_binary_join_uses_tsid_when_available() {
4834 let eval_stmt = build_eval_stmt("some_metric / some_alt_metric");
4835
4836 let table_provider = build_test_table_provider_with_tsid(
4837 &[
4838 (DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string()),
4839 (
4840 DEFAULT_SCHEMA_NAME.to_string(),
4841 "some_alt_metric".to_string(),
4842 ),
4843 ],
4844 1,
4845 1,
4846 )
4847 .await;
4848 let plan =
4849 PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
4850 .await
4851 .unwrap();
4852
4853 let plan_str = plan.display_indent_schema().to_string();
4854 assert!(
4855 plan_str.contains("some_metric.__tsid = some_alt_metric.__tsid"),
4856 "{plan_str}"
4857 );
4858 assert!(
4859 !plan_str.contains("some_metric.tag_0 = some_alt_metric.tag_0"),
4860 "{plan_str}"
4861 );
4862 }
4863
4864 #[tokio::test]
4865 async fn tsid_is_preserved_for_nested_default_binary_joins() {
4866 let eval_stmt = build_eval_stmt("(some_metric - some_alt_metric) / some_third_metric");
4867
4868 let table_provider = build_test_table_provider_with_tsid(
4869 &[
4870 (DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string()),
4871 (
4872 DEFAULT_SCHEMA_NAME.to_string(),
4873 "some_alt_metric".to_string(),
4874 ),
4875 (
4876 DEFAULT_SCHEMA_NAME.to_string(),
4877 "some_third_metric".to_string(),
4878 ),
4879 ],
4880 1,
4881 1,
4882 )
4883 .await;
4884 let plan =
4885 PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
4886 .await
4887 .unwrap();
4888
4889 let plan_str = plan.display_indent_schema().to_string();
4890 assert_eq!(plan_str.matches("__tsid =").count(), 2, "{plan_str}");
4891 assert!(!plan_str.contains("tag_0 ="), "{plan_str}");
4892 }
4893
4894 #[tokio::test]
4895 async fn repeated_tsid_binary_operand_keeps_tsid_join_keys() {
4896 let eval_stmt = build_eval_stmt("((some_metric - some_alt_metric) / some_metric) * 100");
4897
4898 let table_provider = build_test_table_provider_with_tsid(
4899 &[
4900 (DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string()),
4901 (
4902 DEFAULT_SCHEMA_NAME.to_string(),
4903 "some_alt_metric".to_string(),
4904 ),
4905 ],
4906 1,
4907 1,
4908 )
4909 .await;
4910 let plan =
4911 PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
4912 .await
4913 .unwrap();
4914
4915 let plan_str = plan.display_indent_schema().to_string();
4916 assert_eq!(plan_str.matches("__tsid =").count(), 2, "{plan_str}");
4917 assert!(!plan_str.contains("tag_0 ="), "{plan_str}");
4918 }
4919
4920 #[tokio::test]
4921 async fn repeated_tsid_binary_operand_keeps_shorter_field_side() {
4922 let eval_stmt =
4923 build_eval_stmt("((two_field_metric - one_field_metric) / one_field_metric) * 100");
4924
4925 let table_provider = build_test_table_provider_with_tsid_fields(
4926 &[
4927 (
4928 (
4929 DEFAULT_SCHEMA_NAME.to_string(),
4930 "two_field_metric".to_string(),
4931 ),
4932 2,
4933 ),
4934 (
4935 (
4936 DEFAULT_SCHEMA_NAME.to_string(),
4937 "one_field_metric".to_string(),
4938 ),
4939 1,
4940 ),
4941 ],
4942 1,
4943 )
4944 .await;
4945 let plan =
4946 PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
4947 .await
4948 .unwrap();
4949
4950 let field_names = plan
4951 .schema()
4952 .fields()
4953 .iter()
4954 .map(|field| field.name().clone())
4955 .collect::<Vec<_>>();
4956 let value_columns = field_names
4957 .iter()
4958 .filter(|name| {
4959 *name != "tag_0" && *name != "timestamp" && *name != DATA_SCHEMA_TSID_COLUMN_NAME
4960 })
4961 .count();
4962 assert_eq!(value_columns, 1, "{field_names:?}");
4963 let plan_str = plan.display_indent_schema().to_string();
4964 assert_eq!(plan_str.matches("__tsid =").count(), 2, "{plan_str}");
4965 assert!(!plan_str.contains("tag_0 ="), "{plan_str}");
4966 }
4967
4968 #[tokio::test]
4969 async fn tsid_binary_join_uses_shorter_field_side() {
4970 let eval_stmt = build_eval_stmt("one_field_metric / two_field_metric");
4971
4972 let table_provider = build_test_table_provider_with_tsid_fields(
4973 &[
4974 (
4975 (
4976 DEFAULT_SCHEMA_NAME.to_string(),
4977 "one_field_metric".to_string(),
4978 ),
4979 1,
4980 ),
4981 (
4982 (
4983 DEFAULT_SCHEMA_NAME.to_string(),
4984 "two_field_metric".to_string(),
4985 ),
4986 2,
4987 ),
4988 ],
4989 1,
4990 )
4991 .await;
4992 let plan =
4993 PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
4994 .await
4995 .unwrap();
4996
4997 let field_names = plan
4998 .schema()
4999 .fields()
5000 .iter()
5001 .map(|field| field.name().clone())
5002 .collect::<Vec<_>>();
5003 let value_columns = field_names
5004 .iter()
5005 .filter(|name| {
5006 *name != "tag_0" && *name != "timestamp" && *name != DATA_SCHEMA_TSID_COLUMN_NAME
5007 })
5008 .count();
5009 assert_eq!(value_columns, 1, "{field_names:?}");
5010 }
5011
5012 #[tokio::test]
5013 async fn label_matching_modifier_disables_tsid_binary_join() {
5014 let eval_stmt = build_eval_stmt("some_metric / ignoring(tag_0) some_alt_metric");
5015
5016 let table_provider = build_test_table_provider_with_tsid(
5017 &[
5018 (DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string()),
5019 (
5020 DEFAULT_SCHEMA_NAME.to_string(),
5021 "some_alt_metric".to_string(),
5022 ),
5023 ],
5024 2,
5025 1,
5026 )
5027 .await;
5028 let plan =
5029 PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
5030 .await
5031 .unwrap();
5032
5033 let plan_str = plan.display_indent_schema().to_string();
5034 assert!(!plan_str.contains("__tsid ="), "{plan_str}");
5035 assert!(
5036 plan_str.contains("some_metric.tag_1 = some_alt_metric.tag_1"),
5037 "{plan_str}"
5038 );
5039 }
5040
5041 #[tokio::test]
5042 async fn comparison_binary_join_uses_tsid_and_keeps_it_in_filtered_result() {
5043 let eval_stmt = build_eval_stmt("some_metric > some_alt_metric");
5044
5045 let table_provider = build_test_table_provider_with_tsid(
5046 &[
5047 (DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string()),
5048 (
5049 DEFAULT_SCHEMA_NAME.to_string(),
5050 "some_alt_metric".to_string(),
5051 ),
5052 ],
5053 2,
5054 1,
5055 )
5056 .await;
5057 let mut planner = PromPlanner {
5058 table_provider,
5059 ctx: PromPlannerContext::from_eval_stmt(&eval_stmt),
5060 };
5061 let plan = planner
5062 .prom_expr_to_plan(&eval_stmt.expr, &build_query_engine_state())
5063 .await
5064 .unwrap();
5065
5066 let plan_str = plan.display_indent_schema().to_string();
5067 assert!(
5068 plan_str.contains("some_metric.__tsid = some_alt_metric.__tsid"),
5069 "{plan_str}"
5070 );
5071 assert!(
5072 plan.schema()
5073 .fields()
5074 .iter()
5075 .any(|field| field.name() == DATA_SCHEMA_TSID_COLUMN_NAME),
5076 "{plan_str}"
5077 );
5078 assert!(planner.ctx.use_tsid, "{plan_str}");
5079 }
5080
5081 #[tokio::test]
5082 async fn comparison_bool_binary_join_uses_tsid_when_available() {
5083 let eval_stmt = build_eval_stmt("some_metric > bool some_alt_metric");
5084
5085 let table_provider = build_test_table_provider_with_tsid(
5086 &[
5087 (DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string()),
5088 (
5089 DEFAULT_SCHEMA_NAME.to_string(),
5090 "some_alt_metric".to_string(),
5091 ),
5092 ],
5093 2,
5094 1,
5095 )
5096 .await;
5097 let plan =
5098 PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
5099 .await
5100 .unwrap();
5101
5102 let plan_str = plan.display_indent_schema().to_string();
5103 assert!(
5104 plan_str.contains("some_metric.__tsid = some_alt_metric.__tsid"),
5105 "{plan_str}"
5106 );
5107 assert!(!plan_str.contains("tag_0 ="), "{plan_str}");
5108 assert!(!plan_str.contains("tag_1 ="), "{plan_str}");
5109 }
5110
5111 #[tokio::test]
5112 async fn scalar_count_count_range_keeps_full_window() {
5113 let plan_str = build_optimized_tsid_plan(
5114 "scalar(count(count(some_metric) by (tag_0)))",
5115 1,
5116 1,
5117 100_000,
5118 1,
5119 )
5120 .await;
5121 assert!(plan_str.contains("ScalarCalculate: tags=[]"));
5122 assert!(plan_str.contains("PromInstantManipulate: range=[0..100000000]"));
5123 assert!(!plan_str.contains("PromInstantManipulate: range=[99999000..99999000]"));
5124 }
5125
    #[tokio::test]
    async fn scalar_count_count_rewrite_applies_inside_binary_expr_for_tsid_input() {
        // The rewrite should also trigger when the scalar(count(count ...))
        // branch sits inside a binary expression; the rewritten branch shows
        // up as a Distinct node in the optimized plan.
        let plan_str = build_optimized_tsid_plan(
            "sum(irate(some_metric[1h])) / scalar(count(count(some_metric) by (tag_0)))",
            2,
            1,
            10,
            300,
        )
        .await;
        assert!(plan_str.contains("Distinct:"), "{plan_str}");
    }
5138
    #[tokio::test]
    async fn nested_count_rewrite_keeps_full_series_key_with_tsid_input() {
        // The outer count collapses to count(1) grouped only by timestamp.
        assert_nested_count_rewrite_applies(
            "count(count(some_metric) by (tag_0))",
            "Aggregate: groupBy=[[some_metric.timestamp]], aggr=[[count(Int64(1)) AS count(count(some_metric.field_0))]]"
        )
        .await;
    }
5147
    #[tokio::test]
    async fn nested_sum_count_rewrite_keeps_full_series_key_with_tsid_input() {
        // Same rewrite with `sum` as the inner aggregate.
        assert_nested_count_rewrite_applies(
            "count(sum(some_metric) by (tag_0))",
            "Aggregate: groupBy=[[some_metric.timestamp]], aggr=[[count(Int64(1)) AS count(sum(some_metric.field_0))]]"
        )
        .await;
    }
5156
    #[tokio::test]
    async fn nested_supported_inner_aggs_rewrite_apply_for_tsid_input() {
        // Each supported inner aggregate (avg/min/max/stddev/stdvar) should
        // still produce the count(1)-by-timestamp outer aggregate; the alias
        // reflects the PromQL-to-DataFusion name mapping (stddev -> stddev_pop,
        // stdvar -> var_pop).
        for (query, expected_outer_agg) in [
            (
                "count(avg(some_metric) by (tag_0))",
                "Aggregate: groupBy=[[some_metric.timestamp]], aggr=[[count(Int64(1)) AS count(avg(some_metric.field_0))]]",
            ),
            (
                "count(min(some_metric) by (tag_0))",
                "Aggregate: groupBy=[[some_metric.timestamp]], aggr=[[count(Int64(1)) AS count(min(some_metric.field_0))]]",
            ),
            (
                "count(max(some_metric) by (tag_0))",
                "Aggregate: groupBy=[[some_metric.timestamp]], aggr=[[count(Int64(1)) AS count(max(some_metric.field_0))]]",
            ),
            (
                "count(stddev(some_metric) by (tag_0))",
                "Aggregate: groupBy=[[some_metric.timestamp]], aggr=[[count(Int64(1)) AS count(stddev_pop(some_metric.field_0))]]",
            ),
            (
                "count(stdvar(some_metric) by (tag_0))",
                "Aggregate: groupBy=[[some_metric.timestamp]], aggr=[[count(Int64(1)) AS count(var_pop(some_metric.field_0))]]",
            ),
        ] {
            assert_nested_count_rewrite_applies(query, expected_outer_agg).await;
        }
    }
5184
5185 #[tokio::test]
5186 async fn nested_non_count_inner_aggs_rewrite_filter_null_values_for_tsid_input() {
5187 let count_plan =
5188 build_optimized_tsid_plan("count(count(some_metric) by (tag_0))", 2, 1, 100_000, 1)
5189 .await;
5190 assert!(
5191 !count_plan.contains("some_metric.field_0 IS NOT NULL"),
5192 "{count_plan}"
5193 );
5194
5195 for query in [
5196 "count(sum(some_metric) by (tag_0))",
5197 "count(avg(some_metric) by (tag_0))",
5198 "count(min(some_metric) by (tag_0))",
5199 "count(max(some_metric) by (tag_0))",
5200 "count(stddev(some_metric) by (tag_0))",
5201 "count(stdvar(some_metric) by (tag_0))",
5202 ] {
5203 let plan_str = build_optimized_tsid_plan(query, 2, 1, 100_000, 1).await;
5204 assert!(
5205 plan_str.contains("Filter: some_metric.field_0 IS NOT NULL"),
5206 "{query}: {plan_str}"
5207 );
5208 }
5209 }
5210
    #[tokio::test]
    async fn nested_unsupported_or_non_direct_inner_aggs_do_not_rewrite() {
        // `group` is not a supported inner aggregate, and an inner aggregate
        // over a range function (irate) is not a direct column aggregate —
        // neither query may get the nested-count rewrite.
        assert_nested_count_rewrite_missing("count(group(some_metric) by (tag_0))", 2, 1).await;
        assert_nested_count_rewrite_missing(
            "count(sum(irate(some_metric[1h])) by (tag_0))",
            2,
            300,
        )
        .await;
    }
5221
5222 #[tokio::test]
5223 async fn physical_table_name_is_not_leaked_in_plan() {
5224 let prom_expr = parser::parse("some_metric").unwrap();
5225 let eval_stmt = EvalStmt {
5226 expr: prom_expr,
5227 start: UNIX_EPOCH,
5228 end: UNIX_EPOCH
5229 .checked_add(Duration::from_secs(100_000))
5230 .unwrap(),
5231 interval: Duration::from_secs(5),
5232 lookback_delta: Duration::from_secs(1),
5233 };
5234
5235 let table_provider = build_test_table_provider_with_tsid(
5236 &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
5237 1,
5238 1,
5239 )
5240 .await;
5241 let plan =
5242 PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
5243 .await
5244 .unwrap();
5245
5246 let plan_str = plan.display_indent_schema().to_string();
5247 assert!(plan_str.contains("TableScan: phy"), "{plan}");
5248 assert!(plan_str.contains("SubqueryAlias: some_metric"));
5249 assert!(plan_str.contains("Filter: phy.__table_id = UInt32(1024)"));
5250 assert!(!plan_str.contains("TableScan: some_metric"));
5251 }
5252
5253 #[tokio::test]
5254 async fn sum_without_does_not_group_by_tsid() {
5255 let prom_expr = parser::parse("sum without (tag_0) (some_metric)").unwrap();
5256 let eval_stmt = EvalStmt {
5257 expr: prom_expr,
5258 start: UNIX_EPOCH,
5259 end: UNIX_EPOCH
5260 .checked_add(Duration::from_secs(100_000))
5261 .unwrap(),
5262 interval: Duration::from_secs(5),
5263 lookback_delta: Duration::from_secs(1),
5264 };
5265
5266 let table_provider = build_test_table_provider_with_tsid(
5267 &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
5268 1,
5269 1,
5270 )
5271 .await;
5272 let plan =
5273 PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
5274 .await
5275 .unwrap();
5276
5277 let plan_str = plan.display_indent_schema().to_string();
5278 assert!(plan_str.contains("PromSeriesDivide: tags=[\"__tsid\"]"));
5279
5280 let aggr_line = plan_str
5281 .lines()
5282 .find(|line| line.contains("Aggregate: groupBy="))
5283 .unwrap();
5284 assert!(!aggr_line.contains(DATA_SCHEMA_TSID_COLUMN_NAME));
5285 }
5286
5287 #[tokio::test]
5288 async fn topk_without_does_not_partition_by_tsid() {
5289 let prom_expr = parser::parse("topk without (tag_0) (1, some_metric)").unwrap();
5290 let eval_stmt = EvalStmt {
5291 expr: prom_expr,
5292 start: UNIX_EPOCH,
5293 end: UNIX_EPOCH
5294 .checked_add(Duration::from_secs(100_000))
5295 .unwrap(),
5296 interval: Duration::from_secs(5),
5297 lookback_delta: Duration::from_secs(1),
5298 };
5299
5300 let table_provider = build_test_table_provider_with_tsid(
5301 &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
5302 1,
5303 1,
5304 )
5305 .await;
5306 let plan =
5307 PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
5308 .await
5309 .unwrap();
5310
5311 let plan_str = plan.display_indent_schema().to_string();
5312 assert!(plan_str.contains("PromSeriesDivide: tags=[\"__tsid\"]"));
5313
5314 let window_line = plan_str
5315 .lines()
5316 .find(|line| line.contains("WindowAggr: windowExpr=[[row_number()"))
5317 .unwrap();
5318 let partition_by = window_line
5319 .split("PARTITION BY [")
5320 .nth(1)
5321 .and_then(|s| s.split("] ORDER BY").next())
5322 .unwrap();
5323 assert!(!partition_by.contains(DATA_SCHEMA_TSID_COLUMN_NAME));
5324 }
5325
5326 #[tokio::test]
5327 async fn sum_by_does_not_group_by_tsid() {
5328 let prom_expr = parser::parse("sum by (__tsid) (some_metric)").unwrap();
5329 let eval_stmt = EvalStmt {
5330 expr: prom_expr,
5331 start: UNIX_EPOCH,
5332 end: UNIX_EPOCH
5333 .checked_add(Duration::from_secs(100_000))
5334 .unwrap(),
5335 interval: Duration::from_secs(5),
5336 lookback_delta: Duration::from_secs(1),
5337 };
5338
5339 let table_provider = build_test_table_provider_with_tsid(
5340 &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
5341 1,
5342 1,
5343 )
5344 .await;
5345 let plan =
5346 PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
5347 .await
5348 .unwrap();
5349
5350 let plan_str = plan.display_indent_schema().to_string();
5351 assert!(plan_str.contains("PromSeriesDivide: tags=[\"__tsid\"]"));
5352
5353 let aggr_line = plan_str
5354 .lines()
5355 .find(|line| line.contains("Aggregate: groupBy="))
5356 .unwrap();
5357 assert!(!aggr_line.contains(DATA_SCHEMA_TSID_COLUMN_NAME));
5358 }
5359
5360 #[tokio::test]
5361 async fn topk_by_does_not_partition_by_tsid() {
5362 let prom_expr = parser::parse("topk by (__tsid) (1, some_metric)").unwrap();
5363 let eval_stmt = EvalStmt {
5364 expr: prom_expr,
5365 start: UNIX_EPOCH,
5366 end: UNIX_EPOCH
5367 .checked_add(Duration::from_secs(100_000))
5368 .unwrap(),
5369 interval: Duration::from_secs(5),
5370 lookback_delta: Duration::from_secs(1),
5371 };
5372
5373 let table_provider = build_test_table_provider_with_tsid(
5374 &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
5375 1,
5376 1,
5377 )
5378 .await;
5379 let plan =
5380 PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
5381 .await
5382 .unwrap();
5383
5384 let plan_str = plan.display_indent_schema().to_string();
5385 assert!(plan_str.contains("PromSeriesDivide: tags=[\"__tsid\"]"));
5386
5387 let window_line = plan_str
5388 .lines()
5389 .find(|line| line.contains("WindowAggr: windowExpr=[[row_number()"))
5390 .unwrap();
5391 let partition_by = window_line
5392 .split("PARTITION BY [")
5393 .nth(1)
5394 .and_then(|s| s.split("] ORDER BY").next())
5395 .unwrap();
5396 assert!(!partition_by.contains(DATA_SCHEMA_TSID_COLUMN_NAME));
5397 }
5398
5399 #[tokio::test]
5400 async fn selector_matcher_on_tsid_does_not_use_internal_column() {
5401 let prom_expr = parser::parse(r#"some_metric{__tsid="123"}"#).unwrap();
5402 let eval_stmt = EvalStmt {
5403 expr: prom_expr,
5404 start: UNIX_EPOCH,
5405 end: UNIX_EPOCH
5406 .checked_add(Duration::from_secs(100_000))
5407 .unwrap(),
5408 interval: Duration::from_secs(5),
5409 lookback_delta: Duration::from_secs(1),
5410 };
5411
5412 let table_provider = build_test_table_provider_with_tsid(
5413 &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
5414 1,
5415 1,
5416 )
5417 .await;
5418 let plan =
5419 PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
5420 .await
5421 .unwrap();
5422
5423 fn collect_filter_cols(plan: &LogicalPlan, out: &mut HashSet<Column>) {
5424 if let LogicalPlan::Filter(filter) = plan {
5425 datafusion_expr::utils::expr_to_columns(&filter.predicate, out).unwrap();
5426 }
5427 for input in plan.inputs() {
5428 collect_filter_cols(input, out);
5429 }
5430 }
5431
5432 let mut filter_cols = HashSet::new();
5433 collect_filter_cols(&plan, &mut filter_cols);
5434 assert!(
5435 !filter_cols
5436 .iter()
5437 .any(|c| c.name == DATA_SCHEMA_TSID_COLUMN_NAME)
5438 );
5439 }
5440
5441 #[tokio::test]
5442 async fn tsid_is_not_used_when_physical_table_is_missing() {
5443 let prom_expr = parser::parse("some_metric").unwrap();
5444 let eval_stmt = EvalStmt {
5445 expr: prom_expr,
5446 start: UNIX_EPOCH,
5447 end: UNIX_EPOCH
5448 .checked_add(Duration::from_secs(100_000))
5449 .unwrap(),
5450 interval: Duration::from_secs(5),
5451 lookback_delta: Duration::from_secs(1),
5452 };
5453
5454 let catalog_list = MemoryCatalogManager::with_default_setup();
5455
5456 let mut columns = vec![ColumnSchema::new(
5458 "tag_0".to_string(),
5459 ConcreteDataType::string_datatype(),
5460 false,
5461 )];
5462 columns.push(
5463 ColumnSchema::new(
5464 "timestamp".to_string(),
5465 ConcreteDataType::timestamp_millisecond_datatype(),
5466 false,
5467 )
5468 .with_time_index(true),
5469 );
5470 columns.push(ColumnSchema::new(
5471 "field_0".to_string(),
5472 ConcreteDataType::float64_datatype(),
5473 true,
5474 ));
5475 let schema = Arc::new(Schema::new(columns));
5476 let mut options = table::requests::TableOptions::default();
5477 options
5478 .extra_options
5479 .insert(LOGICAL_TABLE_METADATA_KEY.to_string(), "phy".to_string());
5480 let table_meta = TableMetaBuilder::empty()
5481 .schema(schema)
5482 .primary_key_indices(vec![0])
5483 .value_indices(vec![2])
5484 .engine(METRIC_ENGINE_NAME.to_string())
5485 .options(options)
5486 .next_column_id(1024)
5487 .build()
5488 .unwrap();
5489 let table_info = TableInfoBuilder::default()
5490 .table_id(1024)
5491 .name("some_metric")
5492 .meta(table_meta)
5493 .build()
5494 .unwrap();
5495 let table = EmptyTable::from_table_info(&table_info);
5496 catalog_list
5497 .register_table_sync(RegisterTableRequest {
5498 catalog: DEFAULT_CATALOG_NAME.to_string(),
5499 schema: DEFAULT_SCHEMA_NAME.to_string(),
5500 table_name: "some_metric".to_string(),
5501 table_id: 1024,
5502 table,
5503 })
5504 .unwrap();
5505
5506 let table_provider = DfTableSourceProvider::new(
5507 catalog_list,
5508 false,
5509 QueryContext::arc(),
5510 DummyDecoder::arc(),
5511 false,
5512 );
5513
5514 let plan =
5515 PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
5516 .await
5517 .unwrap();
5518
5519 let plan_str = plan.display_indent_schema().to_string();
5520 assert!(plan_str.contains("PromSeriesDivide: tags=[\"tag_0\"]"));
5521 assert!(!plan_str.contains("PromSeriesDivide: tags=[\"__tsid\"]"));
5522 }
5523
5524 #[tokio::test]
5525 async fn tsid_is_carried_only_when_aggregate_preserves_label_set() {
5526 let prom_expr = parser::parse("sum by (tag_0) (some_metric)").unwrap();
5527 let eval_stmt = EvalStmt {
5528 expr: prom_expr,
5529 start: UNIX_EPOCH,
5530 end: UNIX_EPOCH
5531 .checked_add(Duration::from_secs(100_000))
5532 .unwrap(),
5533 interval: Duration::from_secs(5),
5534 lookback_delta: Duration::from_secs(1),
5535 };
5536
5537 let table_provider = build_test_table_provider_with_tsid(
5538 &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
5539 1,
5540 1,
5541 )
5542 .await;
5543 let plan =
5544 PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
5545 .await
5546 .unwrap();
5547
5548 let plan_str = plan.display_indent_schema().to_string();
5549 assert!(plan_str.contains("first_value") && plan_str.contains("__tsid"));
5550 assert!(
5551 !plan
5552 .schema()
5553 .fields()
5554 .iter()
5555 .any(|field| field.name() == DATA_SCHEMA_TSID_COLUMN_NAME)
5556 );
5557
5558 let prom_expr = parser::parse("sum(some_metric)").unwrap();
5560 let eval_stmt = EvalStmt {
5561 expr: prom_expr,
5562 start: UNIX_EPOCH,
5563 end: UNIX_EPOCH
5564 .checked_add(Duration::from_secs(100_000))
5565 .unwrap(),
5566 interval: Duration::from_secs(5),
5567 lookback_delta: Duration::from_secs(1),
5568 };
5569 let table_provider = build_test_table_provider_with_tsid(
5570 &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
5571 1,
5572 1,
5573 )
5574 .await;
5575 let plan =
5576 PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
5577 .await
5578 .unwrap();
5579 let plan_str = plan.display_indent_schema().to_string();
5580 assert!(!plan_str.contains("first_value"));
5581 }
5582
5583 #[tokio::test]
5584 async fn or_operator_with_unknown_metric_does_not_require_tsid() {
5585 let prom_expr = parser::parse("unknown_metric or some_metric").unwrap();
5586 let eval_stmt = EvalStmt {
5587 expr: prom_expr,
5588 start: UNIX_EPOCH,
5589 end: UNIX_EPOCH
5590 .checked_add(Duration::from_secs(100_000))
5591 .unwrap(),
5592 interval: Duration::from_secs(5),
5593 lookback_delta: Duration::from_secs(1),
5594 };
5595
5596 let table_provider = build_test_table_provider_with_tsid(
5597 &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
5598 1,
5599 1,
5600 )
5601 .await;
5602
5603 let plan =
5604 PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
5605 .await
5606 .unwrap();
5607
5608 assert!(
5609 !plan
5610 .schema()
5611 .fields()
5612 .iter()
5613 .any(|field| field.name() == DATA_SCHEMA_TSID_COLUMN_NAME)
5614 );
5615 }
5616
    #[tokio::test]
    async fn aggregate_avg() {
        // PromQL `avg` plans as the `avg` aggregate expression.
        do_aggregate_expr_plan("avg", "avg").await;
    }
5621
5622 #[tokio::test]
5623 #[should_panic] async fn aggregate_count() {
5625 do_aggregate_expr_plan("count", "count").await;
5626 }
5627
    #[tokio::test]
    async fn aggregate_min() {
        // PromQL `min` plans as the `min` aggregate expression.
        do_aggregate_expr_plan("min", "min").await;
    }
5632
    #[tokio::test]
    async fn aggregate_max() {
        // PromQL `max` plans as the `max` aggregate expression.
        do_aggregate_expr_plan("max", "max").await;
    }
5637
5638 #[tokio::test]
5639 async fn aggregate_group() {
5640 let prom_expr = parser::parse(
5644 "sum(group by (cluster)(kubernetes_build_info{service=\"kubernetes\",job=\"apiserver\"}))",
5645 )
5646 .unwrap();
5647 let eval_stmt = EvalStmt {
5648 expr: prom_expr,
5649 start: UNIX_EPOCH,
5650 end: UNIX_EPOCH
5651 .checked_add(Duration::from_secs(100_000))
5652 .unwrap(),
5653 interval: Duration::from_secs(5),
5654 lookback_delta: Duration::from_secs(1),
5655 };
5656
5657 let table_provider = build_test_table_provider_with_fields(
5658 &[(
5659 DEFAULT_SCHEMA_NAME.to_string(),
5660 "kubernetes_build_info".to_string(),
5661 )],
5662 &["cluster", "service", "job"],
5663 )
5664 .await;
5665 let plan =
5666 PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
5667 .await
5668 .unwrap();
5669
5670 let plan_str = plan.display_indent_schema().to_string();
5671 assert!(plan_str.contains("max(Float64(1"));
5672 }
5673
    #[tokio::test]
    async fn aggregate_stddev() {
        // PromQL `stddev` maps to DataFusion's population stddev.
        do_aggregate_expr_plan("stddev", "stddev_pop").await;
    }
5678
    #[tokio::test]
    async fn aggregate_stdvar() {
        // PromQL `stdvar` maps to DataFusion's population variance.
        do_aggregate_expr_plan("stdvar", "var_pop").await;
    }
5683
5684 #[tokio::test]
    async fn binary_op_column_column() {
        // Column-to-column binary op: both selectors read the same table, so
        // each side is planned independently, aliased `lhs`/`rhs`, joined on
        // tag + timestamp, and the arithmetic is projected on top.
        let prom_expr =
            parser::parse(r#"some_metric{tag_0="foo"} + some_metric{tag_0="bar"}"#).unwrap();
        let eval_stmt = EvalStmt {
            expr: prom_expr,
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let table_provider = build_test_table_provider(
            &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
            1,
            1,
        )
        .await;
        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();

        let expected = String::from(
            "Projection: rhs.tag_0, rhs.timestamp, CAST(lhs.field_0 AS Float64) + CAST(rhs.field_0 AS Float64) AS lhs.field_0 + rhs.field_0 [tag_0:Utf8, timestamp:Timestamp(ms), lhs.field_0 + rhs.field_0:Float64;N]\
            \n  Inner Join: lhs.tag_0 = rhs.tag_0, lhs.timestamp = rhs.timestamp [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N, tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n    SubqueryAlias: lhs [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n      PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n        PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n          Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n            Filter: some_metric.tag_0 = Utf8(\"foo\") AND some_metric.timestamp >= TimestampMillisecond(-999, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n              TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n    SubqueryAlias: rhs [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n      PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n        PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n          Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n            Filter: some_metric.tag_0 = Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(-999, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n              TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]",
        );

        assert_eq!(plan.display_indent_schema().to_string(), expected);
    }
5751
5752 async fn indie_query_plan_compare<T: AsRef<str>>(query: &str, expected: T) {
5753 let prom_expr = parser::parse(query).unwrap();
5754 let eval_stmt = EvalStmt {
5755 expr: prom_expr,
5756 start: UNIX_EPOCH,
5757 end: UNIX_EPOCH
5758 .checked_add(Duration::from_secs(100_000))
5759 .unwrap(),
5760 interval: Duration::from_secs(5),
5761 lookback_delta: Duration::from_secs(1),
5762 };
5763
5764 let table_provider = build_test_table_provider(
5765 &[
5766 (DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string()),
5767 (
5768 "greptime_private".to_string(),
5769 "some_alt_metric".to_string(),
5770 ),
5771 ],
5772 1,
5773 1,
5774 )
5775 .await;
5776 let plan =
5777 PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
5778 .await
5779 .unwrap();
5780
5781 assert_eq!(plan.display_indent_schema().to_string(), expected.as_ref());
5782 }
5783
    /// A scalar-on-the-left binary operation keeps the vector's tags and time
    /// index and projects the literal combined with the (cast) field column.
    #[tokio::test]
    async fn binary_op_literal_column() {
        let query = r#"1 + some_metric{tag_0="bar"}"#;
        let expected = String::from(
            "Projection: some_metric.tag_0, some_metric.timestamp, Float64(1) + CAST(some_metric.field_0 AS Float64) AS Float64(1) + field_0 [tag_0:Utf8, timestamp:Timestamp(ms), Float64(1) + field_0:Float64;N]\
            \n  PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n    PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n      Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n        Filter: some_metric.tag_0 = Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(-999, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n          TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]",
        );

        indie_query_plan_compare(query, expected).await;
    }
5798
5799 #[tokio::test]
5800 async fn binary_op_literal_literal() {
5801 let query = r#"1 + 1"#;
5802 let expected = r#"EmptyMetric: range=[0..100000000], interval=[5000] [time:Timestamp(ms), value:Float64;N]
5803 TableScan: dummy [time:Timestamp(ms), value:Float64;N]"#;
5804 indie_query_plan_compare(query, expected).await;
5805 }
5806
    /// The `bool` modifier turns a comparison into a value-producing
    /// expression: the boolean result is cast back to Float64 instead of
    /// acting as a row filter.
    #[tokio::test]
    async fn simple_bool_grammar() {
        let query = "some_metric != bool 1.2345";
        let expected = String::from(
            "Projection: some_metric.tag_0, some_metric.timestamp, CAST(some_metric.field_0 != Float64(1.2345) AS Float64) AS field_0 != Float64(1.2345) [tag_0:Utf8, timestamp:Timestamp(ms), field_0 != Float64(1.2345):Float64;N]\
            \n  PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n    PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n      Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n        Filter: some_metric.timestamp >= TimestampMillisecond(-999, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n          TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]",
        );

        indie_query_plan_compare(query, expected).await;
    }
5821
    /// A `bool` comparison nested inside further arithmetic: the comparison's
    /// boolean result is cast to Float64 before being added to the field.
    #[tokio::test]
    async fn bool_with_additional_arithmetic() {
        let query = "some_metric + (1 == bool 2)";
        let expected = String::from(
            "Projection: some_metric.tag_0, some_metric.timestamp, CAST(some_metric.field_0 AS Float64) + CAST(Float64(1) = Float64(2) AS Float64) AS field_0 + Float64(1) = Float64(2) [tag_0:Utf8, timestamp:Timestamp(ms), field_0 + Float64(1) = Float64(2):Float64;N]\
            \n  PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n    PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n      Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n        Filter: some_metric.timestamp >= TimestampMillisecond(-999, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n          TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]",
        );

        indie_query_plan_compare(query, expected).await;
    }
5836
    /// Unary negation of a vector is projected as `(- field_0)` on top of the
    /// usual instant-selector pipeline.
    #[tokio::test]
    async fn simple_unary() {
        let query = "-some_metric";
        let expected = String::from(
            "Projection: some_metric.tag_0, some_metric.timestamp, (- some_metric.field_0) AS (- field_0) [tag_0:Utf8, timestamp:Timestamp(ms), (- field_0):Float64;N]\
            \n  PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n    PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n      Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n        Filter: some_metric.timestamp >= TimestampMillisecond(-999, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n          TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]",
        );

        indie_query_plan_compare(query, expected).await;
    }
5851
    /// `increase(...[5m])` plans as the `prom_increase` range function over a
    /// PromRangeManipulate node (5m = 300000ms eval range), with a final
    /// IS NOT NULL filter dropping windows that produced no value. Note the
    /// scan's lower time bound is extended by the range (-299999ms).
    #[tokio::test]
    async fn increase_aggr() {
        let query = "increase(some_metric[5m])";
        let expected = String::from(
            "Filter: prom_increase(timestamp_range,field_0,timestamp,Int64(300000)) IS NOT NULL [timestamp:Timestamp(ms), prom_increase(timestamp_range,field_0,timestamp,Int64(300000)):Float64;N, tag_0:Utf8]\
            \n  Projection: some_metric.timestamp, prom_increase(timestamp_range, field_0, some_metric.timestamp, Int64(300000)) AS prom_increase(timestamp_range,field_0,timestamp,Int64(300000)), some_metric.tag_0 [timestamp:Timestamp(ms), prom_increase(timestamp_range,field_0,timestamp,Int64(300000)):Float64;N, tag_0:Utf8]\
            \n    PromRangeManipulate: req range=[0..100000000], interval=[5000], eval range=[300000], time index=[timestamp], values=[\"field_0\"] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Dictionary(Int64, Float64);N, timestamp_range:Dictionary(Int64, Timestamp(ms))]\
            \n      PromSeriesNormalize: offset=[0], time index=[timestamp], filter NaN: [true] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n        PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n          Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n            Filter: some_metric.timestamp >= TimestampMillisecond(-299999, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n              TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]",
        );

        indie_query_plan_compare(query, expected).await;
    }
5868
    /// Without the `bool` modifier a comparison acts as a row filter: rows
    /// whose field does not satisfy the predicate are dropped, and the schema
    /// is unchanged.
    #[tokio::test]
    async fn less_filter_on_value() {
        let query = "some_metric < 1.2345";
        let expected = String::from(
            "Filter: some_metric.field_0 < Float64(1.2345) [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n  PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n    PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n      Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n        Filter: some_metric.timestamp >= TimestampMillisecond(-999, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n          TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]",
        );

        indie_query_plan_compare(query, expected).await;
    }
5883
    /// `count_over_time(...[5m])` plans as the `prom_count_over_time` range
    /// function over PromRangeManipulate, mirroring the `increase` shape but
    /// without the timestamp/range-length arguments.
    #[tokio::test]
    async fn count_over_time() {
        let query = "count_over_time(some_metric[5m])";
        let expected = String::from(
            "Filter: prom_count_over_time(timestamp_range,field_0) IS NOT NULL [timestamp:Timestamp(ms), prom_count_over_time(timestamp_range,field_0):Float64;N, tag_0:Utf8]\
            \n  Projection: some_metric.timestamp, prom_count_over_time(timestamp_range, field_0) AS prom_count_over_time(timestamp_range,field_0), some_metric.tag_0 [timestamp:Timestamp(ms), prom_count_over_time(timestamp_range,field_0):Float64;N, tag_0:Utf8]\
            \n    PromRangeManipulate: req range=[0..100000000], interval=[5000], eval range=[300000], time index=[timestamp], values=[\"field_0\"] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Dictionary(Int64, Float64);N, timestamp_range:Dictionary(Int64, Timestamp(ms))]\
            \n      PromSeriesNormalize: offset=[0], time index=[timestamp], filter NaN: [true] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n        PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n          Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n            Filter: some_metric.timestamp >= TimestampMillisecond(-299999, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n              TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]",
        );

        indie_query_plan_compare(query, expected).await;
    }
5900
5901 #[tokio::test]
5902 async fn test_hash_join() {
5903 let mut eval_stmt = EvalStmt {
5904 expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
5905 start: UNIX_EPOCH,
5906 end: UNIX_EPOCH
5907 .checked_add(Duration::from_secs(100_000))
5908 .unwrap(),
5909 interval: Duration::from_secs(5),
5910 lookback_delta: Duration::from_secs(1),
5911 };
5912
5913 let case = r#"http_server_requests_seconds_sum{uri="/accounts/login"} / ignoring(kubernetes_pod_name,kubernetes_namespace) http_server_requests_seconds_count{uri="/accounts/login"}"#;
5914
5915 let prom_expr = parser::parse(case).unwrap();
5916 eval_stmt.expr = prom_expr;
5917 let table_provider = build_test_table_provider_with_fields(
5918 &[
5919 (
5920 DEFAULT_SCHEMA_NAME.to_string(),
5921 "http_server_requests_seconds_sum".to_string(),
5922 ),
5923 (
5924 DEFAULT_SCHEMA_NAME.to_string(),
5925 "http_server_requests_seconds_count".to_string(),
5926 ),
5927 ],
5928 &["uri", "kubernetes_namespace", "kubernetes_pod_name"],
5929 )
5930 .await;
5931 let plan =
5933 PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
5934 .await
5935 .unwrap();
5936 let expected = "Projection: http_server_requests_seconds_count.uri, http_server_requests_seconds_count.kubernetes_namespace, http_server_requests_seconds_count.kubernetes_pod_name, http_server_requests_seconds_count.greptime_timestamp, CAST(http_server_requests_seconds_sum.greptime_value AS Float64) / CAST(http_server_requests_seconds_count.greptime_value AS Float64) AS http_server_requests_seconds_sum.greptime_value / http_server_requests_seconds_count.greptime_value\
5937 \n Inner Join: http_server_requests_seconds_sum.greptime_timestamp = http_server_requests_seconds_count.greptime_timestamp, http_server_requests_seconds_sum.uri = http_server_requests_seconds_count.uri\
5938 \n SubqueryAlias: http_server_requests_seconds_sum\
5939 \n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp]\
5940 \n PromSeriesDivide: tags=[\"uri\", \"kubernetes_namespace\", \"kubernetes_pod_name\"]\
5941 \n Sort: http_server_requests_seconds_sum.uri ASC NULLS FIRST, http_server_requests_seconds_sum.kubernetes_namespace ASC NULLS FIRST, http_server_requests_seconds_sum.kubernetes_pod_name ASC NULLS FIRST, http_server_requests_seconds_sum.greptime_timestamp ASC NULLS FIRST\
5942 \n Filter: http_server_requests_seconds_sum.uri = Utf8(\"/accounts/login\") AND http_server_requests_seconds_sum.greptime_timestamp >= TimestampMillisecond(-999, None) AND http_server_requests_seconds_sum.greptime_timestamp <= TimestampMillisecond(100000000, None)\
5943 \n TableScan: http_server_requests_seconds_sum\
5944 \n SubqueryAlias: http_server_requests_seconds_count\
5945 \n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp]\
5946 \n PromSeriesDivide: tags=[\"uri\", \"kubernetes_namespace\", \"kubernetes_pod_name\"]\
5947 \n Sort: http_server_requests_seconds_count.uri ASC NULLS FIRST, http_server_requests_seconds_count.kubernetes_namespace ASC NULLS FIRST, http_server_requests_seconds_count.kubernetes_pod_name ASC NULLS FIRST, http_server_requests_seconds_count.greptime_timestamp ASC NULLS FIRST\
5948 \n Filter: http_server_requests_seconds_count.uri = Utf8(\"/accounts/login\") AND http_server_requests_seconds_count.greptime_timestamp >= TimestampMillisecond(-999, None) AND http_server_requests_seconds_count.greptime_timestamp <= TimestampMillisecond(100000000, None)\
5949 \n TableScan: http_server_requests_seconds_count";
5950 assert_eq!(plan.to_string(), expected);
5951 }
5952
5953 #[tokio::test]
5954 async fn test_nested_histogram_quantile() {
5955 let mut eval_stmt = EvalStmt {
5956 expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
5957 start: UNIX_EPOCH,
5958 end: UNIX_EPOCH
5959 .checked_add(Duration::from_secs(100_000))
5960 .unwrap(),
5961 interval: Duration::from_secs(5),
5962 lookback_delta: Duration::from_secs(1),
5963 };
5964
5965 let case = r#"label_replace(histogram_quantile(0.99, sum by(pod, le, path, code) (rate(greptime_servers_grpc_requests_elapsed_bucket{container="frontend"}[1m0s]))), "pod_new", "$1", "pod", "greptimedb-frontend-[0-9a-z]*-(.*)")"#;
5966
5967 let prom_expr = parser::parse(case).unwrap();
5968 eval_stmt.expr = prom_expr;
5969 let table_provider = build_test_table_provider_with_fields(
5970 &[(
5971 DEFAULT_SCHEMA_NAME.to_string(),
5972 "greptime_servers_grpc_requests_elapsed_bucket".to_string(),
5973 )],
5974 &["pod", "le", "path", "code", "container"],
5975 )
5976 .await;
5977 let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
5979 .await
5980 .unwrap();
5981 }
5982
5983 #[tokio::test]
5984 async fn test_parse_and_operator() {
5985 let mut eval_stmt = EvalStmt {
5986 expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
5987 start: UNIX_EPOCH,
5988 end: UNIX_EPOCH
5989 .checked_add(Duration::from_secs(100_000))
5990 .unwrap(),
5991 interval: Duration::from_secs(5),
5992 lookback_delta: Duration::from_secs(1),
5993 };
5994
5995 let cases = [
5996 r#"count (max by (persistentvolumeclaim,namespace) (kubelet_volume_stats_used_bytes{namespace=~".+"} ) and (max by (persistentvolumeclaim,namespace) (kubelet_volume_stats_used_bytes{namespace=~".+"} )) / (max by (persistentvolumeclaim,namespace) (kubelet_volume_stats_capacity_bytes{namespace=~".+"} )) >= (80 / 100)) or vector (0)"#,
5997 r#"count (max by (persistentvolumeclaim,namespace) (kubelet_volume_stats_used_bytes{namespace=~".+"} ) unless (max by (persistentvolumeclaim,namespace) (kubelet_volume_stats_used_bytes{namespace=~".+"} )) / (max by (persistentvolumeclaim,namespace) (kubelet_volume_stats_capacity_bytes{namespace=~".+"} )) >= (80 / 100)) or vector (0)"#,
5998 ];
5999
6000 for case in cases {
6001 let prom_expr = parser::parse(case).unwrap();
6002 eval_stmt.expr = prom_expr;
6003 let table_provider = build_test_table_provider_with_fields(
6004 &[
6005 (
6006 DEFAULT_SCHEMA_NAME.to_string(),
6007 "kubelet_volume_stats_used_bytes".to_string(),
6008 ),
6009 (
6010 DEFAULT_SCHEMA_NAME.to_string(),
6011 "kubelet_volume_stats_capacity_bytes".to_string(),
6012 ),
6013 ],
6014 &["namespace", "persistentvolumeclaim"],
6015 )
6016 .await;
6017 let _ =
6019 PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
6020 .await
6021 .unwrap();
6022 }
6023 }
6024
6025 #[tokio::test]
6026 async fn test_nested_binary_op() {
6027 let mut eval_stmt = EvalStmt {
6028 expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
6029 start: UNIX_EPOCH,
6030 end: UNIX_EPOCH
6031 .checked_add(Duration::from_secs(100_000))
6032 .unwrap(),
6033 interval: Duration::from_secs(5),
6034 lookback_delta: Duration::from_secs(1),
6035 };
6036
6037 let case = r#"sum(rate(nginx_ingress_controller_requests{job=~".*"}[2m])) -
6038 (
6039 sum(rate(nginx_ingress_controller_requests{namespace=~".*"}[2m]))
6040 or
6041 vector(0)
6042 )"#;
6043
6044 let prom_expr = parser::parse(case).unwrap();
6045 eval_stmt.expr = prom_expr;
6046 let table_provider = build_test_table_provider_with_fields(
6047 &[(
6048 DEFAULT_SCHEMA_NAME.to_string(),
6049 "nginx_ingress_controller_requests".to_string(),
6050 )],
6051 &["namespace", "job"],
6052 )
6053 .await;
6054 let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
6056 .await
6057 .unwrap();
6058 }
6059
    /// Regression suite for `or` chains mixed with division, multiplication
    /// and comparisons: each real-world shaped query below must plan without
    /// error. Smoke test — the plans themselves are not inspected.
    #[tokio::test]
    async fn test_parse_or_operator() {
        // The statement is reused across cases; only `expr` is replaced.
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        // Case 1: `rate`-over-sum division with an `or` joining a second
        // multiplication branch, all grouped by the same labels.
        let case = r#"
        sum(rate(sysstat{tenant_name=~"tenant1",cluster_name=~"cluster1"}[120s])) by (cluster_name,tenant_name) /
        (sum(sysstat{tenant_name=~"tenant1",cluster_name=~"cluster1"}) by (cluster_name,tenant_name) * 100)
        or
        200 * sum(sysstat{tenant_name=~"tenant1",cluster_name=~"cluster1"}) by (cluster_name,tenant_name) /
        sum(sysstat{tenant_name=~"tenant1",cluster_name=~"cluster1"}) by (cluster_name,tenant_name)"#;

        let table_provider = build_test_table_provider_with_fields(
            &[(DEFAULT_SCHEMA_NAME.to_string(), "sysstat".to_string())],
            &["tenant_name", "cluster_name"],
        )
        .await;
        eval_stmt.expr = parser::parse(case).unwrap();
        let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
            .await
            .unwrap();

        // Case 2: repeated `delta`-based ratios chained with `+`, `>=` and
        // multiple `or` branches over the same metric.
        let case = r#"sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) /
        (sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) *1000) +
        sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) /
        (sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) *1000) >= 0
        or
        sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) /
        (sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) *1000) >= 0
        or
        sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) /
        (sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) *1000) >= 0"#;
        let table_provider = build_test_table_provider_with_fields(
            &[(DEFAULT_SCHEMA_NAME.to_string(), "sysstat".to_string())],
            &["tenant_name", "cluster_name"],
        )
        .await;
        eval_stmt.expr = parser::parse(case).unwrap();
        let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
            .await
            .unwrap();

        // Case 3: `or` across two different metrics plus their sum.
        let case = r#"(sum(background_waitevent_cnt{tenant_name=~"sys",cluster_name=~"cluster1"}) by (cluster_name,tenant_name) +
        sum(foreground_waitevent_cnt{tenant_name=~"sys",cluster_name=~"cluster1"}) by (cluster_name,tenant_name)) or
        (sum(background_waitevent_cnt{tenant_name=~"sys",cluster_name=~"cluster1"}) by (cluster_name,tenant_name)) or
        (sum(foreground_waitevent_cnt{tenant_name=~"sys",cluster_name=~"cluster1"}) by (cluster_name,tenant_name))"#;
        let table_provider = build_test_table_provider_with_fields(
            &[
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "background_waitevent_cnt".to_string(),
                ),
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "foreground_waitevent_cnt".to_string(),
                ),
            ],
            &["tenant_name", "cluster_name"],
        )
        .await;
        eval_stmt.expr = parser::parse(case).unwrap();
        let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
            .await
            .unwrap();

        // Case 4: `or` between avg of one metric and a ratio of two others.
        let case = r#"avg(node_load1{cluster_name=~"cluster1"}) by (cluster_name,host_name) or max(container_cpu_load_average_10s{cluster_name=~"cluster1"}) by (cluster_name,host_name) * 100 / max(container_spec_cpu_quota{cluster_name=~"cluster1"}) by (cluster_name,host_name)"#;
        let table_provider = build_test_table_provider_with_fields(
            &[
                (DEFAULT_SCHEMA_NAME.to_string(), "node_load1".to_string()),
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "container_cpu_load_average_10s".to_string(),
                ),
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "container_spec_cpu_quota".to_string(),
                ),
            ],
            &["cluster_name", "host_name"],
        )
        .await;
        eval_stmt.expr = parser::parse(case).unwrap();
        let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
            .await
            .unwrap();
    }
6153
    /// The reserved `__field__` matcher selects which value columns survive
    /// planning. Each case pairs a query with the expected (sorted) output
    /// column set from a 3-tag / 3-field `some_metric` table; matchers that
    /// reference a nonexistent field must fail to plan.
    #[tokio::test]
    async fn value_matcher() {
        // The statement is reused across cases; only `expr` is replaced.
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let cases = [
            // single equality matcher
            (
                r#"some_metric{__field__="field_1"}"#,
                vec![
                    "some_metric.field_1",
                    "some_metric.tag_0",
                    "some_metric.tag_1",
                    "some_metric.tag_2",
                    "some_metric.timestamp",
                ],
            ),
            // two equality matchers union their fields
            (
                r#"some_metric{__field__="field_1", __field__="field_0"}"#,
                vec![
                    "some_metric.field_0",
                    "some_metric.field_1",
                    "some_metric.tag_0",
                    "some_metric.tag_1",
                    "some_metric.tag_2",
                    "some_metric.timestamp",
                ],
            ),
            // negative equality matcher keeps every other field
            (
                r#"some_metric{__field__!="field_1"}"#,
                vec![
                    "some_metric.field_0",
                    "some_metric.field_2",
                    "some_metric.tag_0",
                    "some_metric.tag_1",
                    "some_metric.tag_2",
                    "some_metric.timestamp",
                ],
            ),
            // two negative equality matchers
            (
                r#"some_metric{__field__!="field_1", __field__!="field_2"}"#,
                vec![
                    "some_metric.field_0",
                    "some_metric.tag_0",
                    "some_metric.tag_1",
                    "some_metric.tag_2",
                    "some_metric.timestamp",
                ],
            ),
            // positive and negative matchers combined
            (
                r#"some_metric{__field__="field_1", __field__!="field_0"}"#,
                vec![
                    "some_metric.field_1",
                    "some_metric.tag_0",
                    "some_metric.tag_1",
                    "some_metric.tag_2",
                    "some_metric.timestamp",
                ],
            ),
            // contradictory matchers leave no field column at all
            (
                r#"some_metric{__field__="field_2", __field__!="field_2"}"#,
                vec![
                    "some_metric.tag_0",
                    "some_metric.tag_1",
                    "some_metric.tag_2",
                    "some_metric.timestamp",
                ],
            ),
            // regex matcher selects every matching field
            (
                r#"some_metric{__field__=~"field_1|field_2"}"#,
                vec![
                    "some_metric.field_1",
                    "some_metric.field_2",
                    "some_metric.tag_0",
                    "some_metric.tag_1",
                    "some_metric.tag_2",
                    "some_metric.timestamp",
                ],
            ),
            // negative regex matcher keeps the non-matching fields
            (
                r#"some_metric{__field__!~"field_1|field_2"}"#,
                vec![
                    "some_metric.field_0",
                    "some_metric.tag_0",
                    "some_metric.tag_1",
                    "some_metric.tag_2",
                    "some_metric.timestamp",
                ],
            ),
        ];

        for case in cases {
            let prom_expr = parser::parse(case.0).unwrap();
            eval_stmt.expr = prom_expr;
            // 3 tag columns and 3 field columns.
            let table_provider = build_test_table_provider(
                &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
                3,
                3,
            )
            .await;
            let plan =
                PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                    .await
                    .unwrap();
            // Compare as sorted sets — column order is not part of the contract.
            let mut fields = plan.schema().field_names();
            let mut expected = case.1.into_iter().map(String::from).collect::<Vec<_>>();
            fields.sort();
            expected.sort();
            assert_eq!(fields, expected, "case: {:?}", case.0);
        }

        // Matchers naming a field that doesn't exist must fail planning.
        let bad_cases = [
            r#"some_metric{__field__="nonexistent"}"#,
            r#"some_metric{__field__!="nonexistent"}"#,
        ];

        for case in bad_cases {
            let prom_expr = parser::parse(case).unwrap();
            eval_stmt.expr = prom_expr;
            let table_provider = build_test_table_provider(
                &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
                3,
                3,
            )
            .await;
            let plan =
                PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                    .await;
            assert!(plan.is_err(), "case: {:?}", case);
        }
    }
6300
    /// The reserved `__schema__` (and its alias `__database__`) matcher
    /// routes the selector to a table in another schema; the matcher itself
    /// is dropped from the filter. Also verifies a cross-schema join.
    #[tokio::test]
    async fn custom_schema() {
        let query = "some_alt_metric{__schema__=\"greptime_private\"}";
        let expected = String::from(
            "PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n  PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n    Sort: greptime_private.some_alt_metric.tag_0 ASC NULLS FIRST, greptime_private.some_alt_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n      Filter: greptime_private.some_alt_metric.timestamp >= TimestampMillisecond(-999, None) AND greptime_private.some_alt_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n        TableScan: greptime_private.some_alt_metric [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]",
        );

        indie_query_plan_compare(query, expected).await;

        // `__database__` behaves identically to `__schema__`.
        let query = "some_alt_metric{__database__=\"greptime_private\"}";
        let expected = String::from(
            "PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n  PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n    Sort: greptime_private.some_alt_metric.tag_0 ASC NULLS FIRST, greptime_private.some_alt_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n      Filter: greptime_private.some_alt_metric.timestamp >= TimestampMillisecond(-999, None) AND greptime_private.some_alt_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n        TableScan: greptime_private.some_alt_metric [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]",
        );

        indie_query_plan_compare(query, expected).await;

        // A cross-schema binary operation joins the private-schema table with
        // the default-schema table on tag + time index.
        let query = "some_alt_metric{__schema__=\"greptime_private\"} / some_metric";
        let expected = String::from(
            "Projection: some_metric.tag_0, some_metric.timestamp, CAST(greptime_private.some_alt_metric.field_0 AS Float64) / CAST(some_metric.field_0 AS Float64) AS greptime_private.some_alt_metric.field_0 / some_metric.field_0 [tag_0:Utf8, timestamp:Timestamp(ms), greptime_private.some_alt_metric.field_0 / some_metric.field_0:Float64;N]\
            \n  Inner Join: greptime_private.some_alt_metric.tag_0 = some_metric.tag_0, greptime_private.some_alt_metric.timestamp = some_metric.timestamp [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N, tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n    SubqueryAlias: greptime_private.some_alt_metric [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n      PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n        PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n          Sort: greptime_private.some_alt_metric.tag_0 ASC NULLS FIRST, greptime_private.some_alt_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n            Filter: greptime_private.some_alt_metric.timestamp >= TimestampMillisecond(-999, None) AND greptime_private.some_alt_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n              TableScan: greptime_private.some_alt_metric [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n    SubqueryAlias: some_metric [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n      PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n        PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n          Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n            Filter: some_metric.timestamp >= TimestampMillisecond(-999, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n              TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]",
        );

        indie_query_plan_compare(query, expected).await;
    }
6345
6346 #[tokio::test]
6347 async fn only_equals_is_supported_for_special_matcher() {
6348 let queries = &[
6349 "some_alt_metric{__schema__!=\"greptime_private\"}",
6350 "some_alt_metric{__schema__=~\"lalala\"}",
6351 "some_alt_metric{__database__!=\"greptime_private\"}",
6352 "some_alt_metric{__database__=~\"lalala\"}",
6353 ];
6354
6355 for query in queries {
6356 let prom_expr = parser::parse(query).unwrap();
6357 let eval_stmt = EvalStmt {
6358 expr: prom_expr,
6359 start: UNIX_EPOCH,
6360 end: UNIX_EPOCH
6361 .checked_add(Duration::from_secs(100_000))
6362 .unwrap(),
6363 interval: Duration::from_secs(5),
6364 lookback_delta: Duration::from_secs(1),
6365 };
6366
6367 let table_provider = build_test_table_provider(
6368 &[
6369 (DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string()),
6370 (
6371 "greptime_private".to_string(),
6372 "some_alt_metric".to_string(),
6373 ),
6374 ],
6375 1,
6376 1,
6377 )
6378 .await;
6379
6380 let plan =
6381 PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
6382 .await;
6383 assert!(plan.is_err(), "query: {:?}", query);
6384 }
6385 }
6386
    #[tokio::test]
    async fn test_non_ms_precision() {
        // A table whose time index has nanosecond precision must be casted to
        // millisecond precision (via a `CAST ... AS timestamp` projection
        // directly above the scan) before the Prom* plan nodes are applied.
        let catalog_list = MemoryCatalogManager::with_default_setup();
        let columns = vec![
            ColumnSchema::new(
                "tag".to_string(),
                ConcreteDataType::string_datatype(),
                false,
            ),
            // Nanosecond-precision time index — the subject of this test.
            ColumnSchema::new(
                "timestamp".to_string(),
                ConcreteDataType::timestamp_nanosecond_datatype(),
                false,
            )
            .with_time_index(true),
            ColumnSchema::new(
                "field".to_string(),
                ConcreteDataType::float64_datatype(),
                true,
            ),
        ];
        let schema = Arc::new(Schema::new(columns));
        let table_meta = TableMetaBuilder::empty()
            .schema(schema)
            .primary_key_indices(vec![0])
            .value_indices(vec![2])
            .next_column_id(1024)
            .build()
            .unwrap();
        let table_info = TableInfoBuilder::default()
            .name("metrics".to_string())
            .meta(table_meta)
            .build()
            .unwrap();
        let table = EmptyTable::from_table_info(&table_info);
        // Register `metrics` in the default catalog/schema so the planner can
        // resolve it by bare name.
        assert!(
            catalog_list
                .register_table_sync(RegisterTableRequest {
                    catalog: DEFAULT_CATALOG_NAME.to_string(),
                    schema: DEFAULT_SCHEMA_NAME.to_string(),
                    table_name: "metrics".to_string(),
                    table_id: 1024,
                    table,
                })
                .is_ok()
        );

        // Case 1: instant selector — the cast projection sits right above the
        // TableScan and every node above it sees Timestamp(ms).
        let plan = PromPlanner::stmt_to_plan(
            DfTableSourceProvider::new(
                catalog_list.clone(),
                false,
                QueryContext::arc(),
                DummyDecoder::arc(),
                true,
            ),
            &EvalStmt {
                expr: parser::parse("metrics{tag = \"1\"}").unwrap(),
                start: UNIX_EPOCH,
                end: UNIX_EPOCH
                    .checked_add(Duration::from_secs(100_000))
                    .unwrap(),
                interval: Duration::from_secs(5),
                lookback_delta: Duration::from_secs(1),
            },
            &build_query_engine_state(),
        )
        .await
        .unwrap();
        assert_eq!(
            plan.display_indent_schema().to_string(),
            "PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [field:Float64;N, tag:Utf8, timestamp:Timestamp(ms)]\
            \n  PromSeriesDivide: tags=[\"tag\"] [field:Float64;N, tag:Utf8, timestamp:Timestamp(ms)]\
            \n    Sort: metrics.tag ASC NULLS FIRST, metrics.timestamp ASC NULLS FIRST [field:Float64;N, tag:Utf8, timestamp:Timestamp(ms)]\
            \n      Filter: metrics.tag = Utf8(\"1\") AND metrics.timestamp >= TimestampMillisecond(-999, None) AND metrics.timestamp <= TimestampMillisecond(100000000, None) [field:Float64;N, tag:Utf8, timestamp:Timestamp(ms)]\
            \n        Projection: metrics.field, metrics.tag, CAST(metrics.timestamp AS Timestamp(ms)) AS timestamp [field:Float64;N, tag:Utf8, timestamp:Timestamp(ms)]\
            \n          TableScan: metrics [tag:Utf8, timestamp:Timestamp(ns), field:Float64;N]"
        );
        // Case 2: range function (`avg_over_time`) — the same cast also
        // applies underneath the range-manipulation pipeline.
        let plan = PromPlanner::stmt_to_plan(
            DfTableSourceProvider::new(
                catalog_list.clone(),
                false,
                QueryContext::arc(),
                DummyDecoder::arc(),
                true,
            ),
            &EvalStmt {
                expr: parser::parse("avg_over_time(metrics{tag = \"1\"}[5s])").unwrap(),
                start: UNIX_EPOCH,
                end: UNIX_EPOCH
                    .checked_add(Duration::from_secs(100_000))
                    .unwrap(),
                interval: Duration::from_secs(5),
                lookback_delta: Duration::from_secs(1),
            },
            &build_query_engine_state(),
        )
        .await
        .unwrap();
        assert_eq!(
            plan.display_indent_schema().to_string(),
            "Filter: prom_avg_over_time(timestamp_range,field) IS NOT NULL [timestamp:Timestamp(ms), prom_avg_over_time(timestamp_range,field):Float64;N, tag:Utf8]\
            \n  Projection: metrics.timestamp, prom_avg_over_time(timestamp_range, field) AS prom_avg_over_time(timestamp_range,field), metrics.tag [timestamp:Timestamp(ms), prom_avg_over_time(timestamp_range,field):Float64;N, tag:Utf8]\
            \n    PromRangeManipulate: req range=[0..100000000], interval=[5000], eval range=[5000], time index=[timestamp], values=[\"field\"] [field:Dictionary(Int64, Float64);N, tag:Utf8, timestamp:Timestamp(ms), timestamp_range:Dictionary(Int64, Timestamp(ms))]\
            \n      PromSeriesNormalize: offset=[0], time index=[timestamp], filter NaN: [true] [field:Float64;N, tag:Utf8, timestamp:Timestamp(ms)]\
            \n        PromSeriesDivide: tags=[\"tag\"] [field:Float64;N, tag:Utf8, timestamp:Timestamp(ms)]\
            \n          Sort: metrics.tag ASC NULLS FIRST, metrics.timestamp ASC NULLS FIRST [field:Float64;N, tag:Utf8, timestamp:Timestamp(ms)]\
            \n            Filter: metrics.tag = Utf8(\"1\") AND metrics.timestamp >= TimestampMillisecond(-4999, None) AND metrics.timestamp <= TimestampMillisecond(100000000, None) [field:Float64;N, tag:Utf8, timestamp:Timestamp(ms)]\
            \n              Projection: metrics.field, metrics.tag, CAST(metrics.timestamp AS Timestamp(ms)) AS timestamp [field:Float64;N, tag:Utf8, timestamp:Timestamp(ms)]\
            \n                TableScan: metrics [tag:Utf8, timestamp:Timestamp(ns), field:Float64;N]"
        );
    }
6498
6499 #[tokio::test]
6500 async fn test_nonexistent_label() {
6501 let mut eval_stmt = EvalStmt {
6503 expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
6504 start: UNIX_EPOCH,
6505 end: UNIX_EPOCH
6506 .checked_add(Duration::from_secs(100_000))
6507 .unwrap(),
6508 interval: Duration::from_secs(5),
6509 lookback_delta: Duration::from_secs(1),
6510 };
6511
6512 let case = r#"some_metric{nonexistent="hi"}"#;
6513 let prom_expr = parser::parse(case).unwrap();
6514 eval_stmt.expr = prom_expr;
6515 let table_provider = build_test_table_provider(
6516 &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
6517 3,
6518 3,
6519 )
6520 .await;
6521 let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
6523 .await
6524 .unwrap();
6525 }
6526
    #[tokio::test]
    async fn test_label_join() {
        // `label_join` concatenates the source labels (tag_1..tag_3) with the
        // given separator into a new label `foo`, while the original labels
        // are kept in the output.
        let prom_expr = parser::parse(
            "label_join(up{tag_0='api-server'}, 'foo', ',', 'tag_1', 'tag_2', 'tag_3')",
        )
        .unwrap();
        let eval_stmt = EvalStmt {
            expr: prom_expr,
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        // Test table `up` with 4 tag columns and 1 field column.
        let table_provider =
            build_test_table_provider(&[(DEFAULT_SCHEMA_NAME.to_string(), "up".to_string())], 4, 1)
                .await;
        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();

        // The join is planned as a `concat_ws` projection; rows whose value
        // column is NULL are filtered out on top.
        let expected = r#"
Filter: up.field_0 IS NOT NULL [timestamp:Timestamp(ms), field_0:Float64;N, foo:Utf8;N, tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8]
  Projection: up.timestamp, up.field_0, concat_ws(Utf8(","), up.tag_1, up.tag_2, up.tag_3) AS foo, up.tag_0, up.tag_1, up.tag_2, up.tag_3 [timestamp:Timestamp(ms), field_0:Float64;N, foo:Utf8;N, tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8]
    PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]
      PromSeriesDivide: tags=["tag_0", "tag_1", "tag_2", "tag_3"] [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]
        Sort: up.tag_0 ASC NULLS FIRST, up.tag_1 ASC NULLS FIRST, up.tag_2 ASC NULLS FIRST, up.tag_3 ASC NULLS FIRST, up.timestamp ASC NULLS FIRST [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]
          Filter: up.tag_0 = Utf8("api-server") AND up.timestamp >= TimestampMillisecond(-999, None) AND up.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]
            TableScan: up [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]"#;

        let ret = plan.display_indent_schema().to_string();
        assert_eq!(format!("\n{ret}"), expected, "\n{}", ret);
    }
6563
    #[tokio::test]
    async fn test_label_replace() {
        // `label_replace` extracts a capture group from `tag_0` into a new
        // label `foo`; the original label is preserved.
        let prom_expr = parser::parse(
            "label_replace(up{tag_0=\"a:c\"}, \"foo\", \"$1\", \"tag_0\", \"(.*):.*\")",
        )
        .unwrap();
        let eval_stmt = EvalStmt {
            expr: prom_expr,
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let table_provider =
            build_test_table_provider(&[(DEFAULT_SCHEMA_NAME.to_string(), "up".to_string())], 1, 1)
                .await;
        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();

        // The replacement is planned as a `regexp_replace` projection; note
        // the user regex is fully anchored as `^(?s:...)$`.
        let expected = r#"
Filter: up.field_0 IS NOT NULL [timestamp:Timestamp(ms), field_0:Float64;N, foo:Utf8;N, tag_0:Utf8]
  Projection: up.timestamp, up.field_0, regexp_replace(up.tag_0, Utf8("^(?s:(.*):.*)$"), Utf8("$1")) AS foo, up.tag_0 [timestamp:Timestamp(ms), field_0:Float64;N, foo:Utf8;N, tag_0:Utf8]
    PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]
      PromSeriesDivide: tags=["tag_0"] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]
        Sort: up.tag_0 ASC NULLS FIRST, up.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]
          Filter: up.tag_0 = Utf8("a:c") AND up.timestamp >= TimestampMillisecond(-999, None) AND up.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]
            TableScan: up [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]"#;

        let ret = plan.display_indent_schema().to_string();
        assert_eq!(format!("\n{ret}"), expected, "\n{}", ret);
    }
6600
    #[tokio::test]
    async fn test_matchers_to_expr() {
        // A `=~` regex matcher must be translated to an anchored `~` filter
        // expression (`^(?:...)$`), and `sum` without `by` aggregates every
        // field column grouped only by the time index.
        let mut eval_stmt = EvalStmt {
            // Placeholder expression; replaced with the parsed query below.
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };
        let case =
            r#"sum(prometheus_tsdb_head_series{tag_1=~"(10.0.160.237:8080|10.0.160.237:9090)"})"#;

        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;
        // Test table with 3 tag columns and 3 field columns.
        let table_provider = build_test_table_provider(
            &[(
                DEFAULT_SCHEMA_NAME.to_string(),
                "prometheus_tsdb_head_series".to_string(),
            )],
            3,
            3,
        )
        .await;
        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();
        let expected = "Sort: prometheus_tsdb_head_series.timestamp ASC NULLS LAST [timestamp:Timestamp(ms), sum(prometheus_tsdb_head_series.field_0):Float64;N, sum(prometheus_tsdb_head_series.field_1):Float64;N, sum(prometheus_tsdb_head_series.field_2):Float64;N]\
        \n  Aggregate: groupBy=[[prometheus_tsdb_head_series.timestamp]], aggr=[[sum(prometheus_tsdb_head_series.field_0), sum(prometheus_tsdb_head_series.field_1), sum(prometheus_tsdb_head_series.field_2)]] [timestamp:Timestamp(ms), sum(prometheus_tsdb_head_series.field_0):Float64;N, sum(prometheus_tsdb_head_series.field_1):Float64;N, sum(prometheus_tsdb_head_series.field_2):Float64;N]\
        \n    PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, timestamp:Timestamp(ms), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N]\
        \n      PromSeriesDivide: tags=[\"tag_0\", \"tag_1\", \"tag_2\"] [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, timestamp:Timestamp(ms), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N]\
        \n        Sort: prometheus_tsdb_head_series.tag_0 ASC NULLS FIRST, prometheus_tsdb_head_series.tag_1 ASC NULLS FIRST, prometheus_tsdb_head_series.tag_2 ASC NULLS FIRST, prometheus_tsdb_head_series.timestamp ASC NULLS FIRST [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, timestamp:Timestamp(ms), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N]\
        \n          Filter: prometheus_tsdb_head_series.tag_1 ~ Utf8(\"^(?:(10.0.160.237:8080|10.0.160.237:9090))$\") AND prometheus_tsdb_head_series.timestamp >= TimestampMillisecond(-999, None) AND prometheus_tsdb_head_series.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, timestamp:Timestamp(ms), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N]\
        \n            TableScan: prometheus_tsdb_head_series [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, timestamp:Timestamp(ms), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N]";
        assert_eq!(plan.display_indent_schema().to_string(), expected);
    }
6639
    #[tokio::test]
    async fn test_topk_expr() {
        // `topk(k, ...)` is planned as a `row_number()` window partitioned by
        // the time index, filtered to `row_number <= k`, then re-sorted and
        // projected back to the aggregate's output columns.
        let mut eval_stmt = EvalStmt {
            // Placeholder expression; replaced with the parsed query below.
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };
        let case = r#"topk(10, sum(prometheus_tsdb_head_series{ip=~"(10.0.160.237:8080|10.0.160.237:9090)"}) by (ip))"#;

        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;
        let table_provider = build_test_table_provider_with_fields(
            &[
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "prometheus_tsdb_head_series".to_string(),
                ),
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "http_server_requests_seconds_count".to_string(),
                ),
            ],
            &["ip"],
        )
        .await;

        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();
        let expected = "Projection: sum(prometheus_tsdb_head_series.greptime_value), prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp [sum(prometheus_tsdb_head_series.greptime_value):Float64;N, ip:Utf8, greptime_timestamp:Timestamp(ms)]\
        \n  Sort: prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST, row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS FIRST, prometheus_tsdb_head_series.ip DESC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW ASC NULLS LAST [ip:Utf8, greptime_timestamp:Timestamp(ms), sum(prometheus_tsdb_head_series.greptime_value):Float64;N, row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS FIRST, prometheus_tsdb_head_series.ip DESC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:UInt64]\
        \n    Filter: row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS FIRST, prometheus_tsdb_head_series.ip DESC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW <= Float64(10) [ip:Utf8, greptime_timestamp:Timestamp(ms), sum(prometheus_tsdb_head_series.greptime_value):Float64;N, row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS FIRST, prometheus_tsdb_head_series.ip DESC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:UInt64]\
        \n      WindowAggr: windowExpr=[[row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS FIRST, prometheus_tsdb_head_series.ip DESC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]] [ip:Utf8, greptime_timestamp:Timestamp(ms), sum(prometheus_tsdb_head_series.greptime_value):Float64;N, row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS FIRST, prometheus_tsdb_head_series.ip DESC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:UInt64]\
        \n        Sort: prometheus_tsdb_head_series.ip ASC NULLS LAST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST [ip:Utf8, greptime_timestamp:Timestamp(ms), sum(prometheus_tsdb_head_series.greptime_value):Float64;N]\
        \n          Aggregate: groupBy=[[prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp]], aggr=[[sum(prometheus_tsdb_head_series.greptime_value)]] [ip:Utf8, greptime_timestamp:Timestamp(ms), sum(prometheus_tsdb_head_series.greptime_value):Float64;N]\
        \n            PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp] [ip:Utf8, greptime_timestamp:Timestamp(ms), greptime_value:Float64;N]\
        \n              PromSeriesDivide: tags=[\"ip\"] [ip:Utf8, greptime_timestamp:Timestamp(ms), greptime_value:Float64;N]\
        \n                Sort: prometheus_tsdb_head_series.ip ASC NULLS FIRST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS FIRST [ip:Utf8, greptime_timestamp:Timestamp(ms), greptime_value:Float64;N]\
        \n                  Filter: prometheus_tsdb_head_series.ip ~ Utf8(\"^(?:(10.0.160.237:8080|10.0.160.237:9090))$\") AND prometheus_tsdb_head_series.greptime_timestamp >= TimestampMillisecond(-999, None) AND prometheus_tsdb_head_series.greptime_timestamp <= TimestampMillisecond(100000000, None) [ip:Utf8, greptime_timestamp:Timestamp(ms), greptime_value:Float64;N]\
        \n                    TableScan: prometheus_tsdb_head_series [ip:Utf8, greptime_timestamp:Timestamp(ms), greptime_value:Float64;N]";

        assert_eq!(plan.display_indent_schema().to_string(), expected);
    }
6688
    #[tokio::test]
    async fn test_count_values_expr() {
        // `count_values('series', ...)` groups by the value column itself (in
        // addition to `by` labels and the time index) and exposes the value
        // under the new label `series`.
        let mut eval_stmt = EvalStmt {
            // Placeholder expression; replaced with the parsed query below.
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };
        let case = r#"count_values('series', prometheus_tsdb_head_series{ip=~"(10.0.160.237:8080|10.0.160.237:9090)"}) by (ip)"#;

        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;
        let table_provider = build_test_table_provider_with_fields(
            &[
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "prometheus_tsdb_head_series".to_string(),
                ),
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "http_server_requests_seconds_count".to_string(),
                ),
            ],
            &["ip"],
        )
        .await;

        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();
        let expected = "Projection: count(prometheus_tsdb_head_series.greptime_value), prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp, series [count(prometheus_tsdb_head_series.greptime_value):Int64, ip:Utf8, greptime_timestamp:Timestamp(ms), series:Float64;N]\
        \n  Sort: prometheus_tsdb_head_series.ip ASC NULLS LAST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST, prometheus_tsdb_head_series.greptime_value ASC NULLS LAST [count(prometheus_tsdb_head_series.greptime_value):Int64, ip:Utf8, greptime_timestamp:Timestamp(ms), series:Float64;N, greptime_value:Float64;N]\
        \n    Projection: count(prometheus_tsdb_head_series.greptime_value), prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp, prometheus_tsdb_head_series.greptime_value AS series, prometheus_tsdb_head_series.greptime_value [count(prometheus_tsdb_head_series.greptime_value):Int64, ip:Utf8, greptime_timestamp:Timestamp(ms), series:Float64;N, greptime_value:Float64;N]\
        \n      Aggregate: groupBy=[[prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp, prometheus_tsdb_head_series.greptime_value]], aggr=[[count(prometheus_tsdb_head_series.greptime_value)]] [ip:Utf8, greptime_timestamp:Timestamp(ms), greptime_value:Float64;N, count(prometheus_tsdb_head_series.greptime_value):Int64]\
        \n        PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp] [ip:Utf8, greptime_timestamp:Timestamp(ms), greptime_value:Float64;N]\
        \n          PromSeriesDivide: tags=[\"ip\"] [ip:Utf8, greptime_timestamp:Timestamp(ms), greptime_value:Float64;N]\
        \n            Sort: prometheus_tsdb_head_series.ip ASC NULLS FIRST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS FIRST [ip:Utf8, greptime_timestamp:Timestamp(ms), greptime_value:Float64;N]\
        \n              Filter: prometheus_tsdb_head_series.ip ~ Utf8(\"^(?:(10.0.160.237:8080|10.0.160.237:9090))$\") AND prometheus_tsdb_head_series.greptime_timestamp >= TimestampMillisecond(-999, None) AND prometheus_tsdb_head_series.greptime_timestamp <= TimestampMillisecond(100000000, None) [ip:Utf8, greptime_timestamp:Timestamp(ms), greptime_value:Float64;N]\
        \n                TableScan: prometheus_tsdb_head_series [ip:Utf8, greptime_timestamp:Timestamp(ms), greptime_value:Float64;N]";

        assert_eq!(plan.display_indent_schema().to_string(), expected);
    }
6735
    #[tokio::test]
    async fn test_value_alias() {
        // With the alias extension applied, the final value column is renamed
        // (`count(...) AS my_series`) by an extra top-level projection.
        let mut eval_stmt = EvalStmt {
            // Placeholder expression; replaced with the parsed query below.
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };
        let case = r#"count_values('series', prometheus_tsdb_head_series{ip=~"(10.0.160.237:8080|10.0.160.237:9090)"}) by (ip)"#;

        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;
        // Attach the value alias `my_series` to the statement.
        eval_stmt = QueryLanguageParser::apply_alias_extension(eval_stmt, "my_series");
        let table_provider = build_test_table_provider_with_fields(
            &[
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "prometheus_tsdb_head_series".to_string(),
                ),
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "http_server_requests_seconds_count".to_string(),
                ),
            ],
            &["ip"],
        )
        .await;

        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();
        let expected = r#"
Projection: count(prometheus_tsdb_head_series.greptime_value) AS my_series, prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp [my_series:Int64, ip:Utf8, greptime_timestamp:Timestamp(ms)]
  Projection: count(prometheus_tsdb_head_series.greptime_value), prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp, series [count(prometheus_tsdb_head_series.greptime_value):Int64, ip:Utf8, greptime_timestamp:Timestamp(ms), series:Float64;N]
    Sort: prometheus_tsdb_head_series.ip ASC NULLS LAST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST, prometheus_tsdb_head_series.greptime_value ASC NULLS LAST [count(prometheus_tsdb_head_series.greptime_value):Int64, ip:Utf8, greptime_timestamp:Timestamp(ms), series:Float64;N, greptime_value:Float64;N]
      Projection: count(prometheus_tsdb_head_series.greptime_value), prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp, prometheus_tsdb_head_series.greptime_value AS series, prometheus_tsdb_head_series.greptime_value [count(prometheus_tsdb_head_series.greptime_value):Int64, ip:Utf8, greptime_timestamp:Timestamp(ms), series:Float64;N, greptime_value:Float64;N]
        Aggregate: groupBy=[[prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp, prometheus_tsdb_head_series.greptime_value]], aggr=[[count(prometheus_tsdb_head_series.greptime_value)]] [ip:Utf8, greptime_timestamp:Timestamp(ms), greptime_value:Float64;N, count(prometheus_tsdb_head_series.greptime_value):Int64]
          PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp] [ip:Utf8, greptime_timestamp:Timestamp(ms), greptime_value:Float64;N]
            PromSeriesDivide: tags=["ip"] [ip:Utf8, greptime_timestamp:Timestamp(ms), greptime_value:Float64;N]
              Sort: prometheus_tsdb_head_series.ip ASC NULLS FIRST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS FIRST [ip:Utf8, greptime_timestamp:Timestamp(ms), greptime_value:Float64;N]
                Filter: prometheus_tsdb_head_series.ip ~ Utf8("^(?:(10.0.160.237:8080|10.0.160.237:9090))$") AND prometheus_tsdb_head_series.greptime_timestamp >= TimestampMillisecond(-999, None) AND prometheus_tsdb_head_series.greptime_timestamp <= TimestampMillisecond(100000000, None) [ip:Utf8, greptime_timestamp:Timestamp(ms), greptime_value:Float64;N]
                  TableScan: prometheus_tsdb_head_series [ip:Utf8, greptime_timestamp:Timestamp(ms), greptime_value:Float64;N]"#;
        assert_eq!(format!("\n{}", plan.display_indent_schema()), expected);
    }
6784
    #[tokio::test]
    async fn test_quantile_expr() {
        // `quantile(phi, <inner agg>)` is planned as a second aggregate with a
        // `quantile(Float64(0.3), ...)` expression over the inner `sum by (ip)`.
        let mut eval_stmt = EvalStmt {
            // Placeholder expression; replaced with the parsed query below.
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };
        let case = r#"quantile(0.3, sum(prometheus_tsdb_head_series{ip=~"(10.0.160.237:8080|10.0.160.237:9090)"}) by (ip))"#;

        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;
        let table_provider = build_test_table_provider_with_fields(
            &[
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "prometheus_tsdb_head_series".to_string(),
                ),
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "http_server_requests_seconds_count".to_string(),
                ),
            ],
            &["ip"],
        )
        .await;

        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();
        let expected = "Sort: prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST [greptime_timestamp:Timestamp(ms), quantile(Float64(0.3),sum(prometheus_tsdb_head_series.greptime_value)):Float64;N]\
        \n  Aggregate: groupBy=[[prometheus_tsdb_head_series.greptime_timestamp]], aggr=[[quantile(Float64(0.3), sum(prometheus_tsdb_head_series.greptime_value))]] [greptime_timestamp:Timestamp(ms), quantile(Float64(0.3),sum(prometheus_tsdb_head_series.greptime_value)):Float64;N]\
        \n    Sort: prometheus_tsdb_head_series.ip ASC NULLS LAST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST [ip:Utf8, greptime_timestamp:Timestamp(ms), sum(prometheus_tsdb_head_series.greptime_value):Float64;N]\
        \n      Aggregate: groupBy=[[prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp]], aggr=[[sum(prometheus_tsdb_head_series.greptime_value)]] [ip:Utf8, greptime_timestamp:Timestamp(ms), sum(prometheus_tsdb_head_series.greptime_value):Float64;N]\
        \n        PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp] [ip:Utf8, greptime_timestamp:Timestamp(ms), greptime_value:Float64;N]\
        \n          PromSeriesDivide: tags=[\"ip\"] [ip:Utf8, greptime_timestamp:Timestamp(ms), greptime_value:Float64;N]\
        \n            Sort: prometheus_tsdb_head_series.ip ASC NULLS FIRST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS FIRST [ip:Utf8, greptime_timestamp:Timestamp(ms), greptime_value:Float64;N]\
        \n              Filter: prometheus_tsdb_head_series.ip ~ Utf8(\"^(?:(10.0.160.237:8080|10.0.160.237:9090))$\") AND prometheus_tsdb_head_series.greptime_timestamp >= TimestampMillisecond(-999, None) AND prometheus_tsdb_head_series.greptime_timestamp <= TimestampMillisecond(100000000, None) [ip:Utf8, greptime_timestamp:Timestamp(ms), greptime_value:Float64;N]\
        \n                TableScan: prometheus_tsdb_head_series [ip:Utf8, greptime_timestamp:Timestamp(ms), greptime_value:Float64;N]";

        assert_eq!(plan.display_indent_schema().to_string(), expected);
    }
6831
    #[tokio::test]
    async fn test_or_not_exists_table_label() {
        // `or` between an existing and a non-existent metric: the missing side
        // becomes an EmptyMetric subtree with NULL-filled label columns, and
        // both sides are merged with `UnionDistinctOn` keyed on shared labels.
        let mut eval_stmt = EvalStmt {
            // Placeholder expression; replaced with the parsed query below.
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };
        let case = r#"sum by (job, tag0, tag2) (metric_exists) or sum by (job, tag0, tag2) (metric_not_exists)"#;

        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;
        // Only `metric_exists` is registered; `metric_not_exists` is absent on
        // purpose.
        let table_provider = build_test_table_provider_with_fields(
            &[(DEFAULT_SCHEMA_NAME.to_string(), "metric_exists".to_string())],
            &["job"],
        )
        .await;

        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();
        let expected = r#"UnionDistinctOn: on col=[["job"]], ts_col=[greptime_timestamp] [greptime_timestamp:Timestamp(ms), job:Utf8, sum(metric_exists.greptime_value):Float64;N]
  SubqueryAlias: metric_exists [greptime_timestamp:Timestamp(ms), job:Utf8, sum(metric_exists.greptime_value):Float64;N]
    Projection: metric_exists.greptime_timestamp, metric_exists.job, sum(metric_exists.greptime_value) [greptime_timestamp:Timestamp(ms), job:Utf8, sum(metric_exists.greptime_value):Float64;N]
      Sort: metric_exists.job ASC NULLS LAST, metric_exists.greptime_timestamp ASC NULLS LAST [job:Utf8, greptime_timestamp:Timestamp(ms), sum(metric_exists.greptime_value):Float64;N]
        Aggregate: groupBy=[[metric_exists.job, metric_exists.greptime_timestamp]], aggr=[[sum(metric_exists.greptime_value)]] [job:Utf8, greptime_timestamp:Timestamp(ms), sum(metric_exists.greptime_value):Float64;N]
          PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp] [job:Utf8, greptime_timestamp:Timestamp(ms), greptime_value:Float64;N]
            PromSeriesDivide: tags=["job"] [job:Utf8, greptime_timestamp:Timestamp(ms), greptime_value:Float64;N]
              Sort: metric_exists.job ASC NULLS FIRST, metric_exists.greptime_timestamp ASC NULLS FIRST [job:Utf8, greptime_timestamp:Timestamp(ms), greptime_value:Float64;N]
                Filter: metric_exists.greptime_timestamp >= TimestampMillisecond(-999, None) AND metric_exists.greptime_timestamp <= TimestampMillisecond(100000000, None) [job:Utf8, greptime_timestamp:Timestamp(ms), greptime_value:Float64;N]
                  TableScan: metric_exists [job:Utf8, greptime_timestamp:Timestamp(ms), greptime_value:Float64;N]
  SubqueryAlias:  [greptime_timestamp:Timestamp(ms), job:Utf8;N, sum(.value):Float64;N]
    Projection: .time AS greptime_timestamp, Utf8(NULL) AS job, sum(.value) [greptime_timestamp:Timestamp(ms), job:Utf8;N, sum(.value):Float64;N]
      Sort: .time ASC NULLS LAST [time:Timestamp(ms), sum(.value):Float64;N]
        Aggregate: groupBy=[[.time]], aggr=[[sum(.value)]] [time:Timestamp(ms), sum(.value):Float64;N]
          EmptyMetric: range=[0..-1], interval=[5000] [time:Timestamp(ms), value:Float64;N]
            TableScan: dummy [time:Timestamp(ms), value:Float64;N]"#;

        assert_eq!(plan.display_indent_schema().to_string(), expected);
    }
6876
6877 #[tokio::test]
6878 async fn test_histogram_quantile_missing_le_column() {
6879 let mut eval_stmt = EvalStmt {
6880 expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
6881 start: UNIX_EPOCH,
6882 end: UNIX_EPOCH
6883 .checked_add(Duration::from_secs(100_000))
6884 .unwrap(),
6885 interval: Duration::from_secs(5),
6886 lookback_delta: Duration::from_secs(1),
6887 };
6888
6889 let case = r#"histogram_quantile(0.99, sum by(pod,instance,le) (rate(non_existent_histogram_bucket{instance=~"xxx"}[1m])))"#;
6891
6892 let prom_expr = parser::parse(case).unwrap();
6893 eval_stmt.expr = prom_expr;
6894
6895 let table_provider = build_test_table_provider_with_fields(
6897 &[(
6898 DEFAULT_SCHEMA_NAME.to_string(),
6899 "non_existent_histogram_bucket".to_string(),
6900 )],
6901 &["pod", "instance"], )
6903 .await;
6904
6905 let result =
6907 PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
6908 .await;
6909
6910 assert!(
6912 result.is_ok(),
6913 "Expected successful plan creation with empty result, but got error: {:?}",
6914 result.err()
6915 );
6916
6917 let plan = result.unwrap();
6919 match plan {
6920 LogicalPlan::EmptyRelation(_) => {
6921 }
6923 _ => panic!("Expected EmptyRelation, but got: {:?}", plan),
6924 }
6925 }
6926}