1use std::collections::{BTreeSet, HashSet, VecDeque};
16use std::sync::Arc;
17use std::time::UNIX_EPOCH;
18
19use arrow::datatypes::IntervalDayTime;
20use async_recursion::async_recursion;
21use catalog::table_source::DfTableSourceProvider;
22use common_error::ext::ErrorExt;
23use common_error::status_code::StatusCode;
24use common_function::function::FunctionContext;
25use common_query::prelude::greptime_value;
26use datafusion::common::DFSchemaRef;
27use datafusion::datasource::DefaultTableSource;
28use datafusion::functions_aggregate::average::avg_udaf;
29use datafusion::functions_aggregate::count::count_udaf;
30use datafusion::functions_aggregate::expr_fn::first_value;
31use datafusion::functions_aggregate::min_max::{max_udaf, min_udaf};
32use datafusion::functions_aggregate::stddev::stddev_pop_udaf;
33use datafusion::functions_aggregate::sum::sum_udaf;
34use datafusion::functions_aggregate::variance::var_pop_udaf;
35use datafusion::functions_window::row_number::RowNumber;
36use datafusion::logical_expr::expr::{Alias, ScalarFunction, WindowFunction};
37use datafusion::logical_expr::expr_rewriter::normalize_cols;
38use datafusion::logical_expr::{
39 BinaryExpr, Cast, Extension, LogicalPlan, LogicalPlanBuilder, Operator,
40 ScalarUDF as ScalarUdfDef, WindowFrame, WindowFunctionDefinition,
41};
42use datafusion::prelude as df_prelude;
43use datafusion::prelude::{Column, Expr as DfExpr, JoinType};
44use datafusion::scalar::ScalarValue;
45use datafusion::sql::TableReference;
46use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRewriter};
47use datafusion_common::{DFSchema, NullEquality};
48use datafusion_expr::expr::WindowFunctionParams;
49use datafusion_expr::utils::conjunction;
50use datafusion_expr::{
51 ExprSchemable, Literal, Projection, SortExpr, TableScan, TableSource, col, lit,
52};
53use datatypes::arrow::datatypes::{DataType as ArrowDataType, TimeUnit as ArrowTimeUnit};
54use datatypes::data_type::ConcreteDataType;
55use itertools::Itertools;
56use once_cell::sync::Lazy;
57use promql::extension_plan::{
58 Absent, EmptyMetric, HistogramFold, InstantManipulate, Millisecond, RangeManipulate,
59 ScalarCalculate, SeriesDivide, SeriesNormalize, UnionDistinctOn, build_special_time_expr,
60};
61use promql::functions::{
62 AbsentOverTime, AvgOverTime, Changes, CountOverTime, Delta, Deriv, DoubleExponentialSmoothing,
63 IDelta, Increase, LastOverTime, MaxOverTime, MinOverTime, PredictLinear, PresentOverTime,
64 QuantileOverTime, Rate, Resets, Round, StddevOverTime, StdvarOverTime, SumOverTime,
65 quantile_udaf,
66};
67use promql_parser::label::{METRIC_NAME, MatchOp, Matcher, Matchers};
68use promql_parser::parser::token::TokenType;
69use promql_parser::parser::value::ValueType;
70use promql_parser::parser::{
71 AggregateExpr, BinModifier, BinaryExpr as PromBinaryExpr, Call, EvalStmt, Expr as PromExpr,
72 Function, FunctionArgs as PromFunctionArgs, LabelModifier, MatrixSelector, NumberLiteral,
73 Offset, ParenExpr, StringLiteral, SubqueryExpr, UnaryExpr, VectorMatchCardinality,
74 VectorSelector, token,
75};
76use regex::{self, Regex};
77use snafu::{OptionExt, ResultExt, ensure};
78use store_api::metric_engine_consts::{
79 DATA_SCHEMA_TABLE_ID_COLUMN_NAME, DATA_SCHEMA_TSID_COLUMN_NAME, LOGICAL_TABLE_METADATA_KEY,
80 METRIC_ENGINE_NAME, is_metric_engine_internal_column,
81};
82use table::table::adapter::DfTableProviderAdapter;
83
84use crate::parser::{
85 ALIAS_NODE_NAME, ANALYZE_NODE_NAME, ANALYZE_VERBOSE_NODE_NAME, AliasExpr, EXPLAIN_NODE_NAME,
86 EXPLAIN_VERBOSE_NODE_NAME,
87};
88use crate::promql::error::{
89 CatalogSnafu, ColumnNotFoundSnafu, CombineTableColumnMismatchSnafu, DataFusionPlanningSnafu,
90 ExpectRangeSelectorSnafu, FunctionInvalidArgumentSnafu, InvalidDestinationLabelNameSnafu,
91 InvalidRegularExpressionSnafu, InvalidTimeRangeSnafu, MultiFieldsNotSupportedSnafu,
92 MultipleMetricMatchersSnafu, MultipleVectorSnafu, NoMetricMatcherSnafu, PromqlPlanNodeSnafu,
93 Result, SameLabelSetSnafu, TableNameNotFoundSnafu, TimeIndexNotFoundSnafu,
94 UnexpectedPlanExprSnafu, UnexpectedTokenSnafu, UnknownTableSnafu, UnsupportedExprSnafu,
95 UnsupportedMatcherOpSnafu, UnsupportedVectorMatchSnafu, ValueNotFoundSnafu,
96 ZeroRangeSelectorSnafu,
97};
98use crate::query_engine::QueryEngineState;
99
/// Name `time`: used as the time index column of the `EmptyMetric` plans built by this
/// planner and as the context's time index when a function call has no input expression.
const SPECIAL_TIME_FUNCTION: &str = "time";
/// Name of the PromQL `scalar()` function, dispatched to `create_scalar_plan`.
const SCALAR_FUNCTION: &str = "scalar";
/// Name of the PromQL `absent()` function, dispatched to `create_absent_plan`.
const SPECIAL_ABSENT_FUNCTION: &str = "absent";
/// Name of the PromQL `histogram_quantile()` function, dispatched to `create_histogram_plan`.
const SPECIAL_HISTOGRAM_QUANTILE: &str = "histogram_quantile";
/// Name of the PromQL `vector()` function, dispatched to `create_vector_plan`.
const SPECIAL_VECTOR_FUNCTION: &str = "vector";
/// Name of the Prometheus histogram bucket label `le`; `PromPlannerContext::has_le_tag`
/// checks the current tag columns for it.
const LE_COLUMN_NAME: &str = "le";

/// Matches a letter or underscore followed by any number of letters, digits or underscores
/// (the Prometheus label-name grammar).
static LABEL_NAME_REGEX: Lazy<Regex> =
    Lazy::new(|| Regex::new(r"^[a-zA-Z_][a-zA-Z0-9_]*$").unwrap());

/// Time index column name assigned to the context for plans built from literals.
const DEFAULT_TIME_INDEX_COLUMN: &str = "time";

/// Value column name for literal plans and for the output of generated `EmptyMetric` nodes.
const DEFAULT_FIELD_COLUMN: &str = "value";

/// Special matcher name that selects field columns. Such matchers are collected into
/// `PromPlannerContext::field_column_matcher` rather than returned with ordinary label matchers.
const FIELD_COLUMN_MATCHER: &str = "__field__";

/// Special matcher names that select the schema (database). Only `=` is accepted and the
/// value becomes `PromPlannerContext::schema_name`.
const SCHEMA_COLUMN_MATCHER: &str = "__schema__";
const DB_COLUMN_MATCHER: &str = "__database__";

/// NOTE(review): not referenced in this part of the file. Presumably caps the number of
/// points produced for scatter-style output; confirm at its use site.
const MAX_SCATTER_POINTS: i64 = 400;

/// One hour in milliseconds.
const INTERVAL_1H: i64 = 60 * 60 * 1000;
135
/// Mutable state threaded through planning of a PromQL expression tree.
///
/// The time-range fields are initialised from the `EvalStmt` (subqueries adjust `start` and
/// `interval` temporarily). The remaining fields describe the most recently planned
/// sub-expression and are overwritten as planning proceeds.
#[derive(Default, Debug, Clone)]
struct PromPlannerContext {
    /// Evaluation start, in milliseconds since the Unix epoch.
    start: Millisecond,
    /// Evaluation end, in milliseconds since the Unix epoch.
    end: Millisecond,
    /// Evaluation step, in milliseconds.
    interval: Millisecond,
    /// Lookback window for instant selectors, in milliseconds.
    lookback_delta: Millisecond,

    /// Metric (table) name of the current plan. `Some("")` marks a plan not backed by a table.
    table_name: Option<String>,
    /// Time index column of the current plan.
    time_index_column: Option<String>,
    /// Value (field) columns of the current plan.
    field_columns: Vec<String>,
    /// Label (tag) columns of the current plan.
    tag_columns: Vec<String>,
    /// Whether the current plan carries the `DATA_SCHEMA_TSID_COLUMN_NAME` column.
    use_tsid: bool,
    /// Matchers on `__field__`, collected by `preprocess_label_matchers`.
    field_column_matcher: Option<Vec<Matcher>>,
    /// Ordinary label matchers of the current selector, in input order.
    selector_matcher: Vec<Matcher>,
    /// Schema selected through a `__schema__` / `__database__` matcher.
    schema_name: Option<String>,
    /// Range of the current matrix selector or subquery, in milliseconds.
    range: Option<Millisecond>,
}
163
164impl PromPlannerContext {
165 fn from_eval_stmt(stmt: &EvalStmt) -> Self {
166 Self {
167 start: stmt.start.duration_since(UNIX_EPOCH).unwrap().as_millis() as _,
168 end: stmt.end.duration_since(UNIX_EPOCH).unwrap().as_millis() as _,
169 interval: stmt.interval.as_millis() as _,
170 lookback_delta: stmt.lookback_delta.as_millis() as _,
171 ..Default::default()
172 }
173 }
174
175 fn reset(&mut self) {
177 self.table_name = None;
178 self.time_index_column = None;
179 self.field_columns = vec![];
180 self.tag_columns = vec![];
181 self.use_tsid = false;
182 self.field_column_matcher = None;
183 self.selector_matcher.clear();
184 self.schema_name = None;
185 self.range = None;
186 }
187
188 fn reset_table_name_and_schema(&mut self) {
190 self.table_name = Some(String::new());
191 self.schema_name = None;
192 self.use_tsid = false;
193 }
194
195 fn has_le_tag(&self) -> bool {
197 self.tag_columns.iter().any(|c| c.eq(&LE_COLUMN_NAME))
198 }
199}
200
/// Translates PromQL expressions into DataFusion logical plans.
///
/// `table_provider` is the table source provider handed in by `stmt_to_plan`. `ctx` carries
/// the state of the plan currently being built.
pub struct PromPlanner {
    table_provider: DfTableSourceProvider,
    ctx: PromPlannerContext,
}
205
206impl PromPlanner {
207 pub async fn stmt_to_plan(
208 table_provider: DfTableSourceProvider,
209 stmt: &EvalStmt,
210 query_engine_state: &QueryEngineState,
211 ) -> Result<LogicalPlan> {
212 let mut planner = Self {
213 table_provider,
214 ctx: PromPlannerContext::from_eval_stmt(stmt),
215 };
216
217 let plan = planner
218 .prom_expr_to_plan(&stmt.expr, query_engine_state)
219 .await?;
220
221 planner.strip_tsid_column(plan)
223 }
224
225 pub async fn prom_expr_to_plan(
226 &mut self,
227 prom_expr: &PromExpr,
228 query_engine_state: &QueryEngineState,
229 ) -> Result<LogicalPlan> {
230 self.prom_expr_to_plan_inner(prom_expr, false, query_engine_state)
231 .await
232 }
233
234 #[async_recursion]
244 async fn prom_expr_to_plan_inner(
245 &mut self,
246 prom_expr: &PromExpr,
247 timestamp_fn: bool,
248 query_engine_state: &QueryEngineState,
249 ) -> Result<LogicalPlan> {
250 let res = match prom_expr {
251 PromExpr::Aggregate(expr) => {
252 self.prom_aggr_expr_to_plan(query_engine_state, expr)
253 .await?
254 }
255 PromExpr::Unary(expr) => {
256 self.prom_unary_expr_to_plan(query_engine_state, expr)
257 .await?
258 }
259 PromExpr::Binary(expr) => {
260 self.prom_binary_expr_to_plan(query_engine_state, expr)
261 .await?
262 }
263 PromExpr::Paren(ParenExpr { expr }) => {
264 self.prom_expr_to_plan_inner(expr, timestamp_fn, query_engine_state)
265 .await?
266 }
267 PromExpr::Subquery(expr) => {
268 self.prom_subquery_expr_to_plan(query_engine_state, expr)
269 .await?
270 }
271 PromExpr::NumberLiteral(lit) => self.prom_number_lit_to_plan(lit)?,
272 PromExpr::StringLiteral(lit) => self.prom_string_lit_to_plan(lit)?,
273 PromExpr::VectorSelector(selector) => {
274 self.prom_vector_selector_to_plan(selector, timestamp_fn)
275 .await?
276 }
277 PromExpr::MatrixSelector(selector) => {
278 self.prom_matrix_selector_to_plan(selector).await?
279 }
280 PromExpr::Call(expr) => {
281 self.prom_call_expr_to_plan(query_engine_state, expr)
282 .await?
283 }
284 PromExpr::Extension(expr) => {
285 self.prom_ext_expr_to_plan(query_engine_state, expr).await?
286 }
287 };
288
289 Ok(res)
290 }
291
292 async fn prom_subquery_expr_to_plan(
293 &mut self,
294 query_engine_state: &QueryEngineState,
295 subquery_expr: &SubqueryExpr,
296 ) -> Result<LogicalPlan> {
297 let SubqueryExpr {
298 expr, range, step, ..
299 } = subquery_expr;
300
301 let current_interval = self.ctx.interval;
302 if let Some(step) = step {
303 self.ctx.interval = step.as_millis() as _;
304 }
305 let current_start = self.ctx.start;
306 self.ctx.start -= range.as_millis() as i64 - self.ctx.interval;
307 let input = self.prom_expr_to_plan(expr, query_engine_state).await?;
308 self.ctx.interval = current_interval;
309 self.ctx.start = current_start;
310
311 ensure!(!range.is_zero(), ZeroRangeSelectorSnafu);
312 let range_ms = range.as_millis() as _;
313 self.ctx.range = Some(range_ms);
314
315 let manipulate = RangeManipulate::new(
316 self.ctx.start,
317 self.ctx.end,
318 self.ctx.interval,
319 range_ms,
320 self.ctx
321 .time_index_column
322 .clone()
323 .expect("time index should be set in `setup_context`"),
324 self.ctx.field_columns.clone(),
325 input,
326 )
327 .context(DataFusionPlanningSnafu)?;
328
329 Ok(LogicalPlan::Extension(Extension {
330 node: Arc::new(manipulate),
331 }))
332 }
333
    /// Plans a PromQL aggregation (`sum`, `avg`, `count_values`, `topk`, ...) over `expr`.
    ///
    /// Steps, in order:
    /// 1. Plan the input expression.
    /// 2. Work out which tag columns the `by (...)` / `without (...)` modifier needs. If any is
    ///    missing from the input schema, `ensure_tag_columns_available` rewrites the input and
    ///    the context's tag list is refreshed from the new schema.
    /// 3. `topk` / `bottomk` are delegated to `prom_topk_bottomk_to_plan`.
    /// 4. Every other operator becomes an `Aggregate` grouped by the modifier columns,
    ///    followed by an ascending sort on the group expressions. `count_values` additionally
    ///    groups by the previous field expressions and projects them under the label string
    ///    given as `param`.
    ///
    /// The `DATA_SCHEMA_TSID_COLUMN_NAME` column is carried through (via `first_value`) only
    /// when all of the following hold:
    /// - the input has that column as `UInt64`;
    /// - the operator is not `count_values`;
    /// - the input's tag set equals `self.ctx.tag_columns` after `agg_modifier_to_col`
    ///   (presumably the grouping tags; verify in that helper).
    ///
    /// `self.ctx.use_tsid` is updated accordingly.
    async fn prom_aggr_expr_to_plan(
        &mut self,
        query_engine_state: &QueryEngineState,
        aggr_expr: &AggregateExpr,
    ) -> Result<LogicalPlan> {
        let AggregateExpr {
            op,
            expr,
            modifier,
            param,
        } = aggr_expr;

        let mut input = self.prom_expr_to_plan(expr, query_engine_state).await?;
        let input_has_tsid = input.schema().fields().iter().any(|field| {
            field.name() == DATA_SCHEMA_TSID_COLUMN_NAME
                && field.data_type() == &ArrowDataType::UInt64
        });

        // Tags the grouping needs:
        // - `by (...)`: the listed labels, minus metric-engine internal columns;
        // - `without (...)`: all tag columns found by `collect_row_key_tag_columns_from_plan`,
        //   minus the listed labels.
        let required_group_tags = match modifier {
            None => BTreeSet::new(),
            Some(LabelModifier::Include(labels)) => labels
                .labels
                .iter()
                .filter(|label| !is_metric_engine_internal_column(label.as_str()))
                .cloned()
                .collect(),
            Some(LabelModifier::Exclude(labels)) => {
                let mut all_tags = self.collect_row_key_tag_columns_from_plan(&input)?;
                for label in &labels.labels {
                    let _ = all_tags.remove(label);
                }
                all_tags
            }
        };

        // Only rewrite the input when at least one required tag is absent from its schema.
        if !required_group_tags.is_empty()
            && required_group_tags
                .iter()
                .any(|tag| Self::find_case_sensitive_column(input.schema(), tag.as_str()).is_none())
        {
            input = self.ensure_tag_columns_available(input, &required_group_tags)?;
            self.refresh_tag_columns_from_schema(input.schema());
        }

        match (*op).id() {
            token::T_TOPK | token::T_BOTTOMK => {
                self.prom_topk_bottomk_to_plan(aggr_expr, input).await
            }
            _ => {
                // With a TSID column present, take the input's tags from the plan
                // (`collect_row_key_tag_columns_from_plan`) instead of the context.
                let input_tag_columns = if input_has_tsid {
                    self.collect_row_key_tag_columns_from_plan(&input)?
                        .into_iter()
                        .collect::<Vec<_>>()
                } else {
                    self.ctx.tag_columns.clone()
                };
                let mut group_exprs = self.agg_modifier_to_col(input.schema(), modifier, true)?;
                let (mut aggr_exprs, prev_field_exprs) =
                    self.create_aggregate_exprs(*op, param, &input)?;

                // Compared as sets so that column order does not matter.
                let keep_tsid = op.id() != token::T_COUNT_VALUES
                    && input_has_tsid
                    && input_tag_columns.iter().collect::<HashSet<_>>()
                        == self.ctx.tag_columns.iter().collect::<HashSet<_>>();

                if keep_tsid {
                    aggr_exprs.push(
                        first_value(
                            DfExpr::Column(Column::from_name(DATA_SCHEMA_TSID_COLUMN_NAME)),
                            vec![],
                        )
                        .alias(DATA_SCHEMA_TSID_COLUMN_NAME),
                    );
                }
                self.ctx.use_tsid = keep_tsid;

                let builder = LogicalPlanBuilder::from(input);
                let builder = if op.id() == token::T_COUNT_VALUES {
                    // `count_values("label", v)`: group additionally by the previous values,
                    // then expose them under `label` after the aggregate.
                    let label = Self::get_param_value_as_str(*op, param)?;
                    group_exprs.extend(prev_field_exprs.clone());
                    let project_fields = self
                        .create_field_column_exprs()?
                        .into_iter()
                        .chain(self.create_tag_column_exprs()?)
                        .chain(Some(self.create_time_index_column_expr()?))
                        .chain(prev_field_exprs.into_iter().map(|expr| expr.alias(label)));

                    builder
                        .aggregate(group_exprs.clone(), aggr_exprs)
                        .context(DataFusionPlanningSnafu)?
                        .project(project_fields)
                        .context(DataFusionPlanningSnafu)?
                } else {
                    builder
                        .aggregate(group_exprs.clone(), aggr_exprs)
                        .context(DataFusionPlanningSnafu)?
                };

                // Ascending, nulls last, on every group expression.
                let sort_expr = group_exprs.into_iter().map(|expr| expr.sort(true, false));

                builder
                    .sort(sort_expr)
                    .context(DataFusionPlanningSnafu)?
                    .build()
                    .context(DataFusionPlanningSnafu)
            }
        }
    }
453
454 async fn prom_topk_bottomk_to_plan(
456 &mut self,
457 aggr_expr: &AggregateExpr,
458 input: LogicalPlan,
459 ) -> Result<LogicalPlan> {
460 let AggregateExpr {
461 op,
462 param,
463 modifier,
464 ..
465 } = aggr_expr;
466
467 let input_has_tsid = input.schema().fields().iter().any(|field| {
468 field.name() == DATA_SCHEMA_TSID_COLUMN_NAME
469 && field.data_type() == &ArrowDataType::UInt64
470 });
471 self.ctx.use_tsid = input_has_tsid;
472
473 let group_exprs = self.agg_modifier_to_col(input.schema(), modifier, false)?;
474
475 let val = Self::get_param_as_literal_expr(param, Some(*op), Some(ArrowDataType::Float64))?;
476
477 let window_exprs = self.create_window_exprs(*op, group_exprs.clone(), &input)?;
479
480 let rank_columns: Vec<_> = window_exprs
481 .iter()
482 .map(|expr| expr.schema_name().to_string())
483 .collect();
484
485 let filter: DfExpr = rank_columns
488 .iter()
489 .fold(None, |expr, rank| {
490 let predicate = DfExpr::BinaryExpr(BinaryExpr {
491 left: Box::new(col(rank)),
492 op: Operator::LtEq,
493 right: Box::new(val.clone()),
494 });
495
496 match expr {
497 None => Some(predicate),
498 Some(expr) => Some(DfExpr::BinaryExpr(BinaryExpr {
499 left: Box::new(expr),
500 op: Operator::Or,
501 right: Box::new(predicate),
502 })),
503 }
504 })
505 .unwrap();
506
507 let rank_columns: Vec<_> = rank_columns.into_iter().map(col).collect();
508
509 let mut new_group_exprs = group_exprs.clone();
510 new_group_exprs.extend(rank_columns);
512
513 let group_sort_expr = new_group_exprs
514 .into_iter()
515 .map(|expr| expr.sort(true, false));
516
517 let project_fields = self
518 .create_field_column_exprs()?
519 .into_iter()
520 .chain(self.create_tag_column_exprs()?)
521 .chain(
522 self.ctx
523 .use_tsid
524 .then_some(DfExpr::Column(Column::from_name(
525 DATA_SCHEMA_TSID_COLUMN_NAME,
526 ))),
527 )
528 .chain(Some(self.create_time_index_column_expr()?));
529
530 LogicalPlanBuilder::from(input)
531 .window(window_exprs)
532 .context(DataFusionPlanningSnafu)?
533 .filter(filter)
534 .context(DataFusionPlanningSnafu)?
535 .sort(group_sort_expr)
536 .context(DataFusionPlanningSnafu)?
537 .project(project_fields)
538 .context(DataFusionPlanningSnafu)?
539 .build()
540 .context(DataFusionPlanningSnafu)
541 }
542
543 async fn prom_unary_expr_to_plan(
544 &mut self,
545 query_engine_state: &QueryEngineState,
546 unary_expr: &UnaryExpr,
547 ) -> Result<LogicalPlan> {
548 let UnaryExpr { expr } = unary_expr;
549 let input = self.prom_expr_to_plan(expr, query_engine_state).await?;
551 self.projection_for_each_field_column(input, |col| {
552 Ok(DfExpr::Negative(Box::new(DfExpr::Column(col.into()))))
553 })
554 }
555
    /// Plans a PromQL binary expression `lhs op rhs`.
    ///
    /// The shape depends on which operands `try_build_literal_expr` turns into literals:
    /// - literal op literal: the combined expression becomes the value of an `EmptyMetric`
    ///   over the query range.
    /// - literal op vector / vector op literal: the literal is combined with every field column
    ///   of the planned vector side. The literal may be replaced by a special time expression
    ///   from `try_build_special_time_expr_with_context`.
    /// - vector op vector:
    ///   - set operators go to `set_op_on_non_field_columns`;
    ///   - everything else joins both sides on their non-field columns and combines aligned
    ///     field-column pairs.
    ///
    /// Comparison operators without the `bool` modifier filter rows (`filter_on_field_column`).
    /// Otherwise the field columns are projected (`projection_for_each_field_column`); with
    /// `bool`, comparison results are cast to `Float64`.
    async fn prom_binary_expr_to_plan(
        &mut self,
        query_engine_state: &QueryEngineState,
        binary_expr: &PromBinaryExpr,
    ) -> Result<LogicalPlan> {
        let PromBinaryExpr {
            lhs,
            rhs,
            op,
            modifier,
        } = binary_expr;

        // `bool` modifier: comparisons yield 0/1 values instead of filtering.
        let should_return_bool = if let Some(m) = modifier {
            m.return_bool
        } else {
            false
        };
        let is_comparison_op = Self::is_token_a_comparison_op(*op);

        match (
            Self::try_build_literal_expr(lhs),
            Self::try_build_literal_expr(rhs),
        ) {
            (Some(lhs), Some(rhs)) => {
                // Both sides are literals: no table involved, evaluate inside an EmptyMetric.
                // NOTE(review): `tag_columns` is not reset here; confirm stale tags from a
                // previously planned sibling cannot leak into later projections.
                self.ctx.time_index_column = Some(DEFAULT_TIME_INDEX_COLUMN.to_string());
                self.ctx.field_columns = vec![DEFAULT_FIELD_COLUMN.to_string()];
                self.ctx.reset_table_name_and_schema();
                let field_expr_builder = Self::prom_token_to_binary_expr_builder(*op)?;
                let mut field_expr = field_expr_builder(lhs, rhs)?;

                if is_comparison_op && should_return_bool {
                    field_expr = DfExpr::Cast(Cast {
                        expr: Box::new(field_expr),
                        data_type: ArrowDataType::Float64,
                    });
                }

                Ok(LogicalPlan::Extension(Extension {
                    node: Arc::new(
                        EmptyMetric::new(
                            self.ctx.start,
                            self.ctx.end,
                            self.ctx.interval,
                            SPECIAL_TIME_FUNCTION.to_string(),
                            DEFAULT_FIELD_COLUMN.to_string(),
                            Some(field_expr),
                        )
                        .context(DataFusionPlanningSnafu)?,
                    ),
                }))
            }
            (Some(mut expr), None) => {
                // literal op vector: the literal stays on the left of each field expression.
                let input = self.prom_expr_to_plan(rhs, query_engine_state).await?;
                if let Some(time_expr) = self.try_build_special_time_expr_with_context(lhs) {
                    expr = time_expr
                }
                let bin_expr_builder = |col: &String| {
                    let binary_expr_builder = Self::prom_token_to_binary_expr_builder(*op)?;
                    let mut binary_expr =
                        binary_expr_builder(expr.clone(), DfExpr::Column(col.into()))?;

                    if is_comparison_op && should_return_bool {
                        binary_expr = DfExpr::Cast(Cast {
                            expr: Box::new(binary_expr),
                            data_type: ArrowDataType::Float64,
                        });
                    }
                    Ok(binary_expr)
                };
                if is_comparison_op && !should_return_bool {
                    self.filter_on_field_column(input, bin_expr_builder)
                } else {
                    self.projection_for_each_field_column(input, bin_expr_builder)
                }
            }
            (None, Some(mut expr)) => {
                // vector op literal: the literal stays on the right of each field expression.
                let input = self.prom_expr_to_plan(lhs, query_engine_state).await?;
                if let Some(time_expr) = self.try_build_special_time_expr_with_context(rhs) {
                    expr = time_expr
                }
                let bin_expr_builder = |col: &String| {
                    let binary_expr_builder = Self::prom_token_to_binary_expr_builder(*op)?;
                    let mut binary_expr =
                        binary_expr_builder(DfExpr::Column(col.into()), expr.clone())?;

                    if is_comparison_op && should_return_bool {
                        binary_expr = DfExpr::Cast(Cast {
                            expr: Box::new(binary_expr),
                            data_type: ArrowDataType::Float64,
                        });
                    }
                    Ok(binary_expr)
                };
                if is_comparison_op && !should_return_bool {
                    self.filter_on_field_column(input, bin_expr_builder)
                } else {
                    self.projection_for_each_field_column(input, bin_expr_builder)
                }
            }
            (None, None) => {
                // vector op vector: plan both sides, snapshotting the context after each.
                let left_input = self.prom_expr_to_plan(lhs, query_engine_state).await?;
                let left_field_columns = self.ctx.field_columns.clone();
                let left_time_index_column = self.ctx.time_index_column.clone();
                let mut left_table_ref = self
                    .table_ref()
                    .unwrap_or_else(|_| TableReference::bare(""));
                let left_context = self.ctx.clone();

                let right_input = self.prom_expr_to_plan(rhs, query_engine_state).await?;
                let right_field_columns = self.ctx.field_columns.clone();
                let right_time_index_column = self.ctx.time_index_column.clone();
                let mut right_table_ref = self
                    .table_ref()
                    .unwrap_or_else(|_| TableReference::bare(""));
                let right_context = self.ctx.clone();

                // `and` / `or` / `unless` are handled separately from arithmetic/comparison.
                if Self::is_token_a_set_op(*op) {
                    return self.set_op_on_non_field_columns(
                        left_input,
                        right_input,
                        left_context,
                        right_context,
                        *op,
                        modifier,
                    );
                }

                // Both sides resolved to the same table reference (e.g. `foo + foo`).
                // Re-qualify them as `lhs` / `rhs` so the join can tell the columns apart.
                if left_table_ref == right_table_ref {
                    left_table_ref = TableReference::bare("lhs");
                    right_table_ref = TableReference::bare("rhs");
                    // `self.ctx` currently holds the right side's context. If the right side
                    // has no tags, continue with the left context instead.
                    if self.ctx.tag_columns.is_empty() {
                        self.ctx = left_context.clone();
                        self.ctx.table_name = Some("lhs".to_string());
                    } else {
                        self.ctx.table_name = Some("rhs".to_string());
                    }
                }
                // Pair up field columns of both sides; the output names become the context's.
                let (output_field_columns, field_columns) =
                    Self::align_binary_field_columns(&left_field_columns, &right_field_columns);
                self.ctx.field_columns = output_field_columns;
                let mut field_columns = field_columns.into_iter();

                // NOTE(review): the bool flag presumably tells the join that one side has no
                // tags (e.g. scalar-like input); verify in `join_on_non_field_columns`.
                let join_plan = self.join_on_non_field_columns(
                    left_input,
                    right_input,
                    left_table_ref.clone(),
                    right_table_ref.clone(),
                    left_time_index_column,
                    right_time_index_column,
                    left_context.tag_columns.is_empty() || right_context.tag_columns.is_empty(),
                    modifier,
                )?;
                let join_plan_schema = join_plan.schema().clone();

                // Consumes one aligned (left, right) field pair per call. This relies on the
                // caller invoking it once per entry of `self.ctx.field_columns`, in order;
                // an extra call would hit the `unwrap`.
                let bin_expr_builder = |_: &String| {
                    let (left_col_name, right_col_name) = field_columns.next().unwrap();
                    let left_col = join_plan_schema
                        .qualified_field_with_name(Some(&left_table_ref), left_col_name)
                        .context(DataFusionPlanningSnafu)?
                        .into();
                    let right_col = join_plan_schema
                        .qualified_field_with_name(Some(&right_table_ref), right_col_name)
                        .context(DataFusionPlanningSnafu)?
                        .into();

                    let binary_expr_builder = Self::prom_token_to_binary_expr_builder(*op)?;
                    let mut binary_expr =
                        binary_expr_builder(DfExpr::Column(left_col), DfExpr::Column(right_col))?;
                    if is_comparison_op && should_return_bool {
                        binary_expr = DfExpr::Cast(Cast {
                            expr: Box::new(binary_expr),
                            data_type: ArrowDataType::Float64,
                        });
                    }
                    Ok(binary_expr)
                };
                if is_comparison_op && !should_return_bool {
                    let filtered = self.filter_on_field_column(join_plan, bin_expr_builder)?;
                    // A filtering comparison keeps one side's columns: the right (vector) side
                    // for `scalar op vector`, the left side otherwise.
                    let (project_table_ref, project_context) =
                        match (lhs.value_type(), rhs.value_type()) {
                            (ValueType::Scalar, ValueType::Vector) => {
                                (&right_table_ref, &right_context)
                            }
                            _ => (&left_table_ref, &left_context),
                        };
                    self.project_binary_join_side(filtered, project_table_ref, project_context)
                } else {
                    self.projection_for_each_field_column(join_plan, bin_expr_builder)
                }
            }
        }
    }
778
779 fn project_binary_join_side(
780 &mut self,
781 input: LogicalPlan,
782 table_ref: &TableReference,
783 context: &PromPlannerContext,
784 ) -> Result<LogicalPlan> {
785 let schema = input.schema();
786
787 let mut project_exprs =
788 Vec::with_capacity(context.tag_columns.len() + context.field_columns.len() + 2);
789
790 if let Some(time_index_column) = &context.time_index_column {
792 let time_index_col = schema
793 .qualified_field_with_name(Some(table_ref), time_index_column)
794 .context(DataFusionPlanningSnafu)?
795 .into();
796 project_exprs.push(DfExpr::Column(time_index_col));
797 }
798
799 for field_column in &context.field_columns {
801 let field_col = schema
802 .qualified_field_with_name(Some(table_ref), field_column)
803 .context(DataFusionPlanningSnafu)?
804 .into();
805 project_exprs.push(DfExpr::Column(field_col));
806 }
807
808 for tag_column in &context.tag_columns {
810 let tag_col = schema
811 .qualified_field_with_name(Some(table_ref), tag_column)
812 .context(DataFusionPlanningSnafu)?
813 .into();
814 project_exprs.push(DfExpr::Column(tag_col));
815 }
816
817 if let Some(tsid_col) =
820 Self::optional_tsid_projection(schema, Some(table_ref), context.use_tsid)
821 {
822 project_exprs.push(tsid_col);
823 }
824
825 let plan = LogicalPlanBuilder::from(input)
826 .project(project_exprs)
827 .context(DataFusionPlanningSnafu)?
828 .build()
829 .context(DataFusionPlanningSnafu)?;
830
831 self.ctx = context.clone();
834 self.ctx.table_name = None;
835 self.ctx.schema_name = None;
836
837 Ok(plan)
838 }
839
840 fn prom_number_lit_to_plan(&mut self, number_literal: &NumberLiteral) -> Result<LogicalPlan> {
841 let NumberLiteral { val } = number_literal;
842 self.ctx.time_index_column = Some(DEFAULT_TIME_INDEX_COLUMN.to_string());
843 self.ctx.field_columns = vec![DEFAULT_FIELD_COLUMN.to_string()];
844 self.ctx.reset_table_name_and_schema();
845 let literal_expr = df_prelude::lit(*val);
846
847 let plan = LogicalPlan::Extension(Extension {
848 node: Arc::new(
849 EmptyMetric::new(
850 self.ctx.start,
851 self.ctx.end,
852 self.ctx.interval,
853 SPECIAL_TIME_FUNCTION.to_string(),
854 DEFAULT_FIELD_COLUMN.to_string(),
855 Some(literal_expr),
856 )
857 .context(DataFusionPlanningSnafu)?,
858 ),
859 });
860 Ok(plan)
861 }
862
863 fn prom_string_lit_to_plan(&mut self, string_literal: &StringLiteral) -> Result<LogicalPlan> {
864 let StringLiteral { val } = string_literal;
865 self.ctx.time_index_column = Some(DEFAULT_TIME_INDEX_COLUMN.to_string());
866 self.ctx.field_columns = vec![DEFAULT_FIELD_COLUMN.to_string()];
867 self.ctx.reset_table_name_and_schema();
868 let literal_expr = df_prelude::lit(val.clone());
869
870 let plan = LogicalPlan::Extension(Extension {
871 node: Arc::new(
872 EmptyMetric::new(
873 self.ctx.start,
874 self.ctx.end,
875 self.ctx.interval,
876 SPECIAL_TIME_FUNCTION.to_string(),
877 DEFAULT_FIELD_COLUMN.to_string(),
878 Some(literal_expr),
879 )
880 .context(DataFusionPlanningSnafu)?,
881 ),
882 });
883 Ok(plan)
884 }
885
886 async fn prom_vector_selector_to_plan(
887 &mut self,
888 vector_selector: &VectorSelector,
889 timestamp_fn: bool,
890 ) -> Result<LogicalPlan> {
891 let VectorSelector {
892 name,
893 offset,
894 matchers,
895 at: _,
896 } = vector_selector;
897 let matchers = self.preprocess_label_matchers(matchers, name)?;
898 if let Some(empty_plan) = self.setup_context().await? {
899 return Ok(empty_plan);
900 }
901 let normalize = self
902 .selector_to_series_normalize_plan(offset, matchers, false)
903 .await?;
904
905 let normalize = if timestamp_fn {
906 self.create_timestamp_func_plan(normalize)?
909 } else {
910 normalize
911 };
912
913 let manipulate = InstantManipulate::new(
914 self.ctx.start,
915 self.ctx.end,
916 self.ctx.lookback_delta,
917 self.ctx.interval,
918 self.ctx
919 .time_index_column
920 .clone()
921 .expect("time index should be set in `setup_context`"),
922 if self.ctx.use_tsid {
923 vec![DATA_SCHEMA_TSID_COLUMN_NAME.to_string()]
924 } else {
925 self.ctx.tag_columns.clone()
926 },
927 self.ctx.field_columns.first().cloned(),
928 normalize,
929 );
930 Ok(LogicalPlan::Extension(Extension {
931 node: Arc::new(manipulate),
932 }))
933 }
934
935 fn create_timestamp_func_plan(&mut self, normalize: LogicalPlan) -> Result<LogicalPlan> {
957 let time_expr = build_special_time_expr(self.ctx.time_index_column.as_ref().unwrap())
958 .alias(DEFAULT_FIELD_COLUMN);
959 self.ctx.field_columns = vec![time_expr.schema_name().to_string()];
960 let mut project_exprs = Vec::with_capacity(self.ctx.tag_columns.len() + 2);
961 project_exprs.push(self.create_time_index_column_expr()?);
962 project_exprs.push(time_expr);
963 project_exprs.extend(self.create_tag_column_exprs()?);
964
965 LogicalPlanBuilder::from(normalize)
966 .project(project_exprs)
967 .context(DataFusionPlanningSnafu)?
968 .build()
969 .context(DataFusionPlanningSnafu)
970 }
971
972 async fn prom_matrix_selector_to_plan(
973 &mut self,
974 matrix_selector: &MatrixSelector,
975 ) -> Result<LogicalPlan> {
976 let MatrixSelector { vs, range } = matrix_selector;
977 let VectorSelector {
978 name,
979 offset,
980 matchers,
981 ..
982 } = vs;
983 let matchers = self.preprocess_label_matchers(matchers, name)?;
984 ensure!(!range.is_zero(), ZeroRangeSelectorSnafu);
985 let range_ms = range.as_millis() as _;
986 self.ctx.range = Some(range_ms);
987
988 let normalize = match self.setup_context().await? {
991 Some(empty_plan) => empty_plan,
992 None => {
993 self.selector_to_series_normalize_plan(offset, matchers, true)
994 .await?
995 }
996 };
997 let manipulate = RangeManipulate::new(
998 self.ctx.start,
999 self.ctx.end,
1000 self.ctx.interval,
1001 range_ms,
1003 self.ctx
1004 .time_index_column
1005 .clone()
1006 .expect("time index should be set in `setup_context`"),
1007 self.ctx.field_columns.clone(),
1008 normalize,
1009 )
1010 .context(DataFusionPlanningSnafu)?;
1011
1012 Ok(LogicalPlan::Extension(Extension {
1013 node: Arc::new(manipulate),
1014 }))
1015 }
1016
    /// Plans a PromQL function call.
    ///
    /// `histogram_quantile`, `vector`, `scalar` and `absent` have dedicated planners. Every
    /// other function proceeds as follows:
    /// 1. Its arguments are split by `create_function_args` into an optional input expression
    ///    and literals.
    /// 2. The input is planned. Without an input, an `EmptyMetric` over the query range is
    ///    used instead.
    /// 3. The input is projected to time index, function expressions and tags.
    /// 4. The result is filtered with `create_empty_values_filter_expr`.
    /// 5. It is sorted for the `sort*` family of functions.
    ///
    /// Tags introduced by the function (`new_tags`) are appended to the context at the end.
    async fn prom_call_expr_to_plan(
        &mut self,
        query_engine_state: &QueryEngineState,
        call_expr: &Call,
    ) -> Result<LogicalPlan> {
        let Call { func, args } = call_expr;
        // Functions whose plans do not fit the generic projection below.
        match func.name {
            SPECIAL_HISTOGRAM_QUANTILE => {
                return self.create_histogram_plan(args, query_engine_state).await;
            }
            SPECIAL_VECTOR_FUNCTION => return self.create_vector_plan(args).await,
            SCALAR_FUNCTION => return self.create_scalar_plan(args, query_engine_state).await,
            SPECIAL_ABSENT_FUNCTION => {
                return self.create_absent_plan(args, query_engine_state).await;
            }
            _ => {}
        }

        let args = self.create_function_args(&args.args)?;
        let input = if let Some(prom_expr) = &args.input {
            // For `timestamp(v)`, the selector itself swaps its value column for the time.
            self.prom_expr_to_plan_inner(prom_expr, func.name == "timestamp", query_engine_state)
                .await?
        } else {
            // No input expression (e.g. `time()`): synthesize a table-less series.
            self.ctx.time_index_column = Some(SPECIAL_TIME_FUNCTION.to_string());
            self.ctx.reset_table_name_and_schema();
            self.ctx.tag_columns = vec![];
            self.ctx.field_columns = vec![DEFAULT_FIELD_COLUMN.to_string()];
            LogicalPlan::Extension(Extension {
                node: Arc::new(
                    EmptyMetric::new(
                        self.ctx.start,
                        self.ctx.end,
                        self.ctx.interval,
                        SPECIAL_TIME_FUNCTION.to_string(),
                        DEFAULT_FIELD_COLUMN.to_string(),
                        None,
                    )
                    .context(DataFusionPlanningSnafu)?,
                ),
            })
        };
        // Projection order: time index, function outputs, then existing tag columns.
        let (mut func_exprs, new_tags) =
            self.create_function_expr(func, args.literals.clone(), query_engine_state)?;
        func_exprs.insert(0, self.create_time_index_column_expr()?);
        func_exprs.extend_from_slice(&self.create_tag_column_exprs()?);

        let builder = LogicalPlanBuilder::from(input)
            .project(func_exprs)
            .context(DataFusionPlanningSnafu)?
            .filter(self.create_empty_values_filter_expr()?)
            .context(DataFusionPlanningSnafu)?;

        let builder = match func.name {
            "sort" => builder
                .sort(self.create_field_columns_sort_exprs(true))
                .context(DataFusionPlanningSnafu)?,
            "sort_desc" => builder
                .sort(self.create_field_columns_sort_exprs(false))
                .context(DataFusionPlanningSnafu)?,
            "sort_by_label" => builder
                .sort(Self::create_sort_exprs_by_tags(
                    func.name,
                    args.literals,
                    true,
                )?)
                .context(DataFusionPlanningSnafu)?,
            "sort_by_label_desc" => builder
                .sort(Self::create_sort_exprs_by_tags(
                    func.name,
                    args.literals,
                    false,
                )?)
                .context(DataFusionPlanningSnafu)?,

            _ => builder,
        };

        // Registered only after the projection above, so they are not projected twice.
        for tag in new_tags {
            self.ctx.tag_columns.push(tag);
        }

        let plan = builder.build().context(DataFusionPlanningSnafu)?;
        common_telemetry::debug!("Created PromQL function plan: {plan:?} for {call_expr:?}");

        Ok(plan)
    }
1107
1108 async fn prom_ext_expr_to_plan(
1109 &mut self,
1110 query_engine_state: &QueryEngineState,
1111 ext_expr: &promql_parser::parser::ast::Extension,
1112 ) -> Result<LogicalPlan> {
1113 let expr = &ext_expr.expr;
1115 let children = expr.children();
1116 let plan = self
1117 .prom_expr_to_plan(&children[0], query_engine_state)
1118 .await?;
1119 match expr.name() {
1125 ANALYZE_NODE_NAME => LogicalPlanBuilder::from(plan)
1126 .explain(false, true)
1127 .unwrap()
1128 .build()
1129 .context(DataFusionPlanningSnafu),
1130 ANALYZE_VERBOSE_NODE_NAME => LogicalPlanBuilder::from(plan)
1131 .explain(true, true)
1132 .unwrap()
1133 .build()
1134 .context(DataFusionPlanningSnafu),
1135 EXPLAIN_NODE_NAME => LogicalPlanBuilder::from(plan)
1136 .explain(false, false)
1137 .unwrap()
1138 .build()
1139 .context(DataFusionPlanningSnafu),
1140 EXPLAIN_VERBOSE_NODE_NAME => LogicalPlanBuilder::from(plan)
1141 .explain(true, false)
1142 .unwrap()
1143 .build()
1144 .context(DataFusionPlanningSnafu),
1145 ALIAS_NODE_NAME => {
1146 let alias = expr
1147 .as_any()
1148 .downcast_ref::<AliasExpr>()
1149 .context(UnexpectedPlanExprSnafu {
1150 desc: "Expected AliasExpr",
1151 })?
1152 .alias
1153 .clone();
1154 self.apply_alias(plan, alias)
1155 }
1156 _ => LogicalPlanBuilder::empty(true)
1157 .build()
1158 .context(DataFusionPlanningSnafu),
1159 }
1160 }
1161
1162 #[allow(clippy::mutable_key_type)]
1172 fn preprocess_label_matchers(
1173 &mut self,
1174 label_matchers: &Matchers,
1175 name: &Option<String>,
1176 ) -> Result<Matchers> {
1177 self.ctx.reset();
1178
1179 let metric_name;
1180 if let Some(name) = name.clone() {
1181 metric_name = Some(name);
1182 ensure!(
1183 label_matchers.find_matchers(METRIC_NAME).is_empty(),
1184 MultipleMetricMatchersSnafu
1185 );
1186 } else {
1187 let mut matches = label_matchers.find_matchers(METRIC_NAME);
1188 ensure!(!matches.is_empty(), NoMetricMatcherSnafu);
1189 ensure!(matches.len() == 1, MultipleMetricMatchersSnafu);
1190 ensure!(
1191 matches[0].op == MatchOp::Equal,
1192 UnsupportedMatcherOpSnafu {
1193 matcher_op: matches[0].op.to_string(),
1194 matcher: METRIC_NAME
1195 }
1196 );
1197 metric_name = matches.pop().map(|m| m.value);
1198 }
1199
1200 self.ctx.table_name = metric_name;
1201
1202 let mut matchers = HashSet::new();
1203 for matcher in &label_matchers.matchers {
1204 if matcher.name == FIELD_COLUMN_MATCHER {
1206 self.ctx
1207 .field_column_matcher
1208 .get_or_insert_default()
1209 .push(matcher.clone());
1210 } else if matcher.name == SCHEMA_COLUMN_MATCHER || matcher.name == DB_COLUMN_MATCHER {
1211 ensure!(
1212 matcher.op == MatchOp::Equal,
1213 UnsupportedMatcherOpSnafu {
1214 matcher: matcher.name.clone(),
1215 matcher_op: matcher.op.to_string(),
1216 }
1217 );
1218 self.ctx.schema_name = Some(matcher.value.clone());
1219 } else if matcher.name != METRIC_NAME {
1220 self.ctx.selector_matcher.push(matcher.clone());
1221 let _ = matchers.insert(matcher.clone());
1222 }
1223 }
1224
1225 Ok(Matchers::new(matchers.into_iter().collect()))
1226 }
1227
1228 async fn selector_to_series_normalize_plan(
1229 &mut self,
1230 offset: &Option<Offset>,
1231 label_matchers: Matchers,
1232 is_range_selector: bool,
1233 ) -> Result<LogicalPlan> {
1234 let table_ref = self.table_ref()?;
1236 let mut table_scan = self.create_table_scan_plan(table_ref.clone()).await?;
1237 let table_schema = table_scan.schema();
1238
1239 let offset_duration = match offset {
1241 Some(Offset::Pos(duration)) => duration.as_millis() as Millisecond,
1242 Some(Offset::Neg(duration)) => -(duration.as_millis() as Millisecond),
1243 None => 0,
1244 };
1245 let mut scan_filters = Self::matchers_to_expr(label_matchers.clone(), table_schema)?;
1246 if let Some(time_index_filter) = self.build_time_index_filter(offset_duration)? {
1247 scan_filters.push(time_index_filter);
1248 }
1249 table_scan = LogicalPlanBuilder::from(table_scan)
1250 .filter(conjunction(scan_filters).unwrap()) .context(DataFusionPlanningSnafu)?
1252 .build()
1253 .context(DataFusionPlanningSnafu)?;
1254
1255 if let Some(field_matchers) = &self.ctx.field_column_matcher {
1257 let col_set = self.ctx.field_columns.iter().collect::<HashSet<_>>();
1258 let mut result_set = HashSet::new();
1260 let mut reverse_set = HashSet::new();
1262 for matcher in field_matchers {
1263 match &matcher.op {
1264 MatchOp::Equal => {
1265 if col_set.contains(&matcher.value) {
1266 let _ = result_set.insert(matcher.value.clone());
1267 } else {
1268 return Err(ColumnNotFoundSnafu {
1269 col: matcher.value.clone(),
1270 }
1271 .build());
1272 }
1273 }
1274 MatchOp::NotEqual => {
1275 if col_set.contains(&matcher.value) {
1276 let _ = reverse_set.insert(matcher.value.clone());
1277 } else {
1278 return Err(ColumnNotFoundSnafu {
1279 col: matcher.value.clone(),
1280 }
1281 .build());
1282 }
1283 }
1284 MatchOp::Re(regex) => {
1285 for col in &self.ctx.field_columns {
1286 if regex.is_match(col) {
1287 let _ = result_set.insert(col.clone());
1288 }
1289 }
1290 }
1291 MatchOp::NotRe(regex) => {
1292 for col in &self.ctx.field_columns {
1293 if regex.is_match(col) {
1294 let _ = reverse_set.insert(col.clone());
1295 }
1296 }
1297 }
1298 }
1299 }
1300 if result_set.is_empty() {
1302 result_set = col_set.into_iter().cloned().collect();
1303 }
1304 for col in reverse_set {
1305 let _ = result_set.remove(&col);
1306 }
1307
1308 self.ctx.field_columns = self
1310 .ctx
1311 .field_columns
1312 .drain(..)
1313 .filter(|col| result_set.contains(col))
1314 .collect();
1315
1316 let exprs = result_set
1317 .into_iter()
1318 .map(|col| DfExpr::Column(Column::new_unqualified(col)))
1319 .chain(self.create_tag_column_exprs()?)
1320 .chain(
1321 self.ctx
1322 .use_tsid
1323 .then_some(DfExpr::Column(Column::new_unqualified(
1324 DATA_SCHEMA_TSID_COLUMN_NAME,
1325 ))),
1326 )
1327 .chain(Some(self.create_time_index_column_expr()?))
1328 .collect::<Vec<_>>();
1329
1330 table_scan = LogicalPlanBuilder::from(table_scan)
1332 .project(exprs)
1333 .context(DataFusionPlanningSnafu)?
1334 .build()
1335 .context(DataFusionPlanningSnafu)?;
1336 }
1337
1338 let series_key_columns = if self.ctx.use_tsid {
1340 vec![DATA_SCHEMA_TSID_COLUMN_NAME.to_string()]
1341 } else {
1342 self.ctx.tag_columns.clone()
1343 };
1344
1345 let sort_exprs = if self.ctx.use_tsid {
1346 vec![
1347 DfExpr::Column(Column::from_name(DATA_SCHEMA_TSID_COLUMN_NAME)).sort(true, true),
1348 self.create_time_index_column_expr()?.sort(true, true),
1349 ]
1350 } else {
1351 self.create_tag_and_time_index_column_sort_exprs()?
1352 };
1353
1354 let sort_plan = LogicalPlanBuilder::from(table_scan)
1355 .sort(sort_exprs)
1356 .context(DataFusionPlanningSnafu)?
1357 .build()
1358 .context(DataFusionPlanningSnafu)?;
1359
1360 let time_index_column =
1362 self.ctx
1363 .time_index_column
1364 .clone()
1365 .with_context(|| TimeIndexNotFoundSnafu {
1366 table: table_ref.to_string(),
1367 })?;
1368 let divide_plan = LogicalPlan::Extension(Extension {
1369 node: Arc::new(SeriesDivide::new(
1370 series_key_columns.clone(),
1371 time_index_column,
1372 sort_plan,
1373 )),
1374 });
1375
1376 if !is_range_selector && offset_duration == 0 {
1378 return Ok(divide_plan);
1379 }
1380 let series_normalize = SeriesNormalize::new(
1381 offset_duration,
1382 self.ctx
1383 .time_index_column
1384 .clone()
1385 .with_context(|| TimeIndexNotFoundSnafu {
1386 table: table_ref.to_quoted_string(),
1387 })?,
1388 is_range_selector,
1389 series_key_columns,
1390 divide_plan,
1391 );
1392 let logical_plan = LogicalPlan::Extension(Extension {
1393 node: Arc::new(series_normalize),
1394 });
1395
1396 Ok(logical_plan)
1397 }
1398
1399 fn agg_modifier_to_col(
1406 &mut self,
1407 input_schema: &DFSchemaRef,
1408 modifier: &Option<LabelModifier>,
1409 update_ctx: bool,
1410 ) -> Result<Vec<DfExpr>> {
1411 match modifier {
1412 None => {
1413 if update_ctx {
1414 self.ctx.tag_columns.clear();
1415 }
1416 Ok(vec![self.create_time_index_column_expr()?])
1417 }
1418 Some(LabelModifier::Include(labels)) => {
1419 if update_ctx {
1420 self.ctx.tag_columns.clear();
1421 }
1422 let mut exprs = Vec::with_capacity(labels.labels.len());
1423 for label in &labels.labels {
1424 if is_metric_engine_internal_column(label) {
1425 continue;
1426 }
1427 if let Some(column_name) = Self::find_case_sensitive_column(input_schema, label)
1429 {
1430 exprs.push(DfExpr::Column(Column::from_name(column_name.clone())));
1431
1432 if update_ctx {
1433 self.ctx.tag_columns.push(column_name);
1435 }
1436 }
1437 }
1438 exprs.push(self.create_time_index_column_expr()?);
1440
1441 Ok(exprs)
1442 }
1443 Some(LabelModifier::Exclude(labels)) => {
1444 let mut all_fields = input_schema
1445 .fields()
1446 .iter()
1447 .map(|f| f.name())
1448 .collect::<BTreeSet<_>>();
1449
1450 all_fields.retain(|col| !is_metric_engine_internal_column(col.as_str()));
1453
1454 for label in &labels.labels {
1457 let _ = all_fields.remove(label);
1458 }
1459
1460 if let Some(time_index) = &self.ctx.time_index_column {
1462 let _ = all_fields.remove(time_index);
1463 }
1464 for value in &self.ctx.field_columns {
1465 let _ = all_fields.remove(value);
1466 }
1467
1468 if update_ctx {
1469 self.ctx.tag_columns = all_fields.iter().map(|col| (*col).clone()).collect();
1471 }
1472
1473 let mut exprs = all_fields
1475 .into_iter()
1476 .map(|c| DfExpr::Column(Column::from(c)))
1477 .collect::<Vec<_>>();
1478
1479 exprs.push(self.create_time_index_column_expr()?);
1481
1482 Ok(exprs)
1483 }
1484 }
1485 }
1486
1487 pub fn matchers_to_expr(
1489 label_matchers: Matchers,
1490 table_schema: &DFSchemaRef,
1491 ) -> Result<Vec<DfExpr>> {
1492 let mut exprs = Vec::with_capacity(label_matchers.matchers.len());
1493 for matcher in label_matchers.matchers {
1494 if matcher.name == SCHEMA_COLUMN_MATCHER
1495 || matcher.name == DB_COLUMN_MATCHER
1496 || matcher.name == FIELD_COLUMN_MATCHER
1497 {
1498 continue;
1499 }
1500
1501 let column_name = Self::find_case_sensitive_column(table_schema, matcher.name.as_str());
1502 let col = if let Some(column_name) = column_name {
1503 DfExpr::Column(Column::from_name(column_name))
1504 } else {
1505 DfExpr::Literal(ScalarValue::Utf8(Some(String::new())), None)
1506 .alias(matcher.name.clone())
1507 };
1508 let lit = DfExpr::Literal(ScalarValue::Utf8(Some(matcher.value)), None);
1509 let expr = match matcher.op {
1510 MatchOp::Equal => col.eq(lit),
1511 MatchOp::NotEqual => col.not_eq(lit),
1512 MatchOp::Re(re) => {
1513 if re.as_str() == "^(?:.*)$" {
1519 continue;
1520 }
1521 if re.as_str() == "^(?:.+)$" {
1522 col.not_eq(DfExpr::Literal(
1523 ScalarValue::Utf8(Some(String::new())),
1524 None,
1525 ))
1526 } else {
1527 DfExpr::BinaryExpr(BinaryExpr {
1528 left: Box::new(col),
1529 op: Operator::RegexMatch,
1530 right: Box::new(DfExpr::Literal(
1531 ScalarValue::Utf8(Some(re.as_str().to_string())),
1532 None,
1533 )),
1534 })
1535 }
1536 }
1537 MatchOp::NotRe(re) => {
1538 if re.as_str() == "^(?:.*)$" {
1539 DfExpr::Literal(ScalarValue::Boolean(Some(false)), None)
1540 } else if re.as_str() == "^(?:.+)$" {
1541 col.eq(DfExpr::Literal(
1542 ScalarValue::Utf8(Some(String::new())),
1543 None,
1544 ))
1545 } else {
1546 DfExpr::BinaryExpr(BinaryExpr {
1547 left: Box::new(col),
1548 op: Operator::RegexNotMatch,
1549 right: Box::new(DfExpr::Literal(
1550 ScalarValue::Utf8(Some(re.as_str().to_string())),
1551 None,
1552 )),
1553 })
1554 }
1555 }
1556 };
1557 exprs.push(expr);
1558 }
1559
1560 Ok(exprs)
1561 }
1562
1563 fn find_case_sensitive_column(schema: &DFSchemaRef, column: &str) -> Option<String> {
1564 if is_metric_engine_internal_column(column) {
1565 return None;
1566 }
1567 schema
1568 .fields()
1569 .iter()
1570 .find(|field| field.name() == column)
1571 .map(|field| field.name().clone())
1572 }
1573
1574 fn table_from_source(&self, source: &Arc<dyn TableSource>) -> Result<table::TableRef> {
1575 Ok(source
1576 .as_any()
1577 .downcast_ref::<DefaultTableSource>()
1578 .context(UnknownTableSnafu)?
1579 .table_provider
1580 .as_any()
1581 .downcast_ref::<DfTableProviderAdapter>()
1582 .context(UnknownTableSnafu)?
1583 .table())
1584 }
1585
1586 fn table_ref(&self) -> Result<TableReference> {
1587 let table_name = self
1588 .ctx
1589 .table_name
1590 .clone()
1591 .context(TableNameNotFoundSnafu)?;
1592
1593 let table_ref = if let Some(schema_name) = &self.ctx.schema_name {
1595 TableReference::partial(schema_name.as_str(), table_name.as_str())
1596 } else {
1597 TableReference::bare(table_name.as_str())
1598 };
1599
1600 Ok(table_ref)
1601 }
1602
1603 fn build_time_index_filter(&self, offset_duration: i64) -> Result<Option<DfExpr>> {
1604 let start = self.ctx.start;
1605 let end = self.ctx.end;
1606 if end < start {
1607 return InvalidTimeRangeSnafu { start, end }.fail();
1608 }
1609 let lookback_delta = self.ctx.lookback_delta;
1610 let range = self.ctx.range.unwrap_or_default();
1611 let interval = self.ctx.interval;
1612 let time_index_expr = self.create_time_index_column_expr()?;
1613 let num_points = (end - start) / interval;
1614
1615 let selector_window = if range == 0 { lookback_delta } else { range };
1623 let lower_exclusive_adjustment = if selector_window > 0 { 1 } else { 0 };
1624
1625 if (end - start) / interval > MAX_SCATTER_POINTS || interval <= INTERVAL_1H {
1627 let single_time_range = time_index_expr
1628 .clone()
1629 .gt_eq(DfExpr::Literal(
1630 ScalarValue::TimestampMillisecond(
1631 Some(
1632 self.ctx.start - offset_duration - selector_window
1633 + lower_exclusive_adjustment,
1634 ),
1635 None,
1636 ),
1637 None,
1638 ))
1639 .and(time_index_expr.lt_eq(DfExpr::Literal(
1640 ScalarValue::TimestampMillisecond(Some(self.ctx.end - offset_duration), None),
1641 None,
1642 )));
1643 return Ok(Some(single_time_range));
1644 }
1645
1646 let mut filters = Vec::with_capacity(num_points as usize + 1);
1648 for timestamp in (start..=end).step_by(interval as usize) {
1649 filters.push(
1650 time_index_expr
1651 .clone()
1652 .gt_eq(DfExpr::Literal(
1653 ScalarValue::TimestampMillisecond(
1654 Some(
1655 timestamp - offset_duration - selector_window
1656 + lower_exclusive_adjustment,
1657 ),
1658 None,
1659 ),
1660 None,
1661 ))
1662 .and(time_index_expr.clone().lt_eq(DfExpr::Literal(
1663 ScalarValue::TimestampMillisecond(Some(timestamp - offset_duration), None),
1664 None,
1665 ))),
1666 )
1667 }
1668
1669 Ok(filters.into_iter().reduce(DfExpr::or))
1670 }
1671
/// Builds the scan plan for the table referenced by `table_ref`.
///
/// Physical scan path: used when `table_ref` is a metric-engine logical table whose
/// backing physical table exists and has both the table-id column and a `UInt64` TSID
/// column. The physical table is then scanned directly, and the scan is:
/// * projected to the needed columns,
/// * filtered on the logical table's id,
/// * aliased back to `table_ref`.
///
/// Otherwise the resolved table itself is scanned. In either case, a time index that is
/// not millisecond precision is cast to `Timestamp(Millisecond)` under its original name.
///
/// Side effect: sets `self.ctx.use_tsid` (true only when scanning such a physical table).
/// Reads `ctx.schema_name`, `ctx.tag_columns`, `ctx.selector_matcher`,
/// `ctx.field_columns` and `ctx.time_index_column`.
///
/// # Errors
/// Returns an error for:
/// * catalog resolution failures. A missing physical table is not an error: the
///   logical table is scanned instead.
/// * table sources that cannot be unwrapped,
/// * a missing time index,
/// * DataFusion planning errors.
1672 async fn create_table_scan_plan(&mut self, table_ref: TableReference) -> Result<LogicalPlan> {
// Resolve the table exactly as named by the query (possibly a metric-engine logical table).
1677 let provider = self
1678 .table_provider
1679 .resolve_table(table_ref.clone())
1680 .await
1681 .context(CatalogSnafu)?;
1682
1683 let logical_table = self.table_from_source(&provider)?;
1684
// Default: scan the resolved table itself; switched to the physical table below when possible.
1685 let mut maybe_phy_table_ref = table_ref.clone();
1687 let mut scan_provider = provider;
1688 let mut table_id_filter: Option<u32> = None;
1689
// Metric-engine logical tables record their physical table's name under
// LOGICAL_TABLE_METADATA_KEY in the table's extra options.
1690 if logical_table.table_info().meta.engine == METRIC_ENGINE_NAME
1693 && let Some(physical_table_name) = logical_table
1694 .table_info()
1695 .meta
1696 .options
1697 .extra_options
1698 .get(LOGICAL_TABLE_METADATA_KEY)
1699 {
// The physical table is looked up in the same schema as the query (if one was given).
1700 let physical_table_ref = if let Some(schema_name) = &self.ctx.schema_name {
1701 TableReference::partial(schema_name.as_str(), physical_table_name.as_str())
1702 } else {
1703 TableReference::bare(physical_table_name.as_str())
1704 };
1705
// A missing physical table is not an error: fall back to the logical provider, which the
// `ptr_eq` check below then recognizes and skips.
1706 let physical_provider = match self
1707 .table_provider
1708 .resolve_table(physical_table_ref.clone())
1709 .await
1710 {
1711 Ok(provider) => provider,
1712 Err(e) if e.status_code() == StatusCode::TableNotFound => {
1713 scan_provider.clone()
1716 }
1717 Err(e) => return Err(e).context(CatalogSnafu),
1718 };
1719
1720 if !Arc::ptr_eq(&physical_provider, &scan_provider) {
1721 let physical_table = self.table_from_source(&physical_provider)?;
1723
// Only scan the physical table directly when it exposes the table-id column and a
// UInt64 TSID column.
1724 let has_table_id = physical_table
1725 .schema()
1726 .column_schema_by_name(DATA_SCHEMA_TABLE_ID_COLUMN_NAME)
1727 .is_some();
1728 let has_tsid = physical_table
1729 .schema()
1730 .column_schema_by_name(DATA_SCHEMA_TSID_COLUMN_NAME)
1731 .is_some_and(|col| matches!(col.data_type, ConcreteDataType::UInt64(_)));
1732
1733 if has_table_id && has_tsid {
1734 scan_provider = physical_provider;
1735 maybe_phy_table_ref = physical_table_ref;
// Rows of the physical table are later restricted to this logical table's id.
1736 table_id_filter = Some(logical_table.table_info().ident.table_id);
1737 }
1738 }
1739 }
1740
1741 let scan_table = self.table_from_source(&scan_provider)?;
1742
// TSID mode is only enabled when scanning a physical table with a UInt64 TSID column;
// `selector_to_series_normalize_plan` then keys series on the TSID column.
1743 let use_tsid = table_id_filter.is_some()
1744 && scan_table
1745 .schema()
1746 .column_schema_by_name(DATA_SCHEMA_TSID_COLUMN_NAME)
1747 .is_some_and(|col| matches!(col.data_type, ConcreteDataType::UInt64(_)));
1748 self.ctx.use_tsid = use_tsid;
1749
1750 let all_table_tags = self.ctx.tag_columns.clone();
1751
// NOTE(review): `all_table_tags` is a clone of `ctx.tag_columns`, so the loop below can only
// re-add names that are already in `scan_tags`. The net effect in TSID mode is a sorted,
// deduplicated copy of `ctx.tag_columns`. Confirm whether matcher labels outside
// `tag_columns` were meant to be included.
1752 let scan_tag_columns = if use_tsid {
1753 let mut scan_tags = self.ctx.tag_columns.clone();
1754 for matcher in &self.ctx.selector_matcher {
1755 if is_metric_engine_internal_column(&matcher.name) {
1756 continue;
1757 }
1758 if all_table_tags.iter().any(|tag| tag == &matcher.name) {
1759 scan_tags.push(matcher.name.clone());
1760 }
1761 }
1762 scan_tags.sort_unstable();
1763 scan_tags.dedup();
1764 scan_tags
1765 } else {
1766 self.ctx.tag_columns.clone()
1767 };
1768
// Whether the scanned table's time index already has millisecond precision.
1769 let is_time_index_ms = scan_table
1770 .schema()
1771 .timestamp_column()
1772 .with_context(|| TimeIndexNotFoundSnafu {
1773 table: maybe_phy_table_ref.to_quoted_string(),
1774 })?
1775 .data_type
1776 == ConcreteDataType::timestamp_millisecond_datatype();
1777
// When scanning the physical table, read only the columns this query needs:
// table id, time index, tags, fields and (in TSID mode) the TSID column.
1778 let scan_projection = if table_id_filter.is_some() {
1779 let mut required_columns = HashSet::new();
1780 required_columns.insert(DATA_SCHEMA_TABLE_ID_COLUMN_NAME.to_string());
1781 required_columns.insert(self.ctx.time_index_column.clone().with_context(|| {
1782 TimeIndexNotFoundSnafu {
1783 table: maybe_phy_table_ref.to_quoted_string(),
1784 }
1785 })?);
1786 for col in &scan_tag_columns {
1787 required_columns.insert(col.clone());
1788 }
1789 for col in &self.ctx.field_columns {
1790 required_columns.insert(col.clone());
1791 }
1792 if use_tsid {
1793 required_columns.insert(DATA_SCHEMA_TSID_COLUMN_NAME.to_string());
1794 }
1795
// Projection indices are positions in the scanned table's arrow schema.
1796 let arrow_schema = scan_table.schema().arrow_schema().clone();
1797 Some(
1798 arrow_schema
1799 .fields()
1800 .iter()
1801 .enumerate()
1802 .filter(|(_, field)| required_columns.contains(field.name().as_str()))
1803 .map(|(idx, _)| idx)
1804 .collect::<Vec<_>>(),
1805 )
1806 } else {
1807 None
1808 };
1809
1810 let mut scan_plan =
1811 LogicalPlanBuilder::scan(maybe_phy_table_ref.clone(), scan_provider, scan_projection)
1812 .context(DataFusionPlanningSnafu)?
1813 .build()
1814 .context(DataFusionPlanningSnafu)?;
1815
// Restrict physical-table rows to this logical table and re-expose them under the logical
// table name, so qualified references to `table_ref` resolve against this plan.
1816 if let Some(table_id) = table_id_filter {
1817 scan_plan = LogicalPlanBuilder::from(scan_plan)
1818 .filter(
1819 DfExpr::Column(Column::from_name(DATA_SCHEMA_TABLE_ID_COLUMN_NAME))
1820 .eq(lit(table_id)),
1821 )
1822 .context(DataFusionPlanningSnafu)?
1823 .alias(table_ref.clone()) .context(DataFusionPlanningSnafu)?
1825 .build()
1826 .context(DataFusionPlanningSnafu)?;
1827 }
1828
// Normalize a non-millisecond time index: project fields, tags, optional TSID and the time
// index cast to millisecond timestamps, keeping the time index column name.
1829 if !is_time_index_ms {
1830 let expr: Vec<_> = self
1832 .create_field_column_exprs()?
1833 .into_iter()
1834 .chain(
1835 scan_tag_columns
1836 .iter()
1837 .map(|tag| DfExpr::Column(Column::from_name(tag))),
1838 )
// TSID is qualified with `table_ref`; it is only emitted when `use_tsid` is set, which
// implies `table_id_filter` and therefore the alias above.
1839 .chain(self.ctx.use_tsid.then_some(DfExpr::Column(Column::new(
1840 Some(table_ref.clone()),
1841 DATA_SCHEMA_TSID_COLUMN_NAME.to_string(),
1842 ))))
1843 .chain(Some(DfExpr::Alias(Alias {
1844 expr: Box::new(DfExpr::Cast(Cast {
1845 expr: Box::new(self.create_time_index_column_expr()?),
1846 data_type: ArrowDataType::Timestamp(ArrowTimeUnit::Millisecond, None),
1847 })),
1848 relation: Some(table_ref.clone()),
1849 name: self
1850 .ctx
1851 .time_index_column
1852 .as_ref()
1853 .with_context(|| TimeIndexNotFoundSnafu {
1854 table: table_ref.to_quoted_string(),
1855 })?
1856 .clone(),
1857 metadata: None,
1858 })))
1859 .collect::<Vec<_>>();
1860 scan_plan = LogicalPlanBuilder::from(scan_plan)
1861 .project(expr)
1862 .context(DataFusionPlanningSnafu)?
1863 .build()
1864 .context(DataFusionPlanningSnafu)?;
// Millisecond time index on the physical table: project to fields, tags, optional TSID and
// the time index, which drops the table-id column from the output.
1865 } else if table_id_filter.is_some() {
1866 let project_exprs = self
1868 .create_field_column_exprs()?
1869 .into_iter()
1870 .chain(
1871 scan_tag_columns
1872 .iter()
1873 .map(|tag| DfExpr::Column(Column::from_name(tag))),
1874 )
1875 .chain(
1876 self.ctx
1877 .use_tsid
1878 .then_some(DfExpr::Column(Column::from_name(
1879 DATA_SCHEMA_TSID_COLUMN_NAME,
1880 ))),
1881 )
1882 .chain(Some(self.create_time_index_column_expr()?))
1883 .collect::<Vec<_>>();
1884
1885 scan_plan = LogicalPlanBuilder::from(scan_plan)
1886 .project(project_exprs)
1887 .context(DataFusionPlanningSnafu)?
1888 .build()
1889 .context(DataFusionPlanningSnafu)?;
1890 }
1891
// NOTE(review): wrapping the plan in a builder and rebuilding it appears to be a no-op.
1892 let result = LogicalPlanBuilder::from(scan_plan)
1893 .build()
1894 .context(DataFusionPlanningSnafu)?;
1895 Ok(result)
1896 }
1897
1898 fn collect_row_key_tag_columns_from_plan(
1899 &self,
1900 plan: &LogicalPlan,
1901 ) -> Result<BTreeSet<String>> {
1902 fn walk(
1903 planner: &PromPlanner,
1904 plan: &LogicalPlan,
1905 out: &mut BTreeSet<String>,
1906 ) -> Result<()> {
1907 if let LogicalPlan::TableScan(scan) = plan {
1908 let table = planner.table_from_source(&scan.source)?;
1909 for col in table.table_info().meta.row_key_column_names() {
1910 if col != DATA_SCHEMA_TABLE_ID_COLUMN_NAME
1911 && col != DATA_SCHEMA_TSID_COLUMN_NAME
1912 && !is_metric_engine_internal_column(col)
1913 {
1914 out.insert(col.clone());
1915 }
1916 }
1917 }
1918
1919 for input in plan.inputs() {
1920 walk(planner, input, out)?;
1921 }
1922 Ok(())
1923 }
1924
1925 let mut out = BTreeSet::new();
1926 walk(self, plan, &mut out)?;
1927 Ok(out)
1928 }
1929
1930 fn ensure_tag_columns_available(
1931 &self,
1932 plan: LogicalPlan,
1933 required_tags: &BTreeSet<String>,
1934 ) -> Result<LogicalPlan> {
1935 if required_tags.is_empty() {
1936 return Ok(plan);
1937 }
1938
1939 struct Rewriter {
1940 required_tags: BTreeSet<String>,
1941 }
1942
1943 impl TreeNodeRewriter for Rewriter {
1944 type Node = LogicalPlan;
1945
1946 fn f_up(
1947 &mut self,
1948 node: Self::Node,
1949 ) -> datafusion_common::Result<Transformed<Self::Node>> {
1950 match node {
1951 LogicalPlan::TableScan(scan) => {
1952 let schema = scan.source.schema();
1953 let mut projection = match scan.projection.clone() {
1954 Some(p) => p,
1955 None => {
1956 return Ok(Transformed::no(LogicalPlan::TableScan(scan)));
1958 }
1959 };
1960
1961 let mut changed = false;
1962 for tag in &self.required_tags {
1963 if let Some((idx, _)) = schema
1964 .fields()
1965 .iter()
1966 .enumerate()
1967 .find(|(_, field)| field.name() == tag)
1968 && !projection.contains(&idx)
1969 {
1970 projection.push(idx);
1971 changed = true;
1972 }
1973 }
1974
1975 if !changed {
1976 return Ok(Transformed::no(LogicalPlan::TableScan(scan)));
1977 }
1978
1979 projection.sort_unstable();
1980 projection.dedup();
1981
1982 let new_scan = TableScan::try_new(
1983 scan.table_name.clone(),
1984 scan.source.clone(),
1985 Some(projection),
1986 scan.filters,
1987 scan.fetch,
1988 )?;
1989 Ok(Transformed::yes(LogicalPlan::TableScan(new_scan)))
1990 }
1991 LogicalPlan::Projection(proj) => {
1992 let input_schema = proj.input.schema();
1993
1994 let existing = proj
1995 .schema
1996 .fields()
1997 .iter()
1998 .map(|f| f.name().as_str())
1999 .collect::<HashSet<_>>();
2000
2001 let mut expr = proj.expr.clone();
2002 let mut has_changed = false;
2003 for tag in &self.required_tags {
2004 if existing.contains(tag.as_str()) {
2005 continue;
2006 }
2007
2008 if let Some(idx) = input_schema.index_of_column_by_name(None, tag) {
2009 expr.push(DfExpr::Column(Column::from(
2010 input_schema.qualified_field(idx),
2011 )));
2012 has_changed = true;
2013 }
2014 }
2015
2016 if !has_changed {
2017 return Ok(Transformed::no(LogicalPlan::Projection(proj)));
2018 }
2019
2020 let new_proj = Projection::try_new(expr, proj.input)?;
2021 Ok(Transformed::yes(LogicalPlan::Projection(new_proj)))
2022 }
2023 other => Ok(Transformed::no(other)),
2024 }
2025 }
2026 }
2027
2028 let mut rewriter = Rewriter {
2029 required_tags: required_tags.clone(),
2030 };
2031 let rewritten = plan
2032 .rewrite(&mut rewriter)
2033 .context(DataFusionPlanningSnafu)?;
2034 Ok(rewritten.data)
2035 }
2036
2037 fn refresh_tag_columns_from_schema(&mut self, schema: &DFSchemaRef) {
2038 let time_index = self.ctx.time_index_column.as_deref();
2039 let field_columns = self.ctx.field_columns.iter().collect::<HashSet<_>>();
2040
2041 let mut tags = schema
2042 .fields()
2043 .iter()
2044 .map(|f| f.name())
2045 .filter(|name| Some(name.as_str()) != time_index)
2046 .filter(|name| !field_columns.contains(name))
2047 .filter(|name| !is_metric_engine_internal_column(name))
2048 .cloned()
2049 .collect::<Vec<_>>();
2050 tags.sort_unstable();
2051 tags.dedup();
2052 self.ctx.tag_columns = tags;
2053 }
2054
2055 async fn setup_context(&mut self) -> Result<Option<LogicalPlan>> {
2059 let table_ref = self.table_ref()?;
2060 let source = match self.table_provider.resolve_table(table_ref.clone()).await {
2061 Err(e) if e.status_code() == StatusCode::TableNotFound => {
2062 let plan = self.setup_context_for_empty_metric()?;
2063 return Ok(Some(plan));
2064 }
2065 res => res.context(CatalogSnafu)?,
2066 };
2067 let table = self.table_from_source(&source)?;
2068
2069 let time_index = table
2071 .schema()
2072 .timestamp_column()
2073 .with_context(|| TimeIndexNotFoundSnafu {
2074 table: table_ref.to_quoted_string(),
2075 })?
2076 .name
2077 .clone();
2078 self.ctx.time_index_column = Some(time_index);
2079
2080 let values = table
2082 .table_info()
2083 .meta
2084 .field_column_names()
2085 .cloned()
2086 .collect();
2087 self.ctx.field_columns = values;
2088
2089 let tags = table
2091 .table_info()
2092 .meta
2093 .row_key_column_names()
2094 .filter(|col| {
2095 col != &DATA_SCHEMA_TABLE_ID_COLUMN_NAME && col != &DATA_SCHEMA_TSID_COLUMN_NAME
2097 })
2098 .cloned()
2099 .collect();
2100 self.ctx.tag_columns = tags;
2101
2102 self.ctx.use_tsid = false;
2103
2104 Ok(None)
2105 }
2106
2107 fn setup_context_for_empty_metric(&mut self) -> Result<LogicalPlan> {
2110 self.ctx.time_index_column = Some(SPECIAL_TIME_FUNCTION.to_string());
2111 self.ctx.reset_table_name_and_schema();
2112 self.ctx.tag_columns = vec![];
2113 self.ctx.field_columns = vec![DEFAULT_FIELD_COLUMN.to_string()];
2114 self.ctx.use_tsid = false;
2115
2116 let plan = LogicalPlan::Extension(Extension {
2118 node: Arc::new(
2119 EmptyMetric::new(
2120 0,
2121 -1,
2122 self.ctx.interval,
2123 SPECIAL_TIME_FUNCTION.to_string(),
2124 DEFAULT_FIELD_COLUMN.to_string(),
2125 Some(lit(0.0f64)),
2126 )
2127 .context(DataFusionPlanningSnafu)?,
2128 ),
2129 });
2130 Ok(plan)
2131 }
2132
2133 fn create_function_args(&self, args: &[Box<PromExpr>]) -> Result<FunctionArgs> {
2135 let mut result = FunctionArgs::default();
2136
2137 for arg in args {
2138 if let Some(expr) = Self::try_build_literal_expr(arg) {
2140 result.literals.push(expr);
2141 } else {
2142 match arg.as_ref() {
2144 PromExpr::Subquery(_)
2145 | PromExpr::VectorSelector(_)
2146 | PromExpr::MatrixSelector(_)
2147 | PromExpr::Extension(_)
2148 | PromExpr::Aggregate(_)
2149 | PromExpr::Paren(_)
2150 | PromExpr::Call(_)
2151 | PromExpr::Binary(_)
2152 | PromExpr::Unary(_) => {
2153 if result.input.replace(*arg.clone()).is_some() {
2154 MultipleVectorSnafu { expr: *arg.clone() }.fail()?;
2155 }
2156 }
2157
2158 _ => {
2159 let expr = Self::get_param_as_literal_expr(&Some(arg.clone()), None, None)?;
2160 result.literals.push(expr);
2161 }
2162 }
2163 }
2164 }
2165
2166 Ok(result)
2167 }
2168
2169 fn create_function_expr(
2175 &mut self,
2176 func: &Function,
2177 other_input_exprs: Vec<DfExpr>,
2178 query_engine_state: &QueryEngineState,
2179 ) -> Result<(Vec<DfExpr>, Vec<String>)> {
2180 let mut other_input_exprs: VecDeque<DfExpr> = other_input_exprs.into();
2182
2183 let field_column_pos = 0;
2185 let mut exprs = Vec::with_capacity(self.ctx.field_columns.len());
2186 let mut new_tags = vec![];
2188 let scalar_func = match func.name {
2189 "increase" => ScalarFunc::ExtrapolateUdf(
2190 Arc::new(Increase::scalar_udf()),
2191 self.ctx.range.context(ExpectRangeSelectorSnafu)?,
2192 ),
2193 "rate" => ScalarFunc::ExtrapolateUdf(
2194 Arc::new(Rate::scalar_udf()),
2195 self.ctx.range.context(ExpectRangeSelectorSnafu)?,
2196 ),
2197 "delta" => ScalarFunc::ExtrapolateUdf(
2198 Arc::new(Delta::scalar_udf()),
2199 self.ctx.range.context(ExpectRangeSelectorSnafu)?,
2200 ),
2201 "idelta" => ScalarFunc::Udf(Arc::new(IDelta::<false>::scalar_udf())),
2202 "irate" => ScalarFunc::Udf(Arc::new(IDelta::<true>::scalar_udf())),
2203 "resets" => ScalarFunc::Udf(Arc::new(Resets::scalar_udf())),
2204 "changes" => ScalarFunc::Udf(Arc::new(Changes::scalar_udf())),
2205 "deriv" => ScalarFunc::Udf(Arc::new(Deriv::scalar_udf())),
2206 "avg_over_time" => ScalarFunc::Udf(Arc::new(AvgOverTime::scalar_udf())),
2207 "min_over_time" => ScalarFunc::Udf(Arc::new(MinOverTime::scalar_udf())),
2208 "max_over_time" => ScalarFunc::Udf(Arc::new(MaxOverTime::scalar_udf())),
2209 "sum_over_time" => ScalarFunc::Udf(Arc::new(SumOverTime::scalar_udf())),
2210 "count_over_time" => ScalarFunc::Udf(Arc::new(CountOverTime::scalar_udf())),
2211 "last_over_time" => ScalarFunc::Udf(Arc::new(LastOverTime::scalar_udf())),
2212 "absent_over_time" => ScalarFunc::Udf(Arc::new(AbsentOverTime::scalar_udf())),
2213 "present_over_time" => ScalarFunc::Udf(Arc::new(PresentOverTime::scalar_udf())),
2214 "stddev_over_time" => ScalarFunc::Udf(Arc::new(StddevOverTime::scalar_udf())),
2215 "stdvar_over_time" => ScalarFunc::Udf(Arc::new(StdvarOverTime::scalar_udf())),
2216 "quantile_over_time" => ScalarFunc::Udf(Arc::new(QuantileOverTime::scalar_udf())),
2217 "predict_linear" => {
2218 other_input_exprs[0] = DfExpr::Cast(Cast {
2219 expr: Box::new(other_input_exprs[0].clone()),
2220 data_type: ArrowDataType::Int64,
2221 });
2222 ScalarFunc::Udf(Arc::new(PredictLinear::scalar_udf()))
2223 }
2224 "double_exponential_smoothing" | "holt_winters" => {
2225 ScalarFunc::Udf(Arc::new(DoubleExponentialSmoothing::scalar_udf()))
2226 }
2227 "time" => {
2228 exprs.push(build_special_time_expr(
2229 self.ctx.time_index_column.as_ref().unwrap(),
2230 ));
2231 ScalarFunc::GeneratedExpr
2232 }
2233 "minute" => {
2234 let expr = self.date_part_on_time_index("minute")?;
2236 exprs.push(expr);
2237 ScalarFunc::GeneratedExpr
2238 }
2239 "hour" => {
2240 let expr = self.date_part_on_time_index("hour")?;
2242 exprs.push(expr);
2243 ScalarFunc::GeneratedExpr
2244 }
2245 "month" => {
2246 let expr = self.date_part_on_time_index("month")?;
2248 exprs.push(expr);
2249 ScalarFunc::GeneratedExpr
2250 }
2251 "year" => {
2252 let expr = self.date_part_on_time_index("year")?;
2254 exprs.push(expr);
2255 ScalarFunc::GeneratedExpr
2256 }
2257 "day_of_month" => {
2258 let expr = self.date_part_on_time_index("day")?;
2260 exprs.push(expr);
2261 ScalarFunc::GeneratedExpr
2262 }
2263 "day_of_week" => {
2264 let expr = self.date_part_on_time_index("dow")?;
2266 exprs.push(expr);
2267 ScalarFunc::GeneratedExpr
2268 }
2269 "day_of_year" => {
2270 let expr = self.date_part_on_time_index("doy")?;
2272 exprs.push(expr);
2273 ScalarFunc::GeneratedExpr
2274 }
2275 "days_in_month" => {
2276 let day_lit_expr = "day".lit();
2281 let month_lit_expr = "month".lit();
2282 let interval_1month_lit_expr =
2283 DfExpr::Literal(ScalarValue::IntervalYearMonth(Some(1)), None);
2284 let interval_1day_lit_expr = DfExpr::Literal(
2285 ScalarValue::IntervalDayTime(Some(IntervalDayTime::new(1, 0))),
2286 None,
2287 );
2288 let the_1month_minus_1day_expr = DfExpr::BinaryExpr(BinaryExpr {
2289 left: Box::new(interval_1month_lit_expr),
2290 op: Operator::Minus,
2291 right: Box::new(interval_1day_lit_expr),
2292 });
2293 let date_trunc_expr = DfExpr::ScalarFunction(ScalarFunction {
2294 func: datafusion_functions::datetime::date_trunc(),
2295 args: vec![month_lit_expr, self.create_time_index_column_expr()?],
2296 });
2297 let date_trunc_plus_interval_expr = DfExpr::BinaryExpr(BinaryExpr {
2298 left: Box::new(date_trunc_expr),
2299 op: Operator::Plus,
2300 right: Box::new(the_1month_minus_1day_expr),
2301 });
2302 let date_part_expr = DfExpr::ScalarFunction(ScalarFunction {
2303 func: datafusion_functions::datetime::date_part(),
2304 args: vec![day_lit_expr, date_trunc_plus_interval_expr],
2305 });
2306
2307 exprs.push(date_part_expr);
2308 ScalarFunc::GeneratedExpr
2309 }
2310
2311 "label_join" => {
2312 let (concat_expr, dst_label) = Self::build_concat_labels_expr(
2313 &mut other_input_exprs,
2314 &self.ctx,
2315 query_engine_state,
2316 )?;
2317
2318 for value in &self.ctx.field_columns {
2320 if *value != dst_label {
2321 let expr = DfExpr::Column(Column::from_name(value));
2322 exprs.push(expr);
2323 }
2324 }
2325
2326 self.ctx.tag_columns.retain(|tag| *tag != dst_label);
2328 new_tags.push(dst_label);
2329 exprs.push(concat_expr);
2331
2332 ScalarFunc::GeneratedExpr
2333 }
2334 "label_replace" => {
2335 if let Some((replace_expr, dst_label)) = self
2336 .build_regexp_replace_label_expr(&mut other_input_exprs, query_engine_state)?
2337 {
2338 for value in &self.ctx.field_columns {
2340 if *value != dst_label {
2341 let expr = DfExpr::Column(Column::from_name(value));
2342 exprs.push(expr);
2343 }
2344 }
2345
2346 ensure!(
2347 !self.ctx.tag_columns.contains(&dst_label),
2348 SameLabelSetSnafu
2349 );
2350 new_tags.push(dst_label);
2351 exprs.push(replace_expr);
2353 } else {
2354 for value in &self.ctx.field_columns {
2356 let expr = DfExpr::Column(Column::from_name(value));
2357 exprs.push(expr);
2358 }
2359 }
2360
2361 ScalarFunc::GeneratedExpr
2362 }
2363 "sort" | "sort_desc" | "sort_by_label" | "sort_by_label_desc" | "timestamp" => {
2364 for value in &self.ctx.field_columns {
2367 let expr = DfExpr::Column(Column::from_name(value));
2368 exprs.push(expr);
2369 }
2370
2371 ScalarFunc::GeneratedExpr
2372 }
2373 "round" => {
2374 if other_input_exprs.is_empty() {
2375 other_input_exprs.push_front(0.0f64.lit());
2376 }
2377 ScalarFunc::DataFusionUdf(Arc::new(Round::scalar_udf()))
2378 }
2379 "rad" => ScalarFunc::DataFusionBuiltin(datafusion::functions::math::radians()),
2380 "deg" => ScalarFunc::DataFusionBuiltin(datafusion::functions::math::degrees()),
2381 "sgn" => ScalarFunc::DataFusionBuiltin(datafusion::functions::math::signum()),
2382 "pi" => {
2383 let fn_expr = DfExpr::ScalarFunction(ScalarFunction {
2385 func: datafusion::functions::math::pi(),
2386 args: vec![],
2387 });
2388 exprs.push(fn_expr);
2389
2390 ScalarFunc::GeneratedExpr
2391 }
2392 _ => {
2393 if let Some(f) = query_engine_state
2394 .session_state()
2395 .scalar_functions()
2396 .get(func.name)
2397 {
2398 ScalarFunc::DataFusionBuiltin(f.clone())
2399 } else if let Some(factory) = query_engine_state.scalar_function(func.name) {
2400 let func_state = query_engine_state.function_state();
2401 let query_ctx = self.table_provider.query_ctx();
2402
2403 ScalarFunc::DataFusionUdf(Arc::new(factory.provide(FunctionContext {
2404 state: func_state,
2405 query_ctx: query_ctx.clone(),
2406 })))
2407 } else if let Some(f) = datafusion_functions::math::functions()
2408 .iter()
2409 .find(|f| f.name() == func.name)
2410 {
2411 ScalarFunc::DataFusionUdf(f.clone())
2412 } else {
2413 return UnsupportedExprSnafu {
2414 name: func.name.to_string(),
2415 }
2416 .fail();
2417 }
2418 }
2419 };
2420
2421 for value in &self.ctx.field_columns {
2422 let col_expr = DfExpr::Column(Column::from_name(value));
2423
2424 match scalar_func.clone() {
2425 ScalarFunc::DataFusionBuiltin(func) => {
2426 other_input_exprs.insert(field_column_pos, col_expr);
2427 let fn_expr = DfExpr::ScalarFunction(ScalarFunction {
2428 func,
2429 args: other_input_exprs.clone().into(),
2430 });
2431 exprs.push(fn_expr);
2432 let _ = other_input_exprs.remove(field_column_pos);
2433 }
2434 ScalarFunc::DataFusionUdf(func) => {
2435 let args = itertools::chain!(
2436 other_input_exprs.iter().take(field_column_pos).cloned(),
2437 std::iter::once(col_expr),
2438 other_input_exprs.iter().skip(field_column_pos).cloned()
2439 )
2440 .collect_vec();
2441 exprs.push(DfExpr::ScalarFunction(ScalarFunction { func, args }))
2442 }
2443 ScalarFunc::Udf(func) => {
2444 let ts_range_expr = DfExpr::Column(Column::from_name(
2445 RangeManipulate::build_timestamp_range_name(
2446 self.ctx.time_index_column.as_ref().unwrap(),
2447 ),
2448 ));
2449 other_input_exprs.insert(field_column_pos, ts_range_expr);
2450 other_input_exprs.insert(field_column_pos + 1, col_expr);
2451 let fn_expr = DfExpr::ScalarFunction(ScalarFunction {
2452 func,
2453 args: other_input_exprs.clone().into(),
2454 });
2455 exprs.push(fn_expr);
2456 let _ = other_input_exprs.remove(field_column_pos + 1);
2457 let _ = other_input_exprs.remove(field_column_pos);
2458 }
2459 ScalarFunc::ExtrapolateUdf(func, range_length) => {
2460 let ts_range_expr = DfExpr::Column(Column::from_name(
2461 RangeManipulate::build_timestamp_range_name(
2462 self.ctx.time_index_column.as_ref().unwrap(),
2463 ),
2464 ));
2465 other_input_exprs.insert(field_column_pos, ts_range_expr);
2466 other_input_exprs.insert(field_column_pos + 1, col_expr);
2467 other_input_exprs
2468 .insert(field_column_pos + 2, self.create_time_index_column_expr()?);
2469 other_input_exprs.push_back(lit(range_length));
2470 let fn_expr = DfExpr::ScalarFunction(ScalarFunction {
2471 func,
2472 args: other_input_exprs.clone().into(),
2473 });
2474 exprs.push(fn_expr);
2475 let _ = other_input_exprs.pop_back();
2476 let _ = other_input_exprs.remove(field_column_pos + 2);
2477 let _ = other_input_exprs.remove(field_column_pos + 1);
2478 let _ = other_input_exprs.remove(field_column_pos);
2479 }
2480 ScalarFunc::GeneratedExpr => {}
2481 }
2482 }
2483
2484 if !matches!(func.name, "label_join" | "label_replace") {
2488 let mut new_field_columns = Vec::with_capacity(exprs.len());
2489
2490 exprs = exprs
2491 .into_iter()
2492 .map(|expr| {
2493 let display_name = expr.schema_name().to_string();
2494 new_field_columns.push(display_name.clone());
2495 Ok(expr.alias(display_name))
2496 })
2497 .collect::<std::result::Result<Vec<_>, _>>()
2498 .context(DataFusionPlanningSnafu)?;
2499
2500 self.ctx.field_columns = new_field_columns;
2501 }
2502
2503 Ok((exprs, new_tags))
2504 }
2505
2506 fn validate_label_name(label_name: &str) -> Result<()> {
2510 if label_name.starts_with("__") {
2512 return InvalidDestinationLabelNameSnafu { label_name }.fail();
2513 }
2514 if !LABEL_NAME_REGEX.is_match(label_name) {
2516 return InvalidDestinationLabelNameSnafu { label_name }.fail();
2517 }
2518
2519 Ok(())
2520 }
2521
2522 fn build_regexp_replace_label_expr(
2524 &self,
2525 other_input_exprs: &mut VecDeque<DfExpr>,
2526 query_engine_state: &QueryEngineState,
2527 ) -> Result<Option<(DfExpr, String)>> {
2528 let dst_label = match other_input_exprs.pop_front() {
2530 Some(DfExpr::Literal(ScalarValue::Utf8(Some(d)), _)) => d,
2531 other => UnexpectedPlanExprSnafu {
2532 desc: format!("expected dst_label string literal, but found {:?}", other),
2533 }
2534 .fail()?,
2535 };
2536
2537 Self::validate_label_name(&dst_label)?;
2539 let replacement = match other_input_exprs.pop_front() {
2540 Some(DfExpr::Literal(ScalarValue::Utf8(Some(r)), _)) => r,
2541 other => UnexpectedPlanExprSnafu {
2542 desc: format!("expected replacement string literal, but found {:?}", other),
2543 }
2544 .fail()?,
2545 };
2546 let src_label = match other_input_exprs.pop_front() {
2547 Some(DfExpr::Literal(ScalarValue::Utf8(Some(s)), None)) => s,
2548 other => UnexpectedPlanExprSnafu {
2549 desc: format!("expected src_label string literal, but found {:?}", other),
2550 }
2551 .fail()?,
2552 };
2553
2554 let regex = match other_input_exprs.pop_front() {
2555 Some(DfExpr::Literal(ScalarValue::Utf8(Some(r)), None)) => r,
2556 other => UnexpectedPlanExprSnafu {
2557 desc: format!("expected regex string literal, but found {:?}", other),
2558 }
2559 .fail()?,
2560 };
2561
2562 regex::Regex::new(®ex).map_err(|_| {
2565 InvalidRegularExpressionSnafu {
2566 regex: regex.clone(),
2567 }
2568 .build()
2569 })?;
2570
2571 if self.ctx.tag_columns.contains(&src_label) && regex.is_empty() {
2573 return Ok(None);
2574 }
2575
2576 if !self.ctx.tag_columns.contains(&src_label) {
2578 if replacement.is_empty() {
2579 return Ok(None);
2581 } else {
2582 return Ok(Some((
2584 lit(replacement).alias(&dst_label),
2586 dst_label,
2587 )));
2588 }
2589 }
2590
2591 let regex = format!("^(?s:{regex})$");
2594
2595 let session_state = query_engine_state.session_state();
2596 let func = session_state
2597 .scalar_functions()
2598 .get("regexp_replace")
2599 .context(UnsupportedExprSnafu {
2600 name: "regexp_replace",
2601 })?;
2602
2603 let args = vec![
2605 if src_label.is_empty() {
2606 DfExpr::Literal(ScalarValue::Utf8(Some(String::new())), None)
2607 } else {
2608 DfExpr::Column(Column::from_name(src_label))
2609 },
2610 DfExpr::Literal(ScalarValue::Utf8(Some(regex)), None),
2611 DfExpr::Literal(ScalarValue::Utf8(Some(replacement)), None),
2612 ];
2613
2614 Ok(Some((
2615 DfExpr::ScalarFunction(ScalarFunction {
2616 func: func.clone(),
2617 args,
2618 })
2619 .alias(&dst_label),
2620 dst_label,
2621 )))
2622 }
2623
2624 fn build_concat_labels_expr(
2626 other_input_exprs: &mut VecDeque<DfExpr>,
2627 ctx: &PromPlannerContext,
2628 query_engine_state: &QueryEngineState,
2629 ) -> Result<(DfExpr, String)> {
2630 let dst_label = match other_input_exprs.pop_front() {
2633 Some(DfExpr::Literal(ScalarValue::Utf8(Some(d)), _)) => d,
2634 other => UnexpectedPlanExprSnafu {
2635 desc: format!("expected dst_label string literal, but found {:?}", other),
2636 }
2637 .fail()?,
2638 };
2639 let separator = match other_input_exprs.pop_front() {
2640 Some(DfExpr::Literal(ScalarValue::Utf8(Some(d)), _)) => d,
2641 other => UnexpectedPlanExprSnafu {
2642 desc: format!("expected separator string literal, but found {:?}", other),
2643 }
2644 .fail()?,
2645 };
2646
2647 let available_columns: HashSet<&str> = ctx
2649 .tag_columns
2650 .iter()
2651 .chain(ctx.field_columns.iter())
2652 .chain(ctx.time_index_column.as_ref())
2653 .map(|s| s.as_str())
2654 .collect();
2655
2656 let src_labels = other_input_exprs
2657 .iter()
2658 .map(|expr| {
2659 match expr {
2661 DfExpr::Literal(ScalarValue::Utf8(Some(label)), None) => {
2662 if label.is_empty() {
2663 Ok(DfExpr::Literal(ScalarValue::Null, None))
2664 } else if available_columns.contains(label.as_str()) {
2665 Ok(DfExpr::Column(Column::from_name(label)))
2667 } else {
2668 Ok(DfExpr::Literal(ScalarValue::Null, None))
2670 }
2671 }
2672 other => UnexpectedPlanExprSnafu {
2673 desc: format!(
2674 "expected source label string literal, but found {:?}",
2675 other
2676 ),
2677 }
2678 .fail(),
2679 }
2680 })
2681 .collect::<Result<Vec<_>>>()?;
2682 ensure!(
2683 !src_labels.is_empty(),
2684 FunctionInvalidArgumentSnafu {
2685 fn_name: "label_join"
2686 }
2687 );
2688
2689 let session_state = query_engine_state.session_state();
2690 let func = session_state
2691 .scalar_functions()
2692 .get("concat_ws")
2693 .context(UnsupportedExprSnafu { name: "concat_ws" })?;
2694
2695 let mut args = Vec::with_capacity(1 + src_labels.len());
2697 args.push(DfExpr::Literal(ScalarValue::Utf8(Some(separator)), None));
2698 args.extend(src_labels);
2699
2700 Ok((
2701 DfExpr::ScalarFunction(ScalarFunction {
2702 func: func.clone(),
2703 args,
2704 })
2705 .alias(&dst_label),
2706 dst_label,
2707 ))
2708 }
2709
2710 fn create_time_index_column_expr(&self) -> Result<DfExpr> {
2711 Ok(DfExpr::Column(Column::from_name(
2712 self.ctx
2713 .time_index_column
2714 .clone()
2715 .with_context(|| TimeIndexNotFoundSnafu { table: "unknown" })?,
2716 )))
2717 }
2718
2719 fn create_tag_column_exprs(&self) -> Result<Vec<DfExpr>> {
2720 let mut result = Vec::with_capacity(self.ctx.tag_columns.len());
2721 for tag in &self.ctx.tag_columns {
2722 let expr = DfExpr::Column(Column::from_name(tag));
2723 result.push(expr);
2724 }
2725 Ok(result)
2726 }
2727
2728 fn create_field_column_exprs(&self) -> Result<Vec<DfExpr>> {
2729 let mut result = Vec::with_capacity(self.ctx.field_columns.len());
2730 for field in &self.ctx.field_columns {
2731 let expr = DfExpr::Column(Column::from_name(field));
2732 result.push(expr);
2733 }
2734 Ok(result)
2735 }
2736
2737 fn create_tag_and_time_index_column_sort_exprs(&self) -> Result<Vec<SortExpr>> {
2738 let mut result = self
2739 .ctx
2740 .tag_columns
2741 .iter()
2742 .map(|col| DfExpr::Column(Column::from_name(col)).sort(true, true))
2743 .collect::<Vec<_>>();
2744 result.push(self.create_time_index_column_expr()?.sort(true, true));
2745 Ok(result)
2746 }
2747
2748 fn create_field_columns_sort_exprs(&self, asc: bool) -> Vec<SortExpr> {
2749 self.ctx
2750 .field_columns
2751 .iter()
2752 .map(|col| DfExpr::Column(Column::from_name(col)).sort(asc, true))
2753 .collect::<Vec<_>>()
2754 }
2755
2756 fn create_sort_exprs_by_tags(
2757 func: &str,
2758 tags: Vec<DfExpr>,
2759 asc: bool,
2760 ) -> Result<Vec<SortExpr>> {
2761 ensure!(
2762 !tags.is_empty(),
2763 FunctionInvalidArgumentSnafu { fn_name: func }
2764 );
2765
2766 tags.iter()
2767 .map(|col| match col {
2768 DfExpr::Literal(ScalarValue::Utf8(Some(label)), _) => {
2769 Ok(DfExpr::Column(Column::from_name(label)).sort(asc, false))
2770 }
2771 other => UnexpectedPlanExprSnafu {
2772 desc: format!("expected label string literal, but found {:?}", other),
2773 }
2774 .fail(),
2775 })
2776 .collect::<Result<Vec<_>>>()
2777 }
2778
2779 fn create_empty_values_filter_expr(&self) -> Result<DfExpr> {
2780 let mut exprs = Vec::with_capacity(self.ctx.field_columns.len());
2781 for value in &self.ctx.field_columns {
2782 let expr = DfExpr::Column(Column::from_name(value)).is_not_null();
2783 exprs.push(expr);
2784 }
2785
2786 conjunction(exprs).with_context(|| ValueNotFoundSnafu {
2791 table: self
2792 .table_ref()
2793 .map(|t| t.to_quoted_string())
2794 .unwrap_or_else(|_| "unknown".to_string()),
2795 })
2796 }
2797
2798 fn create_aggregate_exprs(
2814 &mut self,
2815 op: TokenType,
2816 param: &Option<Box<PromExpr>>,
2817 input_plan: &LogicalPlan,
2818 ) -> Result<(Vec<DfExpr>, Vec<DfExpr>)> {
2819 let mut non_col_args = Vec::new();
2820 let is_group_agg = op.id() == token::T_GROUP;
2821 if is_group_agg {
2822 ensure!(
2823 self.ctx.field_columns.len() == 1,
2824 MultiFieldsNotSupportedSnafu {
2825 operator: "group()"
2826 }
2827 );
2828 }
2829 let aggr = match op.id() {
2830 token::T_SUM => sum_udaf(),
2831 token::T_QUANTILE => {
2832 let q =
2833 Self::get_param_as_literal_expr(param, Some(op), Some(ArrowDataType::Float64))?;
2834 non_col_args.push(q);
2835 quantile_udaf()
2836 }
2837 token::T_AVG => avg_udaf(),
2838 token::T_COUNT_VALUES | token::T_COUNT => count_udaf(),
2839 token::T_MIN => min_udaf(),
2840 token::T_MAX => max_udaf(),
2841 token::T_GROUP => max_udaf(),
2844 token::T_STDDEV => stddev_pop_udaf(),
2845 token::T_STDVAR => var_pop_udaf(),
2846 token::T_TOPK | token::T_BOTTOMK => UnsupportedExprSnafu {
2847 name: format!("{op:?}"),
2848 }
2849 .fail()?,
2850 _ => UnexpectedTokenSnafu { token: op }.fail()?,
2851 };
2852
2853 let exprs: Vec<DfExpr> = self
2855 .ctx
2856 .field_columns
2857 .iter()
2858 .map(|col| {
2859 if is_group_agg {
2860 aggr.call(vec![lit(1_f64)])
2861 } else {
2862 non_col_args.push(DfExpr::Column(Column::from_name(col)));
2863 let expr = aggr.call(non_col_args.clone());
2864 non_col_args.pop();
2865 expr
2866 }
2867 })
2868 .collect::<Vec<_>>();
2869
2870 let prev_field_exprs = if op.id() == token::T_COUNT_VALUES {
2872 let prev_field_exprs: Vec<_> = self
2873 .ctx
2874 .field_columns
2875 .iter()
2876 .map(|col| DfExpr::Column(Column::from_name(col)))
2877 .collect();
2878
2879 ensure!(
2880 self.ctx.field_columns.len() == 1,
2881 UnsupportedExprSnafu {
2882 name: "count_values on multi-value input"
2883 }
2884 );
2885
2886 prev_field_exprs
2887 } else {
2888 vec![]
2889 };
2890
2891 let mut new_field_columns = Vec::with_capacity(self.ctx.field_columns.len());
2893
2894 let normalized_exprs =
2895 normalize_cols(exprs.iter().cloned(), input_plan).context(DataFusionPlanningSnafu)?;
2896 for expr in normalized_exprs {
2897 new_field_columns.push(expr.schema_name().to_string());
2898 }
2899 self.ctx.field_columns = new_field_columns;
2900
2901 Ok((exprs, prev_field_exprs))
2902 }
2903
2904 fn get_param_value_as_str(op: TokenType, param: &Option<Box<PromExpr>>) -> Result<&str> {
2905 let param = param
2906 .as_deref()
2907 .with_context(|| FunctionInvalidArgumentSnafu {
2908 fn_name: op.to_string(),
2909 })?;
2910 let PromExpr::StringLiteral(StringLiteral { val }) = param else {
2911 return FunctionInvalidArgumentSnafu {
2912 fn_name: op.to_string(),
2913 }
2914 .fail();
2915 };
2916
2917 Ok(val)
2918 }
2919
2920 fn get_param_as_literal_expr(
2921 param: &Option<Box<PromExpr>>,
2922 op: Option<TokenType>,
2923 expected_type: Option<ArrowDataType>,
2924 ) -> Result<DfExpr> {
2925 let prom_param = param.as_deref().with_context(|| {
2926 if let Some(op) = op {
2927 FunctionInvalidArgumentSnafu {
2928 fn_name: op.to_string(),
2929 }
2930 } else {
2931 FunctionInvalidArgumentSnafu {
2932 fn_name: "unknown".to_string(),
2933 }
2934 }
2935 })?;
2936
2937 let expr = Self::try_build_literal_expr(prom_param).with_context(|| {
2938 if let Some(op) = op {
2939 FunctionInvalidArgumentSnafu {
2940 fn_name: op.to_string(),
2941 }
2942 } else {
2943 FunctionInvalidArgumentSnafu {
2944 fn_name: "unknown".to_string(),
2945 }
2946 }
2947 })?;
2948
2949 if let Some(expected_type) = expected_type {
2951 let expr_type = expr
2953 .get_type(&DFSchema::empty())
2954 .context(DataFusionPlanningSnafu)?;
2955 if expected_type != expr_type {
2956 return FunctionInvalidArgumentSnafu {
2957 fn_name: format!("expected {expected_type:?}, but found {expr_type:?}"),
2958 }
2959 .fail();
2960 }
2961 }
2962
2963 Ok(expr)
2964 }
2965
2966 fn create_window_exprs(
2969 &mut self,
2970 op: TokenType,
2971 group_exprs: Vec<DfExpr>,
2972 input_plan: &LogicalPlan,
2973 ) -> Result<Vec<DfExpr>> {
2974 ensure!(
2975 self.ctx.field_columns.len() == 1,
2976 UnsupportedExprSnafu {
2977 name: "topk or bottomk on multi-value input"
2978 }
2979 );
2980
2981 assert!(matches!(op.id(), token::T_TOPK | token::T_BOTTOMK));
2982
2983 let asc = matches!(op.id(), token::T_BOTTOMK);
2984
2985 let tag_sort_exprs = self
2986 .create_tag_column_exprs()?
2987 .into_iter()
2988 .map(|expr| expr.sort(asc, true));
2989
2990 let exprs: Vec<DfExpr> = self
2992 .ctx
2993 .field_columns
2994 .iter()
2995 .map(|col| {
2996 let mut sort_exprs = Vec::with_capacity(self.ctx.tag_columns.len() + 1);
2997 sort_exprs.push(DfExpr::Column(Column::from(col)).sort(asc, true));
2999 sort_exprs.extend(tag_sort_exprs.clone());
3002
3003 DfExpr::WindowFunction(Box::new(WindowFunction {
3004 fun: WindowFunctionDefinition::WindowUDF(Arc::new(RowNumber::new().into())),
3005 params: WindowFunctionParams {
3006 args: vec![],
3007 partition_by: group_exprs.clone(),
3008 order_by: sort_exprs,
3009 window_frame: WindowFrame::new(Some(true)),
3010 null_treatment: None,
3011 distinct: false,
3012 filter: None,
3013 },
3014 }))
3015 })
3016 .collect();
3017
3018 let normalized_exprs =
3019 normalize_cols(exprs.iter().cloned(), input_plan).context(DataFusionPlanningSnafu)?;
3020 Ok(normalized_exprs)
3021 }
3022
3023 #[deprecated(
3025 note = "use `Self::get_param_as_literal_expr` instead. This is only for `create_histogram_plan`"
3026 )]
3027 fn try_build_float_literal(expr: &PromExpr) -> Option<f64> {
3028 match expr {
3029 PromExpr::NumberLiteral(NumberLiteral { val }) => Some(*val),
3030 PromExpr::Paren(ParenExpr { expr }) => Self::try_build_float_literal(expr),
3031 PromExpr::Unary(UnaryExpr { expr, .. }) => {
3032 Self::try_build_float_literal(expr).map(|f| -f)
3033 }
3034 PromExpr::StringLiteral(_)
3035 | PromExpr::Binary(_)
3036 | PromExpr::VectorSelector(_)
3037 | PromExpr::MatrixSelector(_)
3038 | PromExpr::Call(_)
3039 | PromExpr::Extension(_)
3040 | PromExpr::Aggregate(_)
3041 | PromExpr::Subquery(_) => None,
3042 }
3043 }
3044
3045 async fn create_histogram_plan(
3047 &mut self,
3048 args: &PromFunctionArgs,
3049 query_engine_state: &QueryEngineState,
3050 ) -> Result<LogicalPlan> {
3051 if args.args.len() != 2 {
3052 return FunctionInvalidArgumentSnafu {
3053 fn_name: SPECIAL_HISTOGRAM_QUANTILE.to_string(),
3054 }
3055 .fail();
3056 }
3057 #[allow(deprecated)]
3058 let phi = Self::try_build_float_literal(&args.args[0]).with_context(|| {
3059 FunctionInvalidArgumentSnafu {
3060 fn_name: SPECIAL_HISTOGRAM_QUANTILE.to_string(),
3061 }
3062 })?;
3063
3064 let input = args.args[1].as_ref().clone();
3065 let input_plan = self.prom_expr_to_plan(&input, query_engine_state).await?;
3066 let input_plan = self.strip_tsid_column(input_plan)?;
3070 self.ctx.use_tsid = false;
3071
3072 if !self.ctx.has_le_tag() {
3073 return Ok(LogicalPlan::EmptyRelation(
3076 datafusion::logical_expr::EmptyRelation {
3077 produce_one_row: false,
3078 schema: Arc::new(DFSchema::empty()),
3079 },
3080 ));
3081 }
3082 let time_index_column =
3083 self.ctx
3084 .time_index_column
3085 .clone()
3086 .with_context(|| TimeIndexNotFoundSnafu {
3087 table: self.ctx.table_name.clone().unwrap_or_default(),
3088 })?;
3089 let field_column = self
3091 .ctx
3092 .field_columns
3093 .first()
3094 .with_context(|| FunctionInvalidArgumentSnafu {
3095 fn_name: SPECIAL_HISTOGRAM_QUANTILE.to_string(),
3096 })?
3097 .clone();
3098 self.ctx.tag_columns.retain(|col| col != LE_COLUMN_NAME);
3100
3101 Ok(LogicalPlan::Extension(Extension {
3102 node: Arc::new(
3103 HistogramFold::new(
3104 LE_COLUMN_NAME.to_string(),
3105 field_column,
3106 time_index_column,
3107 phi,
3108 input_plan,
3109 )
3110 .context(DataFusionPlanningSnafu)?,
3111 ),
3112 }))
3113 }
3114
3115 async fn create_vector_plan(&mut self, args: &PromFunctionArgs) -> Result<LogicalPlan> {
3117 if args.args.len() != 1 {
3118 return FunctionInvalidArgumentSnafu {
3119 fn_name: SPECIAL_VECTOR_FUNCTION.to_string(),
3120 }
3121 .fail();
3122 }
3123 let lit = Self::get_param_as_literal_expr(&Some(args.args[0].clone()), None, None)?;
3124
3125 self.ctx.time_index_column = Some(SPECIAL_TIME_FUNCTION.to_string());
3127 self.ctx.reset_table_name_and_schema();
3128 self.ctx.tag_columns = vec![];
3129 self.ctx.field_columns = vec![greptime_value().to_string()];
3130 Ok(LogicalPlan::Extension(Extension {
3131 node: Arc::new(
3132 EmptyMetric::new(
3133 self.ctx.start,
3134 self.ctx.end,
3135 self.ctx.interval,
3136 SPECIAL_TIME_FUNCTION.to_string(),
3137 greptime_value().to_string(),
3138 Some(lit),
3139 )
3140 .context(DataFusionPlanningSnafu)?,
3141 ),
3142 }))
3143 }
3144
3145 async fn create_scalar_plan(
3147 &mut self,
3148 args: &PromFunctionArgs,
3149 query_engine_state: &QueryEngineState,
3150 ) -> Result<LogicalPlan> {
3151 ensure!(
3152 args.len() == 1,
3153 FunctionInvalidArgumentSnafu {
3154 fn_name: SCALAR_FUNCTION
3155 }
3156 );
3157 let input = self
3158 .prom_expr_to_plan(&args.args[0], query_engine_state)
3159 .await?;
3160 ensure!(
3161 self.ctx.field_columns.len() == 1,
3162 MultiFieldsNotSupportedSnafu {
3163 operator: SCALAR_FUNCTION
3164 },
3165 );
3166 let scalar_plan = LogicalPlan::Extension(Extension {
3167 node: Arc::new(
3168 ScalarCalculate::new(
3169 self.ctx.start,
3170 self.ctx.end,
3171 self.ctx.interval,
3172 input,
3173 self.ctx.time_index_column.as_ref().unwrap(),
3174 &self.ctx.tag_columns,
3175 &self.ctx.field_columns[0],
3176 self.ctx.table_name.as_deref(),
3177 )
3178 .context(PromqlPlanNodeSnafu)?,
3179 ),
3180 });
3181 self.ctx.tag_columns.clear();
3183 self.ctx.field_columns.clear();
3184 self.ctx
3185 .field_columns
3186 .push(scalar_plan.schema().field(1).name().clone());
3187 Ok(scalar_plan)
3188 }
3189
3190 async fn create_absent_plan(
3192 &mut self,
3193 args: &PromFunctionArgs,
3194 query_engine_state: &QueryEngineState,
3195 ) -> Result<LogicalPlan> {
3196 if args.args.len() != 1 {
3197 return FunctionInvalidArgumentSnafu {
3198 fn_name: SPECIAL_ABSENT_FUNCTION.to_string(),
3199 }
3200 .fail();
3201 }
3202 let input = self
3203 .prom_expr_to_plan(&args.args[0], query_engine_state)
3204 .await?;
3205
3206 let time_index_expr = self.create_time_index_column_expr()?;
3207 let first_field_expr =
3208 self.create_field_column_exprs()?
3209 .pop()
3210 .with_context(|| ValueNotFoundSnafu {
3211 table: self.ctx.table_name.clone().unwrap_or_default(),
3212 })?;
3213 let first_value_expr = first_value(first_field_expr, vec![]);
3214
3215 let ordered_aggregated_input = LogicalPlanBuilder::from(input)
3216 .aggregate(
3217 vec![time_index_expr.clone()],
3218 vec![first_value_expr.clone()],
3219 )
3220 .context(DataFusionPlanningSnafu)?
3221 .sort(vec![time_index_expr.sort(true, false)])
3222 .context(DataFusionPlanningSnafu)?
3223 .build()
3224 .context(DataFusionPlanningSnafu)?;
3225
3226 let fake_labels = self
3227 .ctx
3228 .selector_matcher
3229 .iter()
3230 .filter_map(|matcher| match matcher.op {
3231 MatchOp::Equal => Some((matcher.name.clone(), matcher.value.clone())),
3232 _ => None,
3233 })
3234 .collect::<Vec<_>>();
3235
3236 let absent_plan = LogicalPlan::Extension(Extension {
3238 node: Arc::new(
3239 Absent::try_new(
3240 self.ctx.start,
3241 self.ctx.end,
3242 self.ctx.interval,
3243 self.ctx.time_index_column.as_ref().unwrap().clone(),
3244 self.ctx.field_columns[0].clone(),
3245 fake_labels,
3246 ordered_aggregated_input,
3247 )
3248 .context(DataFusionPlanningSnafu)?,
3249 ),
3250 });
3251
3252 Ok(absent_plan)
3253 }
3254
3255 fn try_build_literal_expr(expr: &PromExpr) -> Option<DfExpr> {
3258 match expr {
3259 PromExpr::NumberLiteral(NumberLiteral { val }) => Some(val.lit()),
3260 PromExpr::StringLiteral(StringLiteral { val }) => Some(val.lit()),
3261 PromExpr::VectorSelector(_)
3262 | PromExpr::MatrixSelector(_)
3263 | PromExpr::Extension(_)
3264 | PromExpr::Aggregate(_)
3265 | PromExpr::Subquery(_) => None,
3266 PromExpr::Call(Call { func, .. }) => {
3267 if func.name == SPECIAL_TIME_FUNCTION {
3268 None
3271 } else {
3272 None
3273 }
3274 }
3275 PromExpr::Paren(ParenExpr { expr }) => Self::try_build_literal_expr(expr),
3276 PromExpr::Unary(UnaryExpr { expr, .. }) => Self::try_build_literal_expr(expr),
3278 PromExpr::Binary(PromBinaryExpr {
3279 lhs,
3280 rhs,
3281 op,
3282 modifier,
3283 }) => {
3284 let lhs = Self::try_build_literal_expr(lhs)?;
3285 let rhs = Self::try_build_literal_expr(rhs)?;
3286 let is_comparison_op = Self::is_token_a_comparison_op(*op);
3287 let expr_builder = Self::prom_token_to_binary_expr_builder(*op).ok()?;
3288 let expr = expr_builder(lhs, rhs).ok()?;
3289
3290 let should_return_bool = if let Some(m) = modifier {
3291 m.return_bool
3292 } else {
3293 false
3294 };
3295 if is_comparison_op && should_return_bool {
3296 Some(DfExpr::Cast(Cast {
3297 expr: Box::new(expr),
3298 data_type: ArrowDataType::Float64,
3299 }))
3300 } else {
3301 Some(expr)
3302 }
3303 }
3304 }
3305 }
3306
3307 fn try_build_special_time_expr_with_context(&self, expr: &PromExpr) -> Option<DfExpr> {
3308 match expr {
3309 PromExpr::Call(Call { func, .. }) => {
3310 if func.name == SPECIAL_TIME_FUNCTION
3311 && let Some(time_index_col) = self.ctx.time_index_column.as_ref()
3312 {
3313 Some(build_special_time_expr(time_index_col))
3314 } else {
3315 None
3316 }
3317 }
3318 _ => None,
3319 }
3320 }
3321
3322 #[allow(clippy::type_complexity)]
3325 fn prom_token_to_binary_expr_builder(
3326 token: TokenType,
3327 ) -> Result<Box<dyn Fn(DfExpr, DfExpr) -> Result<DfExpr>>> {
3328 let cast_float = |expr| {
3329 if matches!(
3330 &expr,
3331 DfExpr::Cast(Cast {
3332 data_type: ArrowDataType::Float64,
3333 ..
3334 })
3335 ) || matches!(&expr, DfExpr::Literal(ScalarValue::Float64(_), _))
3336 {
3337 expr
3338 } else {
3339 DfExpr::Cast(Cast {
3340 expr: Box::new(expr),
3341 data_type: ArrowDataType::Float64,
3342 })
3343 }
3344 };
3345 match token.id() {
3346 token::T_ADD => Ok(Box::new(move |lhs, rhs| {
3347 Ok(cast_float(lhs) + cast_float(rhs))
3348 })),
3349 token::T_SUB => Ok(Box::new(move |lhs, rhs| {
3350 Ok(cast_float(lhs) - cast_float(rhs))
3351 })),
3352 token::T_MUL => Ok(Box::new(move |lhs, rhs| {
3353 Ok(cast_float(lhs) * cast_float(rhs))
3354 })),
3355 token::T_DIV => Ok(Box::new(move |lhs, rhs| {
3356 Ok(cast_float(lhs) / cast_float(rhs))
3357 })),
3358 token::T_MOD => Ok(Box::new(move |lhs: DfExpr, rhs| {
3359 Ok(cast_float(lhs) % cast_float(rhs))
3360 })),
3361 token::T_EQLC => Ok(Box::new(|lhs, rhs| Ok(lhs.eq(rhs)))),
3362 token::T_NEQ => Ok(Box::new(|lhs, rhs| Ok(lhs.not_eq(rhs)))),
3363 token::T_GTR => Ok(Box::new(|lhs, rhs| Ok(lhs.gt(rhs)))),
3364 token::T_LSS => Ok(Box::new(|lhs, rhs| Ok(lhs.lt(rhs)))),
3365 token::T_GTE => Ok(Box::new(|lhs, rhs| Ok(lhs.gt_eq(rhs)))),
3366 token::T_LTE => Ok(Box::new(|lhs, rhs| Ok(lhs.lt_eq(rhs)))),
3367 token::T_POW => Ok(Box::new(move |lhs, rhs| {
3368 Ok(DfExpr::ScalarFunction(ScalarFunction {
3369 func: datafusion_functions::math::power(),
3370 args: vec![cast_float(lhs), cast_float(rhs)],
3371 }))
3372 })),
3373 token::T_ATAN2 => Ok(Box::new(move |lhs, rhs| {
3374 Ok(DfExpr::ScalarFunction(ScalarFunction {
3375 func: datafusion_functions::math::atan2(),
3376 args: vec![cast_float(lhs), cast_float(rhs)],
3377 }))
3378 })),
3379 _ => UnexpectedTokenSnafu { token }.fail(),
3380 }
3381 }
3382
3383 fn is_token_a_comparison_op(token: TokenType) -> bool {
3385 matches!(
3386 token.id(),
3387 token::T_EQLC
3388 | token::T_NEQ
3389 | token::T_GTR
3390 | token::T_LSS
3391 | token::T_GTE
3392 | token::T_LTE
3393 )
3394 }
3395
3396 fn is_token_a_set_op(token: TokenType) -> bool {
3398 matches!(
3399 token.id(),
3400 token::T_LAND | token::T_LOR | token::T_LUNLESS )
3404 }
3405
/// Pair up left and right field columns position by position.
///
/// Extra columns on the longer side are ignored. Returns the output column names
/// (taken from the left side) together with the `(left, right)` pairs.
fn align_binary_field_columns<'a>(
    left_field_columns: &'a [String],
    right_field_columns: &'a [String],
) -> (Vec<String>, Vec<(&'a String, &'a String)>) {
    let mut output_field_columns = Vec::new();
    let mut field_pairs = Vec::new();
    for (left, right) in left_field_columns.iter().zip(right_field_columns) {
        output_field_columns.push(left.clone());
        field_pairs.push((left, right));
    }
    (output_field_columns, field_pairs)
}
3420
3421 fn plan_has_tsid_column(plan: &LogicalPlan) -> bool {
3422 plan.schema()
3423 .fields()
3424 .iter()
3425 .any(|field| field.name() == DATA_SCHEMA_TSID_COLUMN_NAME)
3426 }
3427
3428 fn optional_tsid_projection(
3429 schema: &DFSchemaRef,
3430 table_ref: Option<&TableReference>,
3431 keep_tsid: bool,
3432 ) -> Option<DfExpr> {
3433 keep_tsid.then_some(()).and_then(|_| {
3434 schema
3435 .qualified_field_with_name(table_ref, DATA_SCHEMA_TSID_COLUMN_NAME)
3436 .ok()
3437 .map(|field| DfExpr::Column(field.into()))
3438 })
3439 }
3440
3441 fn binary_join_key_columns(
3442 &self,
3443 left: &LogicalPlan,
3444 right: &LogicalPlan,
3445 only_join_time_index: bool,
3446 modifier: &Option<BinModifier>,
3447 ) -> (BTreeSet<String>, BTreeSet<String>) {
3448 let use_tsid_join = !only_join_time_index
3449 && modifier.as_ref().is_none_or(|modifier| {
3450 modifier.matching.is_none()
3451 && matches!(modifier.card, VectorMatchCardinality::OneToOne)
3452 })
3453 && Self::plan_has_tsid_column(left)
3454 && Self::plan_has_tsid_column(right);
3455
3456 let (mut left_tag_columns, mut right_tag_columns) = if use_tsid_join {
3457 (
3458 BTreeSet::from([DATA_SCHEMA_TSID_COLUMN_NAME.to_string()]),
3459 BTreeSet::from([DATA_SCHEMA_TSID_COLUMN_NAME.to_string()]),
3460 )
3461 } else {
3462 let left_tag_columns = if only_join_time_index {
3463 BTreeSet::new()
3464 } else {
3465 self.ctx
3466 .tag_columns
3467 .iter()
3468 .cloned()
3469 .collect::<BTreeSet<_>>()
3470 };
3471 let right_tag_columns = left_tag_columns.clone();
3472 (left_tag_columns, right_tag_columns)
3473 };
3474
3475 if !use_tsid_join
3476 && let Some(modifier) = modifier
3477 && let Some(matching) = &modifier.matching
3478 {
3479 match matching {
3480 LabelModifier::Include(on) => {
3481 let mask = on.labels.iter().cloned().collect::<BTreeSet<_>>();
3482 left_tag_columns = left_tag_columns.intersection(&mask).cloned().collect();
3483 right_tag_columns = right_tag_columns.intersection(&mask).cloned().collect();
3484 }
3485 LabelModifier::Exclude(ignoring) => {
3486 for label in &ignoring.labels {
3487 let _ = left_tag_columns.remove(label);
3488 let _ = right_tag_columns.remove(label);
3489 }
3490 }
3491 }
3492 }
3493
3494 (left_tag_columns, right_tag_columns)
3495 }
3496
3497 #[allow(clippy::too_many_arguments)]
3500 fn join_on_non_field_columns(
3501 &self,
3502 left: LogicalPlan,
3503 right: LogicalPlan,
3504 left_table_ref: TableReference,
3505 right_table_ref: TableReference,
3506 left_time_index_column: Option<String>,
3507 right_time_index_column: Option<String>,
3508 only_join_time_index: bool,
3509 modifier: &Option<BinModifier>,
3510 ) -> Result<LogicalPlan> {
3511 let (mut left_tag_columns, mut right_tag_columns) =
3512 self.binary_join_key_columns(&left, &right, only_join_time_index, modifier);
3513
3514 if let (Some(left_time_index_column), Some(right_time_index_column)) =
3516 (left_time_index_column, right_time_index_column)
3517 {
3518 left_tag_columns.insert(left_time_index_column);
3519 right_tag_columns.insert(right_time_index_column);
3520 }
3521
3522 let right = LogicalPlanBuilder::from(right)
3523 .alias(right_table_ref)
3524 .context(DataFusionPlanningSnafu)?
3525 .build()
3526 .context(DataFusionPlanningSnafu)?;
3527
3528 LogicalPlanBuilder::from(left)
3530 .alias(left_table_ref)
3531 .context(DataFusionPlanningSnafu)?
3532 .join_detailed(
3533 right,
3534 JoinType::Inner,
3535 (
3536 left_tag_columns
3537 .into_iter()
3538 .map(Column::from_name)
3539 .collect::<Vec<_>>(),
3540 right_tag_columns
3541 .into_iter()
3542 .map(Column::from_name)
3543 .collect::<Vec<_>>(),
3544 ),
3545 None,
3546 NullEquality::NullEqualsNull,
3547 )
3548 .context(DataFusionPlanningSnafu)?
3549 .build()
3550 .context(DataFusionPlanningSnafu)
3551 }
3552
3553 fn set_op_on_non_field_columns(
3555 &mut self,
3556 left: LogicalPlan,
3557 mut right: LogicalPlan,
3558 left_context: PromPlannerContext,
3559 right_context: PromPlannerContext,
3560 op: TokenType,
3561 modifier: &Option<BinModifier>,
3562 ) -> Result<LogicalPlan> {
3563 let mut left_tag_col_set = left_context
3564 .tag_columns
3565 .iter()
3566 .cloned()
3567 .collect::<HashSet<_>>();
3568 let mut right_tag_col_set = right_context
3569 .tag_columns
3570 .iter()
3571 .cloned()
3572 .collect::<HashSet<_>>();
3573
3574 if matches!(op.id(), token::T_LOR) {
3575 return self.or_operator(
3576 left,
3577 right,
3578 left_tag_col_set,
3579 right_tag_col_set,
3580 left_context,
3581 right_context,
3582 modifier,
3583 );
3584 }
3585
3586 if let Some(modifier) = modifier {
3588 ensure!(
3590 matches!(
3591 modifier.card,
3592 VectorMatchCardinality::OneToOne | VectorMatchCardinality::ManyToMany
3593 ),
3594 UnsupportedVectorMatchSnafu {
3595 name: modifier.card.clone(),
3596 },
3597 );
3598 if let Some(matching) = &modifier.matching {
3600 match matching {
3601 LabelModifier::Include(on) => {
3603 let mask = on.labels.iter().cloned().collect::<HashSet<_>>();
3604 left_tag_col_set = left_tag_col_set.intersection(&mask).cloned().collect();
3605 right_tag_col_set =
3606 right_tag_col_set.intersection(&mask).cloned().collect();
3607 }
3608 LabelModifier::Exclude(ignoring) => {
3610 for label in &ignoring.labels {
3612 let _ = left_tag_col_set.remove(label);
3613 let _ = right_tag_col_set.remove(label);
3614 }
3615 }
3616 }
3617 }
3618 }
3619 if !matches!(op.id(), token::T_LOR) {
3621 ensure!(
3622 left_tag_col_set == right_tag_col_set,
3623 CombineTableColumnMismatchSnafu {
3624 left: left_tag_col_set.into_iter().collect::<Vec<_>>(),
3625 right: right_tag_col_set.into_iter().collect::<Vec<_>>(),
3626 }
3627 )
3628 };
3629 let left_time_index = left_context.time_index_column.clone().unwrap();
3630 let right_time_index = right_context.time_index_column.clone().unwrap();
3631 let join_keys = left_tag_col_set
3632 .iter()
3633 .cloned()
3634 .chain([left_time_index.clone()])
3635 .collect::<Vec<_>>();
3636 self.ctx.time_index_column = Some(left_time_index.clone());
3637 self.ctx.use_tsid = left_context.use_tsid;
3638
3639 if left_context.time_index_column != right_context.time_index_column {
3641 let right_project_exprs = right
3642 .schema()
3643 .fields()
3644 .iter()
3645 .map(|field| {
3646 if field.name() == &right_time_index {
3647 DfExpr::Column(Column::from_name(&right_time_index)).alias(&left_time_index)
3648 } else {
3649 DfExpr::Column(Column::from_name(field.name()))
3650 }
3651 })
3652 .collect::<Vec<_>>();
3653
3654 right = LogicalPlanBuilder::from(right)
3655 .project(right_project_exprs)
3656 .context(DataFusionPlanningSnafu)?
3657 .build()
3658 .context(DataFusionPlanningSnafu)?;
3659 }
3660
3661 ensure!(
3662 left_context.field_columns.len() == 1,
3663 MultiFieldsNotSupportedSnafu {
3664 operator: "AND operator"
3665 }
3666 );
3667 let left_field_col = left_context.field_columns.first().unwrap();
3670 self.ctx.field_columns = vec![left_field_col.clone()];
3671
3672 match op.id() {
3675 token::T_LAND => LogicalPlanBuilder::from(left)
3676 .distinct()
3677 .context(DataFusionPlanningSnafu)?
3678 .join_detailed(
3679 right,
3680 JoinType::LeftSemi,
3681 (join_keys.clone(), join_keys),
3682 None,
3683 NullEquality::NullEqualsNull,
3684 )
3685 .context(DataFusionPlanningSnafu)?
3686 .build()
3687 .context(DataFusionPlanningSnafu),
3688 token::T_LUNLESS => LogicalPlanBuilder::from(left)
3689 .distinct()
3690 .context(DataFusionPlanningSnafu)?
3691 .join_detailed(
3692 right,
3693 JoinType::LeftAnti,
3694 (join_keys.clone(), join_keys),
3695 None,
3696 NullEquality::NullEqualsNull,
3697 )
3698 .context(DataFusionPlanningSnafu)?
3699 .build()
3700 .context(DataFusionPlanningSnafu),
3701 token::T_LOR => {
3702 unreachable!()
3705 }
3706 _ => UnexpectedTokenSnafu { token: op }.fail(),
3707 }
3708 }
3709
3710 #[allow(clippy::too_many_arguments)]
3712 fn or_operator(
3713 &mut self,
3714 left: LogicalPlan,
3715 right: LogicalPlan,
3716 left_tag_cols_set: HashSet<String>,
3717 right_tag_cols_set: HashSet<String>,
3718 left_context: PromPlannerContext,
3719 right_context: PromPlannerContext,
3720 modifier: &Option<BinModifier>,
3721 ) -> Result<LogicalPlan> {
3722 ensure!(
3724 left_context.field_columns.len() == right_context.field_columns.len(),
3725 CombineTableColumnMismatchSnafu {
3726 left: left_context.field_columns.clone(),
3727 right: right_context.field_columns.clone()
3728 }
3729 );
3730 ensure!(
3731 left_context.field_columns.len() == 1,
3732 MultiFieldsNotSupportedSnafu {
3733 operator: "OR operator"
3734 }
3735 );
3736
3737 let all_tags = left_tag_cols_set
3739 .union(&right_tag_cols_set)
3740 .cloned()
3741 .collect::<HashSet<_>>();
3742 let tags_not_in_left = all_tags
3743 .difference(&left_tag_cols_set)
3744 .cloned()
3745 .collect::<Vec<_>>();
3746 let tags_not_in_right = all_tags
3747 .difference(&right_tag_cols_set)
3748 .cloned()
3749 .collect::<Vec<_>>();
3750 let left_qualifier = left.schema().qualified_field(0).0.cloned();
3751 let right_qualifier = right.schema().qualified_field(0).0.cloned();
3752 let left_qualifier_string = left_qualifier
3753 .as_ref()
3754 .map(|l| l.to_string())
3755 .unwrap_or_default();
3756 let right_qualifier_string = right_qualifier
3757 .as_ref()
3758 .map(|r| r.to_string())
3759 .unwrap_or_default();
3760 let left_time_index_column =
3761 left_context
3762 .time_index_column
3763 .clone()
3764 .with_context(|| TimeIndexNotFoundSnafu {
3765 table: left_qualifier_string.clone(),
3766 })?;
3767 let right_time_index_column =
3768 right_context
3769 .time_index_column
3770 .clone()
3771 .with_context(|| TimeIndexNotFoundSnafu {
3772 table: right_qualifier_string.clone(),
3773 })?;
3774 let left_field_col = left_context.field_columns.first().unwrap();
3776 let right_field_col = right_context.field_columns.first().unwrap();
3777 let left_has_tsid = left
3778 .schema()
3779 .fields()
3780 .iter()
3781 .any(|field| field.name() == DATA_SCHEMA_TSID_COLUMN_NAME);
3782 let right_has_tsid = right
3783 .schema()
3784 .fields()
3785 .iter()
3786 .any(|field| field.name() == DATA_SCHEMA_TSID_COLUMN_NAME);
3787
3788 let mut all_columns_set = left
3790 .schema()
3791 .fields()
3792 .iter()
3793 .chain(right.schema().fields().iter())
3794 .map(|field| field.name().clone())
3795 .collect::<HashSet<_>>();
3796 if !(left_has_tsid && right_has_tsid) {
3799 all_columns_set.remove(DATA_SCHEMA_TSID_COLUMN_NAME);
3800 }
3801 all_columns_set.remove(&left_time_index_column);
3803 all_columns_set.remove(&right_time_index_column);
3804 if left_field_col != right_field_col {
3806 all_columns_set.remove(right_field_col);
3807 }
3808 let mut all_columns = all_columns_set.into_iter().collect::<Vec<_>>();
3809 all_columns.sort_unstable();
3811 all_columns.insert(0, left_time_index_column.clone());
3813
3814 let left_proj_exprs = all_columns.iter().map(|col| {
3816 if tags_not_in_left.contains(col) {
3817 DfExpr::Literal(ScalarValue::Utf8(None), None).alias(col.clone())
3818 } else {
3819 DfExpr::Column(Column::new(None::<String>, col))
3820 }
3821 });
3822 let right_time_index_expr = DfExpr::Column(Column::new(
3823 right_qualifier.clone(),
3824 right_time_index_column,
3825 ))
3826 .alias(left_time_index_column.clone());
3827 let right_qualifier_for_field = right
3830 .schema()
3831 .iter()
3832 .find(|(_, f)| f.name() == right_field_col)
3833 .map(|(q, _)| q)
3834 .with_context(|| ColumnNotFoundSnafu {
3835 col: right_field_col.clone(),
3836 })?
3837 .cloned();
3838
3839 let right_proj_exprs_without_time_index = all_columns.iter().skip(1).map(|col| {
3841 if col == left_field_col && left_field_col != right_field_col {
3843 DfExpr::Column(Column::new(
3845 right_qualifier_for_field.clone(),
3846 right_field_col,
3847 ))
3848 } else if tags_not_in_right.contains(col) {
3849 DfExpr::Literal(ScalarValue::Utf8(None), None).alias(col.clone())
3850 } else {
3851 DfExpr::Column(Column::new(None::<String>, col))
3852 }
3853 });
3854 let right_proj_exprs = [right_time_index_expr]
3855 .into_iter()
3856 .chain(right_proj_exprs_without_time_index);
3857
3858 let left_projected = LogicalPlanBuilder::from(left)
3859 .project(left_proj_exprs)
3860 .context(DataFusionPlanningSnafu)?
3861 .alias(left_qualifier_string.clone())
3862 .context(DataFusionPlanningSnafu)?
3863 .build()
3864 .context(DataFusionPlanningSnafu)?;
3865 let right_projected = LogicalPlanBuilder::from(right)
3866 .project(right_proj_exprs)
3867 .context(DataFusionPlanningSnafu)?
3868 .alias(right_qualifier_string.clone())
3869 .context(DataFusionPlanningSnafu)?
3870 .build()
3871 .context(DataFusionPlanningSnafu)?;
3872
3873 let mut match_columns = if let Some(modifier) = modifier
3875 && let Some(matching) = &modifier.matching
3876 {
3877 match matching {
3878 LabelModifier::Include(on) => on.labels.clone(),
3880 LabelModifier::Exclude(ignoring) => {
3882 let ignoring = ignoring.labels.iter().cloned().collect::<HashSet<_>>();
3883 all_tags.difference(&ignoring).cloned().collect()
3884 }
3885 }
3886 } else {
3887 all_tags.iter().cloned().collect()
3888 };
3889 match_columns.sort_unstable();
3891 let schema = left_projected.schema().clone();
3893 let union_distinct_on = UnionDistinctOn::new(
3894 left_projected,
3895 right_projected,
3896 match_columns,
3897 left_time_index_column.clone(),
3898 schema,
3899 );
3900 let result = LogicalPlan::Extension(Extension {
3901 node: Arc::new(union_distinct_on),
3902 });
3903
3904 self.ctx.time_index_column = Some(left_time_index_column);
3906 self.ctx.tag_columns = all_tags.into_iter().collect();
3907 self.ctx.field_columns = vec![left_field_col.clone()];
3908 self.ctx.use_tsid = left_has_tsid && right_has_tsid;
3909
3910 Ok(result)
3911 }
3912
3913 fn projection_for_each_field_column<F>(
3921 &mut self,
3922 input: LogicalPlan,
3923 name_to_expr: F,
3924 ) -> Result<LogicalPlan>
3925 where
3926 F: FnMut(&String) -> Result<DfExpr>,
3927 {
3928 let table_ref = self.ctx.table_name.clone().map(TableReference::bare);
3929 let non_field_columns_iter = self
3930 .ctx
3931 .tag_columns
3932 .iter()
3933 .chain(self.ctx.time_index_column.iter())
3934 .map(|col| Ok(DfExpr::Column(Column::new(table_ref.clone(), col))));
3935 let tsid_iter =
3936 Self::optional_tsid_projection(input.schema(), table_ref.as_ref(), self.ctx.use_tsid)
3937 .into_iter()
3938 .map(Ok);
3939
3940 let result_field_columns = self
3942 .ctx
3943 .field_columns
3944 .iter()
3945 .map(name_to_expr)
3946 .collect::<Result<Vec<_>>>()?;
3947
3948 self.ctx.field_columns = result_field_columns
3950 .iter()
3951 .map(|expr| expr.schema_name().to_string())
3952 .collect();
3953 let field_columns_iter = result_field_columns
3954 .into_iter()
3955 .zip(self.ctx.field_columns.iter())
3956 .map(|(expr, name)| Ok(DfExpr::Alias(Alias::new(expr, None::<String>, name))));
3957
3958 let project_fields = non_field_columns_iter
3960 .chain(tsid_iter)
3961 .chain(field_columns_iter)
3962 .collect::<Result<Vec<_>>>()?;
3963
3964 LogicalPlanBuilder::from(input)
3965 .project(project_fields)
3966 .context(DataFusionPlanningSnafu)?
3967 .build()
3968 .context(DataFusionPlanningSnafu)
3969 }
3970
3971 fn filter_on_field_column<F>(
3974 &self,
3975 input: LogicalPlan,
3976 mut name_to_expr: F,
3977 ) -> Result<LogicalPlan>
3978 where
3979 F: FnMut(&String) -> Result<DfExpr>,
3980 {
3981 ensure!(
3982 self.ctx.field_columns.len() == 1,
3983 UnsupportedExprSnafu {
3984 name: "filter on multi-value input"
3985 }
3986 );
3987
3988 let field_column_filter = name_to_expr(&self.ctx.field_columns[0])?;
3989
3990 LogicalPlanBuilder::from(input)
3991 .filter(field_column_filter)
3992 .context(DataFusionPlanningSnafu)?
3993 .build()
3994 .context(DataFusionPlanningSnafu)
3995 }
3996
3997 fn date_part_on_time_index(&self, date_part: &str) -> Result<DfExpr> {
4000 let input_expr = datafusion::logical_expr::col(
4001 self.ctx
4002 .time_index_column
4003 .as_ref()
4004 .with_context(|| TimeIndexNotFoundSnafu {
4006 table: "<doesn't matter>",
4007 })?,
4008 );
4009 let fn_expr = DfExpr::ScalarFunction(ScalarFunction {
4010 func: datafusion_functions::datetime::date_part(),
4011 args: vec![date_part.lit(), input_expr],
4012 });
4013 Ok(fn_expr)
4014 }
4015
4016 fn strip_tsid_column(&self, plan: LogicalPlan) -> Result<LogicalPlan> {
4017 let schema = plan.schema();
4018 if !schema
4019 .fields()
4020 .iter()
4021 .any(|field| field.name() == DATA_SCHEMA_TSID_COLUMN_NAME)
4022 {
4023 return Ok(plan);
4024 }
4025
4026 let project_exprs = schema
4027 .fields()
4028 .iter()
4029 .filter(|field| field.name() != DATA_SCHEMA_TSID_COLUMN_NAME)
4030 .map(|field| Ok(DfExpr::Column(Column::from_name(field.name().clone()))))
4031 .collect::<Result<Vec<_>>>()?;
4032
4033 LogicalPlanBuilder::from(plan)
4034 .project(project_exprs)
4035 .context(DataFusionPlanningSnafu)?
4036 .build()
4037 .context(DataFusionPlanningSnafu)
4038 }
4039
4040 fn apply_alias(&mut self, plan: LogicalPlan, alias_name: String) -> Result<LogicalPlan> {
4042 let fields_expr = self.create_field_column_exprs()?;
4043
4044 ensure!(
4046 fields_expr.len() == 1,
4047 UnsupportedExprSnafu {
4048 name: "alias on multi-value result"
4049 }
4050 );
4051
4052 let project_fields = fields_expr
4053 .into_iter()
4054 .map(|expr| expr.alias(&alias_name))
4055 .chain(self.create_tag_column_exprs()?)
4056 .chain(Some(self.create_time_index_column_expr()?));
4057
4058 LogicalPlanBuilder::from(plan)
4059 .project(project_fields)
4060 .context(DataFusionPlanningSnafu)?
4061 .build()
4062 .context(DataFusionPlanningSnafu)
4063 }
4064}
4065
/// Arguments collected for a PromQL function call.
// NOTE(review): presumably `input` is the single non-literal (vector/matrix)
// argument and `literals` are the constant arguments already lowered to
// DataFusion expressions — confirm against the code that fills this struct.
#[derive(Default, Debug)]
struct FunctionArgs {
    /// The PromQL expression argument, if any (see note above).
    input: Option<PromExpr>,
    /// Literal arguments as DataFusion expressions.
    literals: Vec<DfExpr>,
}
4071
/// The kind of scalar function a PromQL function call is planned as.
// NOTE(review): the per-variant descriptions below are inferred from variant
// names and payloads only — confirm against the planner code that matches on
// this enum.
#[derive(Debug, Clone)]
enum ScalarFunc {
    /// Presumably a DataFusion built-in scalar function.
    DataFusionBuiltin(Arc<ScalarUdfDef>),
    /// Presumably a scalar UDF provided for DataFusion.
    DataFusionUdf(Arc<ScalarUdfDef>),
    /// A scalar UDF (presumably GreptimeDB/PromQL-specific).
    Udf(Arc<ScalarUdfDef>),
    /// A UDF paired with an `i64`; presumably used for extrapolating range
    /// functions — TODO confirm what the `i64` represents.
    ExtrapolateUdf(Arc<ScalarUdfDef>, i64),
    /// Presumably the expression is generated directly rather than via a UDF
    /// call.
    GeneratedExpr,
}
4101
4102#[cfg(test)]
4103mod test {
4104 use std::time::{Duration, UNIX_EPOCH};
4105
4106 use catalog::RegisterTableRequest;
4107 use catalog::memory::{MemoryCatalogManager, new_memory_catalog_manager};
4108 use common_base::Plugins;
4109 use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
4110 use common_query::prelude::greptime_timestamp;
4111 use common_query::test_util::DummyDecoder;
4112 use datafusion::arrow::datatypes::Schema as ArrowSchema;
4113 use datafusion::datasource::memory::MemorySourceConfig;
4114 use datafusion::datasource::source::DataSourceExec;
4115 use datafusion::logical_expr::Extension;
4116 use datatypes::prelude::ConcreteDataType;
4117 use datatypes::schema::{ColumnSchema, Schema};
4118 use promql_parser::label::Labels;
4119 use promql_parser::parser;
4120 use session::context::QueryContext;
4121 use table::metadata::{TableInfoBuilder, TableMetaBuilder};
4122 use table::test_util::EmptyTable;
4123
4124 use super::*;
4125 use crate::QueryEngineContext;
4126 use crate::options::QueryOptions;
4127 use crate::parser::QueryLanguageParser;
4128
4129 fn find_instant_manipulate(plan: &LogicalPlan) -> Option<&InstantManipulate> {
4130 if let LogicalPlan::Extension(Extension { node }) = plan
4131 && let Some(instant_manipulate) = node.as_any().downcast_ref::<InstantManipulate>()
4132 {
4133 return Some(instant_manipulate);
4134 }
4135
4136 plan.inputs().into_iter().find_map(find_instant_manipulate)
4137 }
4138
4139 fn build_query_engine_state() -> QueryEngineState {
4140 QueryEngineState::new(
4141 new_memory_catalog_manager().unwrap(),
4142 None,
4143 None,
4144 None,
4145 None,
4146 None,
4147 false,
4148 Plugins::default(),
4149 QueryOptions::default(),
4150 )
4151 }
4152
4153 async fn build_optimized_promql_plan(
4154 table_provider: DfTableSourceProvider,
4155 eval_stmt: &EvalStmt,
4156 ) -> LogicalPlan {
4157 let state = build_query_engine_state();
4158 let raw_plan = PromPlanner::stmt_to_plan(table_provider, eval_stmt, &state)
4159 .await
4160 .unwrap();
4161 let context = QueryEngineContext::new(state.session_state(), QueryContext::arc());
4162 state
4163 .optimize_by_extension_rules(raw_plan, &context)
4164 .unwrap()
4165 }
4166
4167 async fn build_optimized_tsid_plan(
4168 query: &str,
4169 num_tag: usize,
4170 num_field: usize,
4171 end_secs: u64,
4172 lookback_secs: u64,
4173 ) -> String {
4174 let eval_stmt = EvalStmt {
4175 expr: parser::parse(query).unwrap(),
4176 start: UNIX_EPOCH,
4177 end: UNIX_EPOCH
4178 .checked_add(Duration::from_secs(end_secs))
4179 .unwrap(),
4180 interval: Duration::from_secs(5),
4181 lookback_delta: Duration::from_secs(lookback_secs),
4182 };
4183 let table_provider = build_test_table_provider_with_tsid(
4184 &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
4185 num_tag,
4186 num_field,
4187 )
4188 .await;
4189
4190 build_optimized_promql_plan(table_provider, &eval_stmt)
4191 .await
4192 .display_indent_schema()
4193 .to_string()
4194 }
4195
/// Asserts that the nested-`count` rewrite fired for `query`.
///
/// Plans `query` against a tsid-enabled `some_metric` (2 tags, 1 field, end
/// 100_000s, 1s lookback) and checks that the optimized plan:
/// - divides series by `__tsid`;
/// - projects `some_metric.timestamp, some_metric.tag_0`;
/// - contains a `Distinct`;
/// - contains `expected_outer_agg`;
/// - no longer divides series by `tag_0`.
async fn assert_nested_count_rewrite_applies(query: &str, expected_outer_agg: &str) {
    let plan_str = build_optimized_tsid_plan(query, 2, 1, 100_000, 1).await;

    assert!(plan_str.contains("PromSeriesDivide: tags=[\"__tsid\"]"));
    assert!(plan_str.contains("Projection: some_metric.timestamp, some_metric.tag_0"));
    assert!(plan_str.contains("Distinct:"));
    assert!(plan_str.contains(expected_outer_agg), "{plan_str}");
    assert!(!plan_str.contains("PromSeriesDivide: tags=[\"tag_0\"]"));
}
4205
/// Asserts that the nested-`count` rewrite did NOT fire for `query`.
///
/// The check is that the optimized plan, built with `num_tag` tags, 1 field
/// and a `lookback_secs` lookback, contains no `Distinct` node.
async fn assert_nested_count_rewrite_missing(query: &str, num_tag: usize, lookback_secs: u64) {
    let plan_str = build_optimized_tsid_plan(query, num_tag, 1, 100_000, lookback_secs).await;
    assert!(!plan_str.contains("Distinct:"), "{plan_str}");
}
4210
4211 fn build_eval_stmt(expr: &str) -> EvalStmt {
4212 EvalStmt {
4213 expr: parser::parse(expr).unwrap(),
4214 start: UNIX_EPOCH,
4215 end: UNIX_EPOCH
4216 .checked_add(Duration::from_secs(100_000))
4217 .unwrap(),
4218 interval: Duration::from_secs(5),
4219 lookback_delta: Duration::from_secs(1),
4220 }
4221 }
4222
4223 async fn build_test_table_provider(
4224 table_name_tuples: &[(String, String)],
4225 num_tag: usize,
4226 num_field: usize,
4227 ) -> DfTableSourceProvider {
4228 let catalog_list = MemoryCatalogManager::with_default_setup();
4229 for (schema_name, table_name) in table_name_tuples {
4230 let mut columns = vec![];
4231 for i in 0..num_tag {
4232 columns.push(ColumnSchema::new(
4233 format!("tag_{i}"),
4234 ConcreteDataType::string_datatype(),
4235 false,
4236 ));
4237 }
4238 columns.push(
4239 ColumnSchema::new(
4240 "timestamp".to_string(),
4241 ConcreteDataType::timestamp_millisecond_datatype(),
4242 false,
4243 )
4244 .with_time_index(true),
4245 );
4246 for i in 0..num_field {
4247 columns.push(ColumnSchema::new(
4248 format!("field_{i}"),
4249 ConcreteDataType::float64_datatype(),
4250 true,
4251 ));
4252 }
4253 let schema = Arc::new(Schema::new(columns));
4254 let table_meta = TableMetaBuilder::empty()
4255 .schema(schema)
4256 .primary_key_indices((0..num_tag).collect())
4257 .value_indices((num_tag + 1..num_tag + 1 + num_field).collect())
4258 .next_column_id(1024)
4259 .build()
4260 .unwrap();
4261 let table_info = TableInfoBuilder::default()
4262 .name(table_name.clone())
4263 .meta(table_meta)
4264 .build()
4265 .unwrap();
4266 let table = EmptyTable::from_table_info(&table_info);
4267
4268 assert!(
4269 catalog_list
4270 .register_table_sync(RegisterTableRequest {
4271 catalog: DEFAULT_CATALOG_NAME.to_string(),
4272 schema: schema_name.clone(),
4273 table_name: table_name.clone(),
4274 table_id: 1024,
4275 table,
4276 })
4277 .is_ok()
4278 );
4279 }
4280
4281 DfTableSourceProvider::new(
4282 catalog_list,
4283 false,
4284 QueryContext::arc(),
4285 DummyDecoder::arc(),
4286 false,
4287 )
4288 }
4289
4290 async fn build_test_table_provider_with_tsid(
4291 table_name_tuples: &[(String, String)],
4292 num_tag: usize,
4293 num_field: usize,
4294 ) -> DfTableSourceProvider {
4295 let table_specs = table_name_tuples
4296 .iter()
4297 .map(|(schema_name, table_name)| ((schema_name.clone(), table_name.clone()), num_field))
4298 .collect::<Vec<_>>();
4299 build_test_table_provider_with_tsid_fields(&table_specs, num_tag).await
4300 }
4301
4302 async fn build_test_table_provider_with_tsid_fields(
4303 table_specs: &[((String, String), usize)],
4304 num_tag: usize,
4305 ) -> DfTableSourceProvider {
4306 let catalog_list = MemoryCatalogManager::with_default_setup();
4307
4308 let physical_table_name = "phy";
4309 let physical_table_id = 999u32;
4310 let physical_num_field = table_specs
4311 .iter()
4312 .map(|(_, num_field)| *num_field)
4313 .max()
4314 .unwrap_or(0);
4315
4316 {
4318 let mut columns = vec![
4319 ColumnSchema::new(
4320 DATA_SCHEMA_TABLE_ID_COLUMN_NAME.to_string(),
4321 ConcreteDataType::uint32_datatype(),
4322 false,
4323 ),
4324 ColumnSchema::new(
4325 DATA_SCHEMA_TSID_COLUMN_NAME.to_string(),
4326 ConcreteDataType::uint64_datatype(),
4327 false,
4328 ),
4329 ];
4330 for i in 0..num_tag {
4331 columns.push(ColumnSchema::new(
4332 format!("tag_{i}"),
4333 ConcreteDataType::string_datatype(),
4334 false,
4335 ));
4336 }
4337 columns.push(
4338 ColumnSchema::new(
4339 "timestamp".to_string(),
4340 ConcreteDataType::timestamp_millisecond_datatype(),
4341 false,
4342 )
4343 .with_time_index(true),
4344 );
4345 for i in 0..physical_num_field {
4346 columns.push(ColumnSchema::new(
4347 format!("field_{i}"),
4348 ConcreteDataType::float64_datatype(),
4349 true,
4350 ));
4351 }
4352
4353 let schema = Arc::new(Schema::new(columns));
4354 let primary_key_indices = (0..(2 + num_tag)).collect::<Vec<_>>();
4355 let table_meta = TableMetaBuilder::empty()
4356 .schema(schema)
4357 .primary_key_indices(primary_key_indices)
4358 .value_indices((2 + num_tag..2 + num_tag + 1 + physical_num_field).collect())
4359 .engine(METRIC_ENGINE_NAME.to_string())
4360 .next_column_id(1024)
4361 .build()
4362 .unwrap();
4363 let table_info = TableInfoBuilder::default()
4364 .table_id(physical_table_id)
4365 .name(physical_table_name)
4366 .meta(table_meta)
4367 .build()
4368 .unwrap();
4369 let table = EmptyTable::from_table_info(&table_info);
4370
4371 assert!(
4372 catalog_list
4373 .register_table_sync(RegisterTableRequest {
4374 catalog: DEFAULT_CATALOG_NAME.to_string(),
4375 schema: DEFAULT_SCHEMA_NAME.to_string(),
4376 table_name: physical_table_name.to_string(),
4377 table_id: physical_table_id,
4378 table,
4379 })
4380 .is_ok()
4381 );
4382 }
4383
4384 for (idx, ((schema_name, table_name), num_field)) in table_specs.iter().enumerate() {
4386 let mut columns = vec![];
4387 for i in 0..num_tag {
4388 columns.push(ColumnSchema::new(
4389 format!("tag_{i}"),
4390 ConcreteDataType::string_datatype(),
4391 false,
4392 ));
4393 }
4394 columns.push(
4395 ColumnSchema::new(
4396 "timestamp".to_string(),
4397 ConcreteDataType::timestamp_millisecond_datatype(),
4398 false,
4399 )
4400 .with_time_index(true),
4401 );
4402 for i in 0..*num_field {
4403 columns.push(ColumnSchema::new(
4404 format!("field_{i}"),
4405 ConcreteDataType::float64_datatype(),
4406 true,
4407 ));
4408 }
4409
4410 let schema = Arc::new(Schema::new(columns));
4411 let mut options = table::requests::TableOptions::default();
4412 options.extra_options.insert(
4413 LOGICAL_TABLE_METADATA_KEY.to_string(),
4414 physical_table_name.to_string(),
4415 );
4416 let table_id = 1024u32 + idx as u32;
4417 let table_meta = TableMetaBuilder::empty()
4418 .schema(schema)
4419 .primary_key_indices((0..num_tag).collect())
4420 .value_indices((num_tag + 1..num_tag + 1 + *num_field).collect())
4421 .engine(METRIC_ENGINE_NAME.to_string())
4422 .options(options)
4423 .next_column_id(1024)
4424 .build()
4425 .unwrap();
4426 let table_info = TableInfoBuilder::default()
4427 .table_id(table_id)
4428 .name(table_name.clone())
4429 .meta(table_meta)
4430 .build()
4431 .unwrap();
4432 let table = EmptyTable::from_table_info(&table_info);
4433
4434 assert!(
4435 catalog_list
4436 .register_table_sync(RegisterTableRequest {
4437 catalog: DEFAULT_CATALOG_NAME.to_string(),
4438 schema: schema_name.clone(),
4439 table_name: table_name.clone(),
4440 table_id,
4441 table,
4442 })
4443 .is_ok()
4444 );
4445 }
4446
4447 DfTableSourceProvider::new(
4448 catalog_list,
4449 false,
4450 QueryContext::arc(),
4451 DummyDecoder::arc(),
4452 false,
4453 )
4454 }
4455
4456 async fn build_test_table_provider_with_fields(
4457 table_name_tuples: &[(String, String)],
4458 tags: &[&str],
4459 ) -> DfTableSourceProvider {
4460 let catalog_list = MemoryCatalogManager::with_default_setup();
4461 for (schema_name, table_name) in table_name_tuples {
4462 let mut columns = vec![];
4463 let num_tag = tags.len();
4464 for tag in tags {
4465 columns.push(ColumnSchema::new(
4466 tag.to_string(),
4467 ConcreteDataType::string_datatype(),
4468 false,
4469 ));
4470 }
4471 columns.push(
4472 ColumnSchema::new(
4473 greptime_timestamp().to_string(),
4474 ConcreteDataType::timestamp_millisecond_datatype(),
4475 false,
4476 )
4477 .with_time_index(true),
4478 );
4479 columns.push(ColumnSchema::new(
4480 greptime_value().to_string(),
4481 ConcreteDataType::float64_datatype(),
4482 true,
4483 ));
4484 let schema = Arc::new(Schema::new(columns));
4485 let table_meta = TableMetaBuilder::empty()
4486 .schema(schema)
4487 .primary_key_indices((0..num_tag).collect())
4488 .next_column_id(1024)
4489 .build()
4490 .unwrap();
4491 let table_info = TableInfoBuilder::default()
4492 .name(table_name.clone())
4493 .meta(table_meta)
4494 .build()
4495 .unwrap();
4496 let table = EmptyTable::from_table_info(&table_info);
4497
4498 assert!(
4499 catalog_list
4500 .register_table_sync(RegisterTableRequest {
4501 catalog: DEFAULT_CATALOG_NAME.to_string(),
4502 schema: schema_name.clone(),
4503 table_name: table_name.clone(),
4504 table_id: 1024,
4505 table,
4506 })
4507 .is_ok()
4508 );
4509 }
4510
4511 DfTableSourceProvider::new(
4512 catalog_list,
4513 false,
4514 QueryContext::arc(),
4515 DummyDecoder::arc(),
4516 false,
4517 )
4518 }
4519
4520 async fn do_single_instant_function_call(fn_name: &'static str, plan_name: &str) {
4536 let prom_expr =
4537 parser::parse(&format!("{fn_name}(some_metric{{tag_0!=\"bar\"}})")).unwrap();
4538 let eval_stmt = EvalStmt {
4539 expr: prom_expr,
4540 start: UNIX_EPOCH,
4541 end: UNIX_EPOCH
4542 .checked_add(Duration::from_secs(100_000))
4543 .unwrap(),
4544 interval: Duration::from_secs(5),
4545 lookback_delta: Duration::from_secs(1),
4546 };
4547
4548 let table_provider = build_test_table_provider(
4549 &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
4550 1,
4551 1,
4552 )
4553 .await;
4554 let plan =
4555 PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
4556 .await
4557 .unwrap();
4558
4559 let expected = String::from(
4560 "Filter: TEMPLATE(field_0) IS NOT NULL [timestamp:Timestamp(ms), TEMPLATE(field_0):Float64;N, tag_0:Utf8]\
4561 \n Projection: some_metric.timestamp, TEMPLATE(some_metric.field_0) AS TEMPLATE(field_0), some_metric.tag_0 [timestamp:Timestamp(ms), TEMPLATE(field_0):Float64;N, tag_0:Utf8]\
4562 \n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
4563 \n PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
4564 \n Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
4565 \n Filter: some_metric.tag_0 != Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(-999, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
4566 \n TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]"
4567 ).replace("TEMPLATE", plan_name);
4568
4569 assert_eq!(plan.display_indent_schema().to_string(), expected);
4570 }
4571
4572 #[tokio::test]
4573 async fn single_abs() {
4574 do_single_instant_function_call("abs", "abs").await;
4575 }
4576
4577 #[tokio::test]
4578 #[should_panic]
4579 async fn single_absent() {
4580 do_single_instant_function_call("absent", "").await;
4581 }
4582
4583 #[tokio::test]
4584 async fn single_ceil() {
4585 do_single_instant_function_call("ceil", "ceil").await;
4586 }
4587
4588 #[tokio::test]
4589 async fn single_exp() {
4590 do_single_instant_function_call("exp", "exp").await;
4591 }
4592
4593 #[tokio::test]
4594 async fn single_ln() {
4595 do_single_instant_function_call("ln", "ln").await;
4596 }
4597
4598 #[tokio::test]
4599 async fn single_log2() {
4600 do_single_instant_function_call("log2", "log2").await;
4601 }
4602
4603 #[tokio::test]
4604 async fn single_log10() {
4605 do_single_instant_function_call("log10", "log10").await;
4606 }
4607
4608 #[tokio::test]
4609 #[should_panic]
4610 async fn single_scalar() {
4611 do_single_instant_function_call("scalar", "").await;
4612 }
4613
4614 #[tokio::test]
4615 #[should_panic]
4616 async fn single_sgn() {
4617 do_single_instant_function_call("sgn", "").await;
4618 }
4619
4620 #[tokio::test]
4621 #[should_panic]
4622 async fn single_sort() {
4623 do_single_instant_function_call("sort", "").await;
4624 }
4625
4626 #[tokio::test]
4627 #[should_panic]
4628 async fn single_sort_desc() {
4629 do_single_instant_function_call("sort_desc", "").await;
4630 }
4631
4632 #[tokio::test]
4633 async fn single_sqrt() {
4634 do_single_instant_function_call("sqrt", "sqrt").await;
4635 }
4636
4637 #[tokio::test]
4638 #[should_panic]
4639 async fn single_timestamp() {
4640 do_single_instant_function_call("timestamp", "").await;
4641 }
4642
4643 #[tokio::test]
4644 async fn single_acos() {
4645 do_single_instant_function_call("acos", "acos").await;
4646 }
4647
4648 #[tokio::test]
4649 #[should_panic]
4650 async fn single_acosh() {
4651 do_single_instant_function_call("acosh", "").await;
4652 }
4653
4654 #[tokio::test]
4655 async fn single_asin() {
4656 do_single_instant_function_call("asin", "asin").await;
4657 }
4658
4659 #[tokio::test]
4660 #[should_panic]
4661 async fn single_asinh() {
4662 do_single_instant_function_call("asinh", "").await;
4663 }
4664
4665 #[tokio::test]
4666 async fn single_atan() {
4667 do_single_instant_function_call("atan", "atan").await;
4668 }
4669
4670 #[tokio::test]
4671 #[should_panic]
4672 async fn single_atanh() {
4673 do_single_instant_function_call("atanh", "").await;
4674 }
4675
4676 #[tokio::test]
4677 async fn single_cos() {
4678 do_single_instant_function_call("cos", "cos").await;
4679 }
4680
4681 #[tokio::test]
4682 #[should_panic]
4683 async fn single_cosh() {
4684 do_single_instant_function_call("cosh", "").await;
4685 }
4686
4687 #[tokio::test]
4688 async fn single_sin() {
4689 do_single_instant_function_call("sin", "sin").await;
4690 }
4691
4692 #[tokio::test]
4693 #[should_panic]
4694 async fn single_sinh() {
4695 do_single_instant_function_call("sinh", "").await;
4696 }
4697
4698 #[tokio::test]
4699 async fn single_tan() {
4700 do_single_instant_function_call("tan", "tan").await;
4701 }
4702
4703 #[tokio::test]
4704 #[should_panic]
4705 async fn single_tanh() {
4706 do_single_instant_function_call("tanh", "").await;
4707 }
4708
4709 #[tokio::test]
4710 #[should_panic]
4711 async fn single_deg() {
4712 do_single_instant_function_call("deg", "").await;
4713 }
4714
4715 #[tokio::test]
4716 #[should_panic]
4717 async fn single_rad() {
4718 do_single_instant_function_call("rad", "").await;
4719 }
4720
4721 async fn do_aggregate_expr_plan(fn_name: &str, plan_name: &str) {
4743 let prom_expr = parser::parse(&format!(
4744 "{fn_name} by (tag_1)(some_metric{{tag_0!=\"bar\"}})",
4745 ))
4746 .unwrap();
4747 let mut eval_stmt = EvalStmt {
4748 expr: prom_expr,
4749 start: UNIX_EPOCH,
4750 end: UNIX_EPOCH
4751 .checked_add(Duration::from_secs(100_000))
4752 .unwrap(),
4753 interval: Duration::from_secs(5),
4754 lookback_delta: Duration::from_secs(1),
4755 };
4756
4757 let table_provider = build_test_table_provider(
4759 &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
4760 2,
4761 2,
4762 )
4763 .await;
4764 let plan =
4765 PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
4766 .await
4767 .unwrap();
4768 let expected_no_without = String::from(
4769 "Sort: some_metric.tag_1 ASC NULLS LAST, some_metric.timestamp ASC NULLS LAST [tag_1:Utf8, timestamp:Timestamp(ms), TEMPLATE(some_metric.field_0):Float64;N, TEMPLATE(some_metric.field_1):Float64;N]\
4770 \n Aggregate: groupBy=[[some_metric.tag_1, some_metric.timestamp]], aggr=[[TEMPLATE(some_metric.field_0), TEMPLATE(some_metric.field_1)]] [tag_1:Utf8, timestamp:Timestamp(ms), TEMPLATE(some_metric.field_0):Float64;N, TEMPLATE(some_metric.field_1):Float64;N]\
4771 \n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(ms), field_0:Float64;N, field_1:Float64;N]\
4772 \n PromSeriesDivide: tags=[\"tag_0\", \"tag_1\"] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(ms), field_0:Float64;N, field_1:Float64;N]\
4773 \n Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.tag_1 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(ms), field_0:Float64;N, field_1:Float64;N]\
4774 \n Filter: some_metric.tag_0 != Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(-999, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(ms), field_0:Float64;N, field_1:Float64;N]\
4775 \n TableScan: some_metric [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(ms), field_0:Float64;N, field_1:Float64;N]"
4776 ).replace("TEMPLATE", plan_name);
4777 assert_eq!(
4778 plan.display_indent_schema().to_string(),
4779 expected_no_without
4780 );
4781
4782 if let PromExpr::Aggregate(AggregateExpr { modifier, .. }) = &mut eval_stmt.expr {
4784 *modifier = Some(LabelModifier::Exclude(Labels {
4785 labels: vec![String::from("tag_1")].into_iter().collect(),
4786 }));
4787 }
4788 let table_provider = build_test_table_provider(
4789 &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
4790 2,
4791 2,
4792 )
4793 .await;
4794 let plan =
4795 PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
4796 .await
4797 .unwrap();
4798 let expected_without = String::from(
4799 "Sort: some_metric.tag_0 ASC NULLS LAST, some_metric.timestamp ASC NULLS LAST [tag_0:Utf8, timestamp:Timestamp(ms), TEMPLATE(some_metric.field_0):Float64;N, TEMPLATE(some_metric.field_1):Float64;N]\
4800 \n Aggregate: groupBy=[[some_metric.tag_0, some_metric.timestamp]], aggr=[[TEMPLATE(some_metric.field_0), TEMPLATE(some_metric.field_1)]] [tag_0:Utf8, timestamp:Timestamp(ms), TEMPLATE(some_metric.field_0):Float64;N, TEMPLATE(some_metric.field_1):Float64;N]\
4801 \n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(ms), field_0:Float64;N, field_1:Float64;N]\
4802 \n PromSeriesDivide: tags=[\"tag_0\", \"tag_1\"] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(ms), field_0:Float64;N, field_1:Float64;N]\
4803 \n Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.tag_1 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(ms), field_0:Float64;N, field_1:Float64;N]\
4804 \n Filter: some_metric.tag_0 != Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(-999, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(ms), field_0:Float64;N, field_1:Float64;N]\
4805 \n TableScan: some_metric [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(ms), field_0:Float64;N, field_1:Float64;N]"
4806 ).replace("TEMPLATE", plan_name);
4807 assert_eq!(plan.display_indent_schema().to_string(), expected_without);
4808 }
4809
4810 #[tokio::test]
4811 async fn aggregate_sum() {
4812 do_aggregate_expr_plan("sum", "sum").await;
4813 }
4814
4815 #[tokio::test]
4816 async fn tsid_is_used_for_series_divide_when_available() {
4817 let prom_expr = parser::parse("some_metric").unwrap();
4818 let eval_stmt = EvalStmt {
4819 expr: prom_expr,
4820 start: UNIX_EPOCH,
4821 end: UNIX_EPOCH
4822 .checked_add(Duration::from_secs(100_000))
4823 .unwrap(),
4824 interval: Duration::from_secs(5),
4825 lookback_delta: Duration::from_secs(1),
4826 };
4827
4828 let table_provider = build_test_table_provider_with_tsid(
4829 &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
4830 1,
4831 1,
4832 )
4833 .await;
4834 let plan =
4835 PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
4836 .await
4837 .unwrap();
4838
4839 let plan_str = plan.display_indent_schema().to_string();
4840 assert!(plan_str.contains("PromSeriesDivide: tags=[\"__tsid\"]"));
4841 assert!(plan_str.contains("__tsid ASC NULLS FIRST"));
4842 assert!(
4843 !plan
4844 .schema()
4845 .fields()
4846 .iter()
4847 .any(|field| field.name() == DATA_SCHEMA_TSID_COLUMN_NAME)
4848 );
4849
4850 let manipulate = find_instant_manipulate(&plan).unwrap();
4851 let exec = manipulate.to_execution_plan(Arc::new(DataSourceExec::new(Arc::new(
4852 MemorySourceConfig::try_new(&[], Arc::new(ArrowSchema::empty()), None).unwrap(),
4853 ))));
4854 assert!(format!("{exec:?}").contains("reuse_tsid_column: true"));
4855 }
4856
4857 #[tokio::test]
4858 async fn default_binary_join_uses_tsid_when_available() {
4859 let eval_stmt = build_eval_stmt("some_metric / some_alt_metric");
4860
4861 let table_provider = build_test_table_provider_with_tsid(
4862 &[
4863 (DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string()),
4864 (
4865 DEFAULT_SCHEMA_NAME.to_string(),
4866 "some_alt_metric".to_string(),
4867 ),
4868 ],
4869 1,
4870 1,
4871 )
4872 .await;
4873 let plan =
4874 PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
4875 .await
4876 .unwrap();
4877
4878 let plan_str = plan.display_indent_schema().to_string();
4879 assert!(
4880 plan_str.contains("some_metric.__tsid = some_alt_metric.__tsid"),
4881 "{plan_str}"
4882 );
4883 assert!(
4884 !plan_str.contains("some_metric.tag_0 = some_alt_metric.tag_0"),
4885 "{plan_str}"
4886 );
4887 }
4888
4889 #[tokio::test]
4890 async fn tsid_is_preserved_for_nested_default_binary_joins() {
4891 let eval_stmt = build_eval_stmt("(some_metric - some_alt_metric) / some_third_metric");
4892
4893 let table_provider = build_test_table_provider_with_tsid(
4894 &[
4895 (DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string()),
4896 (
4897 DEFAULT_SCHEMA_NAME.to_string(),
4898 "some_alt_metric".to_string(),
4899 ),
4900 (
4901 DEFAULT_SCHEMA_NAME.to_string(),
4902 "some_third_metric".to_string(),
4903 ),
4904 ],
4905 1,
4906 1,
4907 )
4908 .await;
4909 let plan =
4910 PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
4911 .await
4912 .unwrap();
4913
4914 let plan_str = plan.display_indent_schema().to_string();
4915 assert_eq!(plan_str.matches("__tsid =").count(), 2, "{plan_str}");
4916 assert!(!plan_str.contains("tag_0 ="), "{plan_str}");
4917 }
4918
4919 #[tokio::test]
4920 async fn repeated_tsid_binary_operand_keeps_tsid_join_keys() {
4921 let eval_stmt = build_eval_stmt("((some_metric - some_alt_metric) / some_metric) * 100");
4922
4923 let table_provider = build_test_table_provider_with_tsid(
4924 &[
4925 (DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string()),
4926 (
4927 DEFAULT_SCHEMA_NAME.to_string(),
4928 "some_alt_metric".to_string(),
4929 ),
4930 ],
4931 1,
4932 1,
4933 )
4934 .await;
4935 let plan =
4936 PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
4937 .await
4938 .unwrap();
4939
4940 let plan_str = plan.display_indent_schema().to_string();
4941 assert_eq!(plan_str.matches("__tsid =").count(), 2, "{plan_str}");
4942 assert!(!plan_str.contains("tag_0 ="), "{plan_str}");
4943 }
4944
4945 #[tokio::test]
4946 async fn repeated_tsid_binary_operand_keeps_shorter_field_side() {
4947 let eval_stmt =
4948 build_eval_stmt("((two_field_metric - one_field_metric) / one_field_metric) * 100");
4949
4950 let table_provider = build_test_table_provider_with_tsid_fields(
4951 &[
4952 (
4953 (
4954 DEFAULT_SCHEMA_NAME.to_string(),
4955 "two_field_metric".to_string(),
4956 ),
4957 2,
4958 ),
4959 (
4960 (
4961 DEFAULT_SCHEMA_NAME.to_string(),
4962 "one_field_metric".to_string(),
4963 ),
4964 1,
4965 ),
4966 ],
4967 1,
4968 )
4969 .await;
4970 let plan =
4971 PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
4972 .await
4973 .unwrap();
4974
4975 let field_names = plan
4976 .schema()
4977 .fields()
4978 .iter()
4979 .map(|field| field.name().clone())
4980 .collect::<Vec<_>>();
4981 let value_columns = field_names
4982 .iter()
4983 .filter(|name| {
4984 *name != "tag_0" && *name != "timestamp" && *name != DATA_SCHEMA_TSID_COLUMN_NAME
4985 })
4986 .count();
4987 assert_eq!(value_columns, 1, "{field_names:?}");
4988 let plan_str = plan.display_indent_schema().to_string();
4989 assert_eq!(plan_str.matches("__tsid =").count(), 2, "{plan_str}");
4990 assert!(!plan_str.contains("tag_0 ="), "{plan_str}");
4991 }
4992
4993 #[tokio::test]
4994 async fn tsid_binary_join_uses_shorter_field_side() {
4995 let eval_stmt = build_eval_stmt("one_field_metric / two_field_metric");
4996
4997 let table_provider = build_test_table_provider_with_tsid_fields(
4998 &[
4999 (
5000 (
5001 DEFAULT_SCHEMA_NAME.to_string(),
5002 "one_field_metric".to_string(),
5003 ),
5004 1,
5005 ),
5006 (
5007 (
5008 DEFAULT_SCHEMA_NAME.to_string(),
5009 "two_field_metric".to_string(),
5010 ),
5011 2,
5012 ),
5013 ],
5014 1,
5015 )
5016 .await;
5017 let plan =
5018 PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
5019 .await
5020 .unwrap();
5021
5022 let field_names = plan
5023 .schema()
5024 .fields()
5025 .iter()
5026 .map(|field| field.name().clone())
5027 .collect::<Vec<_>>();
5028 let value_columns = field_names
5029 .iter()
5030 .filter(|name| {
5031 *name != "tag_0" && *name != "timestamp" && *name != DATA_SCHEMA_TSID_COLUMN_NAME
5032 })
5033 .count();
5034 assert_eq!(value_columns, 1, "{field_names:?}");
5035 }
5036
5037 #[tokio::test]
5038 async fn label_matching_modifier_disables_tsid_binary_join() {
5039 let eval_stmt = build_eval_stmt("some_metric / ignoring(tag_0) some_alt_metric");
5040
5041 let table_provider = build_test_table_provider_with_tsid(
5042 &[
5043 (DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string()),
5044 (
5045 DEFAULT_SCHEMA_NAME.to_string(),
5046 "some_alt_metric".to_string(),
5047 ),
5048 ],
5049 2,
5050 1,
5051 )
5052 .await;
5053 let plan =
5054 PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
5055 .await
5056 .unwrap();
5057
5058 let plan_str = plan.display_indent_schema().to_string();
5059 assert!(!plan_str.contains("__tsid ="), "{plan_str}");
5060 assert!(
5061 plan_str.contains("some_metric.tag_1 = some_alt_metric.tag_1"),
5062 "{plan_str}"
5063 );
5064 }
5065
5066 #[tokio::test]
5067 async fn comparison_binary_join_uses_tsid_and_keeps_it_in_filtered_result() {
5068 let eval_stmt = build_eval_stmt("some_metric > some_alt_metric");
5069
5070 let table_provider = build_test_table_provider_with_tsid(
5071 &[
5072 (DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string()),
5073 (
5074 DEFAULT_SCHEMA_NAME.to_string(),
5075 "some_alt_metric".to_string(),
5076 ),
5077 ],
5078 2,
5079 1,
5080 )
5081 .await;
5082 let mut planner = PromPlanner {
5083 table_provider,
5084 ctx: PromPlannerContext::from_eval_stmt(&eval_stmt),
5085 };
5086 let plan = planner
5087 .prom_expr_to_plan(&eval_stmt.expr, &build_query_engine_state())
5088 .await
5089 .unwrap();
5090
5091 let plan_str = plan.display_indent_schema().to_string();
5092 assert!(
5093 plan_str.contains("some_metric.__tsid = some_alt_metric.__tsid"),
5094 "{plan_str}"
5095 );
5096 assert!(
5097 plan.schema()
5098 .fields()
5099 .iter()
5100 .any(|field| field.name() == DATA_SCHEMA_TSID_COLUMN_NAME),
5101 "{plan_str}"
5102 );
5103 assert!(planner.ctx.use_tsid, "{plan_str}");
5104 }
5105
5106 #[tokio::test]
5107 async fn comparison_bool_binary_join_uses_tsid_when_available() {
5108 let eval_stmt = build_eval_stmt("some_metric > bool some_alt_metric");
5109
5110 let table_provider = build_test_table_provider_with_tsid(
5111 &[
5112 (DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string()),
5113 (
5114 DEFAULT_SCHEMA_NAME.to_string(),
5115 "some_alt_metric".to_string(),
5116 ),
5117 ],
5118 2,
5119 1,
5120 )
5121 .await;
5122 let plan =
5123 PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
5124 .await
5125 .unwrap();
5126
5127 let plan_str = plan.display_indent_schema().to_string();
5128 assert!(
5129 plan_str.contains("some_metric.__tsid = some_alt_metric.__tsid"),
5130 "{plan_str}"
5131 );
5132 assert!(!plan_str.contains("tag_0 ="), "{plan_str}");
5133 assert!(!plan_str.contains("tag_1 ="), "{plan_str}");
5134 }
5135
5136 #[tokio::test]
5137 async fn scalar_count_count_range_keeps_full_window() {
5138 let plan_str = build_optimized_tsid_plan(
5139 "scalar(count(count(some_metric) by (tag_0)))",
5140 1,
5141 1,
5142 100_000,
5143 1,
5144 )
5145 .await;
5146 assert!(plan_str.contains("ScalarCalculate: tags=[]"));
5147 assert!(plan_str.contains("PromInstantManipulate: range=[0..100000000]"));
5148 assert!(!plan_str.contains("PromInstantManipulate: range=[99999000..99999000]"));
5149 }
5150
5151 #[tokio::test]
5152 async fn scalar_count_count_rewrite_applies_inside_binary_expr_for_tsid_input() {
5153 let plan_str = build_optimized_tsid_plan(
5154 "sum(irate(some_metric[1h])) / scalar(count(count(some_metric) by (tag_0)))",
5155 2,
5156 1,
5157 10,
5158 300,
5159 )
5160 .await;
5161 assert!(plan_str.contains("Distinct:"), "{plan_str}");
5162 }
5163
5164 #[tokio::test]
5165 async fn nested_count_rewrite_keeps_full_series_key_with_tsid_input() {
5166 assert_nested_count_rewrite_applies(
5167 "count(count(some_metric) by (tag_0))",
5168 "Aggregate: groupBy=[[some_metric.timestamp]], aggr=[[count(Int64(1)) AS count(count(some_metric.field_0))]]"
5169 )
5170 .await;
5171 }
5172
5173 #[tokio::test]
5174 async fn nested_sum_count_rewrite_keeps_full_series_key_with_tsid_input() {
5175 assert_nested_count_rewrite_applies(
5176 "count(sum(some_metric) by (tag_0))",
5177 "Aggregate: groupBy=[[some_metric.timestamp]], aggr=[[count(Int64(1)) AS count(sum(some_metric.field_0))]]"
5178 )
5179 .await;
5180 }
5181
5182 #[tokio::test]
5183 async fn nested_supported_inner_aggs_rewrite_apply_for_tsid_input() {
5184 for (query, expected_outer_agg) in [
5185 (
5186 "count(avg(some_metric) by (tag_0))",
5187 "Aggregate: groupBy=[[some_metric.timestamp]], aggr=[[count(Int64(1)) AS count(avg(some_metric.field_0))]]",
5188 ),
5189 (
5190 "count(min(some_metric) by (tag_0))",
5191 "Aggregate: groupBy=[[some_metric.timestamp]], aggr=[[count(Int64(1)) AS count(min(some_metric.field_0))]]",
5192 ),
5193 (
5194 "count(max(some_metric) by (tag_0))",
5195 "Aggregate: groupBy=[[some_metric.timestamp]], aggr=[[count(Int64(1)) AS count(max(some_metric.field_0))]]",
5196 ),
5197 (
5198 "count(stddev(some_metric) by (tag_0))",
5199 "Aggregate: groupBy=[[some_metric.timestamp]], aggr=[[count(Int64(1)) AS count(stddev_pop(some_metric.field_0))]]",
5200 ),
5201 (
5202 "count(stdvar(some_metric) by (tag_0))",
5203 "Aggregate: groupBy=[[some_metric.timestamp]], aggr=[[count(Int64(1)) AS count(var_pop(some_metric.field_0))]]",
5204 ),
5205 ] {
5206 assert_nested_count_rewrite_applies(query, expected_outer_agg).await;
5207 }
5208 }
5209
5210 #[tokio::test]
5211 async fn nested_non_count_inner_aggs_rewrite_filter_null_values_for_tsid_input() {
5212 let count_plan =
5213 build_optimized_tsid_plan("count(count(some_metric) by (tag_0))", 2, 1, 100_000, 1)
5214 .await;
5215 assert!(
5216 !count_plan.contains("some_metric.field_0 IS NOT NULL"),
5217 "{count_plan}"
5218 );
5219
5220 for query in [
5221 "count(sum(some_metric) by (tag_0))",
5222 "count(avg(some_metric) by (tag_0))",
5223 "count(min(some_metric) by (tag_0))",
5224 "count(max(some_metric) by (tag_0))",
5225 "count(stddev(some_metric) by (tag_0))",
5226 "count(stdvar(some_metric) by (tag_0))",
5227 ] {
5228 let plan_str = build_optimized_tsid_plan(query, 2, 1, 100_000, 1).await;
5229 assert!(
5230 plan_str.contains("Filter: some_metric.field_0 IS NOT NULL"),
5231 "{query}: {plan_str}"
5232 );
5233 }
5234 }
5235
5236 #[tokio::test]
5237 async fn nested_unsupported_or_non_direct_inner_aggs_do_not_rewrite() {
5238 assert_nested_count_rewrite_missing("count(group(some_metric) by (tag_0))", 2, 1).await;
5239 assert_nested_count_rewrite_missing(
5240 "count(sum(irate(some_metric[1h])) by (tag_0))",
5241 2,
5242 300,
5243 )
5244 .await;
5245 }
5246
5247 #[tokio::test]
5248 async fn physical_table_name_is_not_leaked_in_plan() {
5249 let prom_expr = parser::parse("some_metric").unwrap();
5250 let eval_stmt = EvalStmt {
5251 expr: prom_expr,
5252 start: UNIX_EPOCH,
5253 end: UNIX_EPOCH
5254 .checked_add(Duration::from_secs(100_000))
5255 .unwrap(),
5256 interval: Duration::from_secs(5),
5257 lookback_delta: Duration::from_secs(1),
5258 };
5259
5260 let table_provider = build_test_table_provider_with_tsid(
5261 &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
5262 1,
5263 1,
5264 )
5265 .await;
5266 let plan =
5267 PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
5268 .await
5269 .unwrap();
5270
5271 let plan_str = plan.display_indent_schema().to_string();
5272 assert!(plan_str.contains("TableScan: phy"), "{plan}");
5273 assert!(plan_str.contains("SubqueryAlias: some_metric"));
5274 assert!(plan_str.contains("Filter: phy.__table_id = UInt32(1024)"));
5275 assert!(!plan_str.contains("TableScan: some_metric"));
5276 }
5277
5278 #[tokio::test]
5279 async fn sum_without_does_not_group_by_tsid() {
5280 let prom_expr = parser::parse("sum without (tag_0) (some_metric)").unwrap();
5281 let eval_stmt = EvalStmt {
5282 expr: prom_expr,
5283 start: UNIX_EPOCH,
5284 end: UNIX_EPOCH
5285 .checked_add(Duration::from_secs(100_000))
5286 .unwrap(),
5287 interval: Duration::from_secs(5),
5288 lookback_delta: Duration::from_secs(1),
5289 };
5290
5291 let table_provider = build_test_table_provider_with_tsid(
5292 &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
5293 1,
5294 1,
5295 )
5296 .await;
5297 let plan =
5298 PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
5299 .await
5300 .unwrap();
5301
5302 let plan_str = plan.display_indent_schema().to_string();
5303 assert!(plan_str.contains("PromSeriesDivide: tags=[\"__tsid\"]"));
5304
5305 let aggr_line = plan_str
5306 .lines()
5307 .find(|line| line.contains("Aggregate: groupBy="))
5308 .unwrap();
5309 assert!(!aggr_line.contains(DATA_SCHEMA_TSID_COLUMN_NAME));
5310 }
5311
5312 #[tokio::test]
5313 async fn topk_without_does_not_partition_by_tsid() {
5314 let prom_expr = parser::parse("topk without (tag_0) (1, some_metric)").unwrap();
5315 let eval_stmt = EvalStmt {
5316 expr: prom_expr,
5317 start: UNIX_EPOCH,
5318 end: UNIX_EPOCH
5319 .checked_add(Duration::from_secs(100_000))
5320 .unwrap(),
5321 interval: Duration::from_secs(5),
5322 lookback_delta: Duration::from_secs(1),
5323 };
5324
5325 let table_provider = build_test_table_provider_with_tsid(
5326 &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
5327 1,
5328 1,
5329 )
5330 .await;
5331 let plan =
5332 PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
5333 .await
5334 .unwrap();
5335
5336 let plan_str = plan.display_indent_schema().to_string();
5337 assert!(plan_str.contains("PromSeriesDivide: tags=[\"__tsid\"]"));
5338
5339 let window_line = plan_str
5340 .lines()
5341 .find(|line| line.contains("WindowAggr: windowExpr=[[row_number()"))
5342 .unwrap();
5343 let partition_by = window_line
5344 .split("PARTITION BY [")
5345 .nth(1)
5346 .and_then(|s| s.split("] ORDER BY").next())
5347 .unwrap();
5348 assert!(!partition_by.contains(DATA_SCHEMA_TSID_COLUMN_NAME));
5349 }
5350
5351 #[tokio::test]
5352 async fn sum_by_does_not_group_by_tsid() {
5353 let prom_expr = parser::parse("sum by (__tsid) (some_metric)").unwrap();
5354 let eval_stmt = EvalStmt {
5355 expr: prom_expr,
5356 start: UNIX_EPOCH,
5357 end: UNIX_EPOCH
5358 .checked_add(Duration::from_secs(100_000))
5359 .unwrap(),
5360 interval: Duration::from_secs(5),
5361 lookback_delta: Duration::from_secs(1),
5362 };
5363
5364 let table_provider = build_test_table_provider_with_tsid(
5365 &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
5366 1,
5367 1,
5368 )
5369 .await;
5370 let plan =
5371 PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
5372 .await
5373 .unwrap();
5374
5375 let plan_str = plan.display_indent_schema().to_string();
5376 assert!(plan_str.contains("PromSeriesDivide: tags=[\"__tsid\"]"));
5377
5378 let aggr_line = plan_str
5379 .lines()
5380 .find(|line| line.contains("Aggregate: groupBy="))
5381 .unwrap();
5382 assert!(!aggr_line.contains(DATA_SCHEMA_TSID_COLUMN_NAME));
5383 }
5384
5385 #[tokio::test]
5386 async fn topk_by_does_not_partition_by_tsid() {
5387 let prom_expr = parser::parse("topk by (__tsid) (1, some_metric)").unwrap();
5388 let eval_stmt = EvalStmt {
5389 expr: prom_expr,
5390 start: UNIX_EPOCH,
5391 end: UNIX_EPOCH
5392 .checked_add(Duration::from_secs(100_000))
5393 .unwrap(),
5394 interval: Duration::from_secs(5),
5395 lookback_delta: Duration::from_secs(1),
5396 };
5397
5398 let table_provider = build_test_table_provider_with_tsid(
5399 &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
5400 1,
5401 1,
5402 )
5403 .await;
5404 let plan =
5405 PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
5406 .await
5407 .unwrap();
5408
5409 let plan_str = plan.display_indent_schema().to_string();
5410 assert!(plan_str.contains("PromSeriesDivide: tags=[\"__tsid\"]"));
5411
5412 let window_line = plan_str
5413 .lines()
5414 .find(|line| line.contains("WindowAggr: windowExpr=[[row_number()"))
5415 .unwrap();
5416 let partition_by = window_line
5417 .split("PARTITION BY [")
5418 .nth(1)
5419 .and_then(|s| s.split("] ORDER BY").next())
5420 .unwrap();
5421 assert!(!partition_by.contains(DATA_SCHEMA_TSID_COLUMN_NAME));
5422 }
5423
5424 #[tokio::test]
5425 async fn selector_matcher_on_tsid_does_not_use_internal_column() {
5426 let prom_expr = parser::parse(r#"some_metric{__tsid="123"}"#).unwrap();
5427 let eval_stmt = EvalStmt {
5428 expr: prom_expr,
5429 start: UNIX_EPOCH,
5430 end: UNIX_EPOCH
5431 .checked_add(Duration::from_secs(100_000))
5432 .unwrap(),
5433 interval: Duration::from_secs(5),
5434 lookback_delta: Duration::from_secs(1),
5435 };
5436
5437 let table_provider = build_test_table_provider_with_tsid(
5438 &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
5439 1,
5440 1,
5441 )
5442 .await;
5443 let plan =
5444 PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
5445 .await
5446 .unwrap();
5447
5448 fn collect_filter_cols(plan: &LogicalPlan, out: &mut HashSet<Column>) {
5449 if let LogicalPlan::Filter(filter) = plan {
5450 datafusion_expr::utils::expr_to_columns(&filter.predicate, out).unwrap();
5451 }
5452 for input in plan.inputs() {
5453 collect_filter_cols(input, out);
5454 }
5455 }
5456
5457 let mut filter_cols = HashSet::new();
5458 collect_filter_cols(&plan, &mut filter_cols);
5459 assert!(
5460 !filter_cols
5461 .iter()
5462 .any(|c| c.name == DATA_SCHEMA_TSID_COLUMN_NAME)
5463 );
5464 }
5465
5466 #[tokio::test]
5467 async fn tsid_is_not_used_when_physical_table_is_missing() {
5468 let prom_expr = parser::parse("some_metric").unwrap();
5469 let eval_stmt = EvalStmt {
5470 expr: prom_expr,
5471 start: UNIX_EPOCH,
5472 end: UNIX_EPOCH
5473 .checked_add(Duration::from_secs(100_000))
5474 .unwrap(),
5475 interval: Duration::from_secs(5),
5476 lookback_delta: Duration::from_secs(1),
5477 };
5478
5479 let catalog_list = MemoryCatalogManager::with_default_setup();
5480
5481 let mut columns = vec![ColumnSchema::new(
5483 "tag_0".to_string(),
5484 ConcreteDataType::string_datatype(),
5485 false,
5486 )];
5487 columns.push(
5488 ColumnSchema::new(
5489 "timestamp".to_string(),
5490 ConcreteDataType::timestamp_millisecond_datatype(),
5491 false,
5492 )
5493 .with_time_index(true),
5494 );
5495 columns.push(ColumnSchema::new(
5496 "field_0".to_string(),
5497 ConcreteDataType::float64_datatype(),
5498 true,
5499 ));
5500 let schema = Arc::new(Schema::new(columns));
5501 let mut options = table::requests::TableOptions::default();
5502 options
5503 .extra_options
5504 .insert(LOGICAL_TABLE_METADATA_KEY.to_string(), "phy".to_string());
5505 let table_meta = TableMetaBuilder::empty()
5506 .schema(schema)
5507 .primary_key_indices(vec![0])
5508 .value_indices(vec![2])
5509 .engine(METRIC_ENGINE_NAME.to_string())
5510 .options(options)
5511 .next_column_id(1024)
5512 .build()
5513 .unwrap();
5514 let table_info = TableInfoBuilder::default()
5515 .table_id(1024)
5516 .name("some_metric")
5517 .meta(table_meta)
5518 .build()
5519 .unwrap();
5520 let table = EmptyTable::from_table_info(&table_info);
5521 catalog_list
5522 .register_table_sync(RegisterTableRequest {
5523 catalog: DEFAULT_CATALOG_NAME.to_string(),
5524 schema: DEFAULT_SCHEMA_NAME.to_string(),
5525 table_name: "some_metric".to_string(),
5526 table_id: 1024,
5527 table,
5528 })
5529 .unwrap();
5530
5531 let table_provider = DfTableSourceProvider::new(
5532 catalog_list,
5533 false,
5534 QueryContext::arc(),
5535 DummyDecoder::arc(),
5536 false,
5537 );
5538
5539 let plan =
5540 PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
5541 .await
5542 .unwrap();
5543
5544 let plan_str = plan.display_indent_schema().to_string();
5545 assert!(plan_str.contains("PromSeriesDivide: tags=[\"tag_0\"]"));
5546 assert!(!plan_str.contains("PromSeriesDivide: tags=[\"__tsid\"]"));
5547 }
5548
5549 #[tokio::test]
5550 async fn tsid_is_carried_only_when_aggregate_preserves_label_set() {
5551 let prom_expr = parser::parse("sum by (tag_0) (some_metric)").unwrap();
5552 let eval_stmt = EvalStmt {
5553 expr: prom_expr,
5554 start: UNIX_EPOCH,
5555 end: UNIX_EPOCH
5556 .checked_add(Duration::from_secs(100_000))
5557 .unwrap(),
5558 interval: Duration::from_secs(5),
5559 lookback_delta: Duration::from_secs(1),
5560 };
5561
5562 let table_provider = build_test_table_provider_with_tsid(
5563 &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
5564 1,
5565 1,
5566 )
5567 .await;
5568 let plan =
5569 PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
5570 .await
5571 .unwrap();
5572
5573 let plan_str = plan.display_indent_schema().to_string();
5574 assert!(plan_str.contains("first_value") && plan_str.contains("__tsid"));
5575 assert!(
5576 !plan
5577 .schema()
5578 .fields()
5579 .iter()
5580 .any(|field| field.name() == DATA_SCHEMA_TSID_COLUMN_NAME)
5581 );
5582
5583 let prom_expr = parser::parse("sum(some_metric)").unwrap();
5585 let eval_stmt = EvalStmt {
5586 expr: prom_expr,
5587 start: UNIX_EPOCH,
5588 end: UNIX_EPOCH
5589 .checked_add(Duration::from_secs(100_000))
5590 .unwrap(),
5591 interval: Duration::from_secs(5),
5592 lookback_delta: Duration::from_secs(1),
5593 };
5594 let table_provider = build_test_table_provider_with_tsid(
5595 &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
5596 1,
5597 1,
5598 )
5599 .await;
5600 let plan =
5601 PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
5602 .await
5603 .unwrap();
5604 let plan_str = plan.display_indent_schema().to_string();
5605 assert!(!plan_str.contains("first_value"));
5606 }
5607
5608 #[tokio::test]
5609 async fn or_operator_with_unknown_metric_does_not_require_tsid() {
5610 let prom_expr = parser::parse("unknown_metric or some_metric").unwrap();
5611 let eval_stmt = EvalStmt {
5612 expr: prom_expr,
5613 start: UNIX_EPOCH,
5614 end: UNIX_EPOCH
5615 .checked_add(Duration::from_secs(100_000))
5616 .unwrap(),
5617 interval: Duration::from_secs(5),
5618 lookback_delta: Duration::from_secs(1),
5619 };
5620
5621 let table_provider = build_test_table_provider_with_tsid(
5622 &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
5623 1,
5624 1,
5625 )
5626 .await;
5627
5628 let plan =
5629 PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
5630 .await
5631 .unwrap();
5632
5633 assert!(
5634 !plan
5635 .schema()
5636 .fields()
5637 .iter()
5638 .any(|field| field.name() == DATA_SCHEMA_TSID_COLUMN_NAME)
5639 );
5640 }
5641
5642 #[tokio::test]
5643 async fn aggregate_avg() {
5644 do_aggregate_expr_plan("avg", "avg").await;
5645 }
5646
5647 #[tokio::test]
5648 #[should_panic] async fn aggregate_count() {
5650 do_aggregate_expr_plan("count", "count").await;
5651 }
5652
5653 #[tokio::test]
5654 async fn aggregate_min() {
5655 do_aggregate_expr_plan("min", "min").await;
5656 }
5657
5658 #[tokio::test]
5659 async fn aggregate_max() {
5660 do_aggregate_expr_plan("max", "max").await;
5661 }
5662
5663 #[tokio::test]
5664 async fn aggregate_group() {
5665 let prom_expr = parser::parse(
5669 "sum(group by (cluster)(kubernetes_build_info{service=\"kubernetes\",job=\"apiserver\"}))",
5670 )
5671 .unwrap();
5672 let eval_stmt = EvalStmt {
5673 expr: prom_expr,
5674 start: UNIX_EPOCH,
5675 end: UNIX_EPOCH
5676 .checked_add(Duration::from_secs(100_000))
5677 .unwrap(),
5678 interval: Duration::from_secs(5),
5679 lookback_delta: Duration::from_secs(1),
5680 };
5681
5682 let table_provider = build_test_table_provider_with_fields(
5683 &[(
5684 DEFAULT_SCHEMA_NAME.to_string(),
5685 "kubernetes_build_info".to_string(),
5686 )],
5687 &["cluster", "service", "job"],
5688 )
5689 .await;
5690 let plan =
5691 PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
5692 .await
5693 .unwrap();
5694
5695 let plan_str = plan.display_indent_schema().to_string();
5696 assert!(plan_str.contains("max(Float64(1"));
5697 }
5698
5699 #[tokio::test]
5700 async fn aggregate_stddev() {
5701 do_aggregate_expr_plan("stddev", "stddev_pop").await;
5702 }
5703
5704 #[tokio::test]
5705 async fn aggregate_stdvar() {
5706 do_aggregate_expr_plan("stdvar", "var_pop").await;
5707 }
5708
5709 #[tokio::test]
5733 async fn binary_op_column_column() {
5734 let prom_expr =
5735 parser::parse(r#"some_metric{tag_0="foo"} + some_metric{tag_0="bar"}"#).unwrap();
5736 let eval_stmt = EvalStmt {
5737 expr: prom_expr,
5738 start: UNIX_EPOCH,
5739 end: UNIX_EPOCH
5740 .checked_add(Duration::from_secs(100_000))
5741 .unwrap(),
5742 interval: Duration::from_secs(5),
5743 lookback_delta: Duration::from_secs(1),
5744 };
5745
5746 let table_provider = build_test_table_provider(
5747 &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
5748 1,
5749 1,
5750 )
5751 .await;
5752 let plan =
5753 PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
5754 .await
5755 .unwrap();
5756
5757 let expected = String::from(
5758 "Projection: rhs.tag_0, rhs.timestamp, CAST(lhs.field_0 AS Float64) + CAST(rhs.field_0 AS Float64) AS lhs.field_0 + rhs.field_0 [tag_0:Utf8, timestamp:Timestamp(ms), lhs.field_0 + rhs.field_0:Float64;N]\
5759 \n Inner Join: lhs.tag_0 = rhs.tag_0, lhs.timestamp = rhs.timestamp [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N, tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
5760 \n SubqueryAlias: lhs [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
5761 \n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
5762 \n PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
5763 \n Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
5764 \n Filter: some_metric.tag_0 = Utf8(\"foo\") AND some_metric.timestamp >= TimestampMillisecond(-999, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
5765 \n TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
5766 \n SubqueryAlias: rhs [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
5767 \n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
5768 \n PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
5769 \n Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
5770 \n Filter: some_metric.tag_0 = Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(-999, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
5771 \n TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]",
5772 );
5773
5774 assert_eq!(plan.display_indent_schema().to_string(), expected);
5775 }
5776
5777 async fn indie_query_plan_compare<T: AsRef<str>>(query: &str, expected: T) {
5778 let prom_expr = parser::parse(query).unwrap();
5779 let eval_stmt = EvalStmt {
5780 expr: prom_expr,
5781 start: UNIX_EPOCH,
5782 end: UNIX_EPOCH
5783 .checked_add(Duration::from_secs(100_000))
5784 .unwrap(),
5785 interval: Duration::from_secs(5),
5786 lookback_delta: Duration::from_secs(1),
5787 };
5788
5789 let table_provider = build_test_table_provider(
5790 &[
5791 (DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string()),
5792 (
5793 "greptime_private".to_string(),
5794 "some_alt_metric".to_string(),
5795 ),
5796 ],
5797 1,
5798 1,
5799 )
5800 .await;
5801 let plan =
5802 PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
5803 .await
5804 .unwrap();
5805
5806 assert_eq!(plan.display_indent_schema().to_string(), expected.as_ref());
5807 }
5808
5809 #[tokio::test]
5810 async fn binary_op_literal_column() {
5811 let query = r#"1 + some_metric{tag_0="bar"}"#;
5812 let expected = String::from(
5813 "Projection: some_metric.tag_0, some_metric.timestamp, Float64(1) + CAST(some_metric.field_0 AS Float64) AS Float64(1) + field_0 [tag_0:Utf8, timestamp:Timestamp(ms), Float64(1) + field_0:Float64;N]\
5814 \n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
5815 \n PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
5816 \n Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
5817 \n Filter: some_metric.tag_0 = Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(-999, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
5818 \n TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]",
5819 );
5820
5821 indie_query_plan_compare(query, expected).await;
5822 }
5823
/// Scalar-only arithmetic (`1 + 1`) references no table, so the expected plan
/// is an `EmptyMetric` node over a `dummy` table scan.
#[tokio::test]
async fn binary_op_literal_literal() {
    let query = r#"1 + 1"#;
    // NOTE(review): this is a multi-line raw string compared verbatim with the
    // rendered plan, so the line break and any leading whitespace on its
    // second line are significant.
    let expected = r#"EmptyMetric: range=[0..100000000], interval=[5000] [time:Timestamp(ms), value:Float64;N]
TableScan: dummy [time:Timestamp(ms), value:Float64;N]"#;
    indie_query_plan_compare(query, expected).await;
}
5831
5832 #[tokio::test]
5833 async fn simple_bool_grammar() {
5834 let query = "some_metric != bool 1.2345";
5835 let expected = String::from(
5836 "Projection: some_metric.tag_0, some_metric.timestamp, CAST(some_metric.field_0 != Float64(1.2345) AS Float64) AS field_0 != Float64(1.2345) [tag_0:Utf8, timestamp:Timestamp(ms), field_0 != Float64(1.2345):Float64;N]\
5837 \n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
5838 \n PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
5839 \n Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
5840 \n Filter: some_metric.timestamp >= TimestampMillisecond(-999, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
5841 \n TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]",
5842 );
5843
5844 indie_query_plan_compare(query, expected).await;
5845 }
5846
5847 #[tokio::test]
5848 async fn bool_with_additional_arithmetic() {
5849 let query = "some_metric + (1 == bool 2)";
5850 let expected = String::from(
5851 "Projection: some_metric.tag_0, some_metric.timestamp, CAST(some_metric.field_0 AS Float64) + CAST(Float64(1) = Float64(2) AS Float64) AS field_0 + Float64(1) = Float64(2) [tag_0:Utf8, timestamp:Timestamp(ms), field_0 + Float64(1) = Float64(2):Float64;N]\
5852 \n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
5853 \n PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
5854 \n Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
5855 \n Filter: some_metric.timestamp >= TimestampMillisecond(-999, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
5856 \n TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]",
5857 );
5858
5859 indie_query_plan_compare(query, expected).await;
5860 }
5861
5862 #[tokio::test]
5863 async fn simple_unary() {
5864 let query = "-some_metric";
5865 let expected = String::from(
5866 "Projection: some_metric.tag_0, some_metric.timestamp, (- some_metric.field_0) AS (- field_0) [tag_0:Utf8, timestamp:Timestamp(ms), (- field_0):Float64;N]\
5867 \n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
5868 \n PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
5869 \n Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
5870 \n Filter: some_metric.timestamp >= TimestampMillisecond(-999, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
5871 \n TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]",
5872 );
5873
5874 indie_query_plan_compare(query, expected).await;
5875 }
5876
5877 #[tokio::test]
5878 async fn increase_aggr() {
5879 let query = "increase(some_metric[5m])";
5880 let expected = String::from(
5881 "Filter: prom_increase(timestamp_range,field_0,timestamp,Int64(300000)) IS NOT NULL [timestamp:Timestamp(ms), prom_increase(timestamp_range,field_0,timestamp,Int64(300000)):Float64;N, tag_0:Utf8]\
5882 \n Projection: some_metric.timestamp, prom_increase(timestamp_range, field_0, some_metric.timestamp, Int64(300000)) AS prom_increase(timestamp_range,field_0,timestamp,Int64(300000)), some_metric.tag_0 [timestamp:Timestamp(ms), prom_increase(timestamp_range,field_0,timestamp,Int64(300000)):Float64;N, tag_0:Utf8]\
5883 \n PromRangeManipulate: req range=[0..100000000], interval=[5000], eval range=[300000], time index=[timestamp], values=[\"field_0\"] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Dictionary(Int64, Float64);N, timestamp_range:Dictionary(Int64, Timestamp(ms))]\
5884 \n PromSeriesNormalize: offset=[0], time index=[timestamp], filter NaN: [true] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
5885 \n PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
5886 \n Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
5887 \n Filter: some_metric.timestamp >= TimestampMillisecond(-299999, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
5888 \n TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]",
5889 );
5890
5891 indie_query_plan_compare(query, expected).await;
5892 }
5893
5894 #[tokio::test]
5895 async fn less_filter_on_value() {
5896 let query = "some_metric < 1.2345";
5897 let expected = String::from(
5898 "Filter: some_metric.field_0 < Float64(1.2345) [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
5899 \n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
5900 \n PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
5901 \n Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
5902 \n Filter: some_metric.timestamp >= TimestampMillisecond(-999, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
5903 \n TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]",
5904 );
5905
5906 indie_query_plan_compare(query, expected).await;
5907 }
5908
5909 #[tokio::test]
5910 async fn count_over_time() {
5911 let query = "count_over_time(some_metric[5m])";
5912 let expected = String::from(
5913 "Filter: prom_count_over_time(timestamp_range,field_0) IS NOT NULL [timestamp:Timestamp(ms), prom_count_over_time(timestamp_range,field_0):Float64;N, tag_0:Utf8]\
5914 \n Projection: some_metric.timestamp, prom_count_over_time(timestamp_range, field_0) AS prom_count_over_time(timestamp_range,field_0), some_metric.tag_0 [timestamp:Timestamp(ms), prom_count_over_time(timestamp_range,field_0):Float64;N, tag_0:Utf8]\
5915 \n PromRangeManipulate: req range=[0..100000000], interval=[5000], eval range=[300000], time index=[timestamp], values=[\"field_0\"] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Dictionary(Int64, Float64);N, timestamp_range:Dictionary(Int64, Timestamp(ms))]\
5916 \n PromSeriesNormalize: offset=[0], time index=[timestamp], filter NaN: [true] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
5917 \n PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
5918 \n Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
5919 \n Filter: some_metric.timestamp >= TimestampMillisecond(-299999, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
5920 \n TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]",
5921 );
5922
5923 indie_query_plan_compare(query, expected).await;
5924 }
5925
5926 #[tokio::test]
5927 async fn test_hash_join() {
5928 let mut eval_stmt = EvalStmt {
5929 expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
5930 start: UNIX_EPOCH,
5931 end: UNIX_EPOCH
5932 .checked_add(Duration::from_secs(100_000))
5933 .unwrap(),
5934 interval: Duration::from_secs(5),
5935 lookback_delta: Duration::from_secs(1),
5936 };
5937
5938 let case = r#"http_server_requests_seconds_sum{uri="/accounts/login"} / ignoring(kubernetes_pod_name,kubernetes_namespace) http_server_requests_seconds_count{uri="/accounts/login"}"#;
5939
5940 let prom_expr = parser::parse(case).unwrap();
5941 eval_stmt.expr = prom_expr;
5942 let table_provider = build_test_table_provider_with_fields(
5943 &[
5944 (
5945 DEFAULT_SCHEMA_NAME.to_string(),
5946 "http_server_requests_seconds_sum".to_string(),
5947 ),
5948 (
5949 DEFAULT_SCHEMA_NAME.to_string(),
5950 "http_server_requests_seconds_count".to_string(),
5951 ),
5952 ],
5953 &["uri", "kubernetes_namespace", "kubernetes_pod_name"],
5954 )
5955 .await;
5956 let plan =
5958 PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
5959 .await
5960 .unwrap();
5961 let expected = "Projection: http_server_requests_seconds_count.uri, http_server_requests_seconds_count.kubernetes_namespace, http_server_requests_seconds_count.kubernetes_pod_name, http_server_requests_seconds_count.greptime_timestamp, CAST(http_server_requests_seconds_sum.greptime_value AS Float64) / CAST(http_server_requests_seconds_count.greptime_value AS Float64) AS http_server_requests_seconds_sum.greptime_value / http_server_requests_seconds_count.greptime_value\
5962 \n Inner Join: http_server_requests_seconds_sum.greptime_timestamp = http_server_requests_seconds_count.greptime_timestamp, http_server_requests_seconds_sum.uri = http_server_requests_seconds_count.uri\
5963 \n SubqueryAlias: http_server_requests_seconds_sum\
5964 \n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp]\
5965 \n PromSeriesDivide: tags=[\"uri\", \"kubernetes_namespace\", \"kubernetes_pod_name\"]\
5966 \n Sort: http_server_requests_seconds_sum.uri ASC NULLS FIRST, http_server_requests_seconds_sum.kubernetes_namespace ASC NULLS FIRST, http_server_requests_seconds_sum.kubernetes_pod_name ASC NULLS FIRST, http_server_requests_seconds_sum.greptime_timestamp ASC NULLS FIRST\
5967 \n Filter: http_server_requests_seconds_sum.uri = Utf8(\"/accounts/login\") AND http_server_requests_seconds_sum.greptime_timestamp >= TimestampMillisecond(-999, None) AND http_server_requests_seconds_sum.greptime_timestamp <= TimestampMillisecond(100000000, None)\
5968 \n TableScan: http_server_requests_seconds_sum\
5969 \n SubqueryAlias: http_server_requests_seconds_count\
5970 \n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp]\
5971 \n PromSeriesDivide: tags=[\"uri\", \"kubernetes_namespace\", \"kubernetes_pod_name\"]\
5972 \n Sort: http_server_requests_seconds_count.uri ASC NULLS FIRST, http_server_requests_seconds_count.kubernetes_namespace ASC NULLS FIRST, http_server_requests_seconds_count.kubernetes_pod_name ASC NULLS FIRST, http_server_requests_seconds_count.greptime_timestamp ASC NULLS FIRST\
5973 \n Filter: http_server_requests_seconds_count.uri = Utf8(\"/accounts/login\") AND http_server_requests_seconds_count.greptime_timestamp >= TimestampMillisecond(-999, None) AND http_server_requests_seconds_count.greptime_timestamp <= TimestampMillisecond(100000000, None)\
5974 \n TableScan: http_server_requests_seconds_count";
5975 assert_eq!(plan.to_string(), expected);
5976 }
5977
5978 #[tokio::test]
5979 async fn test_nested_histogram_quantile() {
5980 let mut eval_stmt = EvalStmt {
5981 expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
5982 start: UNIX_EPOCH,
5983 end: UNIX_EPOCH
5984 .checked_add(Duration::from_secs(100_000))
5985 .unwrap(),
5986 interval: Duration::from_secs(5),
5987 lookback_delta: Duration::from_secs(1),
5988 };
5989
5990 let case = r#"label_replace(histogram_quantile(0.99, sum by(pod, le, path, code) (rate(greptime_servers_grpc_requests_elapsed_bucket{container="frontend"}[1m0s]))), "pod_new", "$1", "pod", "greptimedb-frontend-[0-9a-z]*-(.*)")"#;
5991
5992 let prom_expr = parser::parse(case).unwrap();
5993 eval_stmt.expr = prom_expr;
5994 let table_provider = build_test_table_provider_with_fields(
5995 &[(
5996 DEFAULT_SCHEMA_NAME.to_string(),
5997 "greptime_servers_grpc_requests_elapsed_bucket".to_string(),
5998 )],
5999 &["pod", "le", "path", "code", "container"],
6000 )
6001 .await;
6002 let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
6004 .await
6005 .unwrap();
6006 }
6007
6008 #[tokio::test]
6009 async fn test_parse_and_operator() {
6010 let mut eval_stmt = EvalStmt {
6011 expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
6012 start: UNIX_EPOCH,
6013 end: UNIX_EPOCH
6014 .checked_add(Duration::from_secs(100_000))
6015 .unwrap(),
6016 interval: Duration::from_secs(5),
6017 lookback_delta: Duration::from_secs(1),
6018 };
6019
6020 let cases = [
6021 r#"count (max by (persistentvolumeclaim,namespace) (kubelet_volume_stats_used_bytes{namespace=~".+"} ) and (max by (persistentvolumeclaim,namespace) (kubelet_volume_stats_used_bytes{namespace=~".+"} )) / (max by (persistentvolumeclaim,namespace) (kubelet_volume_stats_capacity_bytes{namespace=~".+"} )) >= (80 / 100)) or vector (0)"#,
6022 r#"count (max by (persistentvolumeclaim,namespace) (kubelet_volume_stats_used_bytes{namespace=~".+"} ) unless (max by (persistentvolumeclaim,namespace) (kubelet_volume_stats_used_bytes{namespace=~".+"} )) / (max by (persistentvolumeclaim,namespace) (kubelet_volume_stats_capacity_bytes{namespace=~".+"} )) >= (80 / 100)) or vector (0)"#,
6023 ];
6024
6025 for case in cases {
6026 let prom_expr = parser::parse(case).unwrap();
6027 eval_stmt.expr = prom_expr;
6028 let table_provider = build_test_table_provider_with_fields(
6029 &[
6030 (
6031 DEFAULT_SCHEMA_NAME.to_string(),
6032 "kubelet_volume_stats_used_bytes".to_string(),
6033 ),
6034 (
6035 DEFAULT_SCHEMA_NAME.to_string(),
6036 "kubelet_volume_stats_capacity_bytes".to_string(),
6037 ),
6038 ],
6039 &["namespace", "persistentvolumeclaim"],
6040 )
6041 .await;
6042 let _ =
6044 PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
6045 .await
6046 .unwrap();
6047 }
6048 }
6049
6050 #[tokio::test]
6051 async fn test_nested_binary_op() {
6052 let mut eval_stmt = EvalStmt {
6053 expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
6054 start: UNIX_EPOCH,
6055 end: UNIX_EPOCH
6056 .checked_add(Duration::from_secs(100_000))
6057 .unwrap(),
6058 interval: Duration::from_secs(5),
6059 lookback_delta: Duration::from_secs(1),
6060 };
6061
6062 let case = r#"sum(rate(nginx_ingress_controller_requests{job=~".*"}[2m])) -
6063 (
6064 sum(rate(nginx_ingress_controller_requests{namespace=~".*"}[2m]))
6065 or
6066 vector(0)
6067 )"#;
6068
6069 let prom_expr = parser::parse(case).unwrap();
6070 eval_stmt.expr = prom_expr;
6071 let table_provider = build_test_table_provider_with_fields(
6072 &[(
6073 DEFAULT_SCHEMA_NAME.to_string(),
6074 "nginx_ingress_controller_requests".to_string(),
6075 )],
6076 &["namespace", "job"],
6077 )
6078 .await;
6079 let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
6081 .await
6082 .unwrap();
6083 }
6084
6085 #[tokio::test]
6086 async fn test_parse_or_operator() {
6087 let mut eval_stmt = EvalStmt {
6088 expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
6089 start: UNIX_EPOCH,
6090 end: UNIX_EPOCH
6091 .checked_add(Duration::from_secs(100_000))
6092 .unwrap(),
6093 interval: Duration::from_secs(5),
6094 lookback_delta: Duration::from_secs(1),
6095 };
6096
6097 let case = r#"
6098 sum(rate(sysstat{tenant_name=~"tenant1",cluster_name=~"cluster1"}[120s])) by (cluster_name,tenant_name) /
6099 (sum(sysstat{tenant_name=~"tenant1",cluster_name=~"cluster1"}) by (cluster_name,tenant_name) * 100)
6100 or
6101 200 * sum(sysstat{tenant_name=~"tenant1",cluster_name=~"cluster1"}) by (cluster_name,tenant_name) /
6102 sum(sysstat{tenant_name=~"tenant1",cluster_name=~"cluster1"}) by (cluster_name,tenant_name)"#;
6103
6104 let table_provider = build_test_table_provider_with_fields(
6105 &[(DEFAULT_SCHEMA_NAME.to_string(), "sysstat".to_string())],
6106 &["tenant_name", "cluster_name"],
6107 )
6108 .await;
6109 eval_stmt.expr = parser::parse(case).unwrap();
6110 let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
6111 .await
6112 .unwrap();
6113
6114 let case = r#"sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) /
6115 (sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) *1000) +
6116 sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) /
6117 (sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) *1000) >= 0
6118 or
6119 sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) /
6120 (sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) *1000) >= 0
6121 or
6122 sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) /
6123 (sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) *1000) >= 0"#;
6124 let table_provider = build_test_table_provider_with_fields(
6125 &[(DEFAULT_SCHEMA_NAME.to_string(), "sysstat".to_string())],
6126 &["tenant_name", "cluster_name"],
6127 )
6128 .await;
6129 eval_stmt.expr = parser::parse(case).unwrap();
6130 let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
6131 .await
6132 .unwrap();
6133
6134 let case = r#"(sum(background_waitevent_cnt{tenant_name=~"sys",cluster_name=~"cluster1"}) by (cluster_name,tenant_name) +
6135 sum(foreground_waitevent_cnt{tenant_name=~"sys",cluster_name=~"cluster1"}) by (cluster_name,tenant_name)) or
6136 (sum(background_waitevent_cnt{tenant_name=~"sys",cluster_name=~"cluster1"}) by (cluster_name,tenant_name)) or
6137 (sum(foreground_waitevent_cnt{tenant_name=~"sys",cluster_name=~"cluster1"}) by (cluster_name,tenant_name))"#;
6138 let table_provider = build_test_table_provider_with_fields(
6139 &[
6140 (
6141 DEFAULT_SCHEMA_NAME.to_string(),
6142 "background_waitevent_cnt".to_string(),
6143 ),
6144 (
6145 DEFAULT_SCHEMA_NAME.to_string(),
6146 "foreground_waitevent_cnt".to_string(),
6147 ),
6148 ],
6149 &["tenant_name", "cluster_name"],
6150 )
6151 .await;
6152 eval_stmt.expr = parser::parse(case).unwrap();
6153 let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
6154 .await
6155 .unwrap();
6156
6157 let case = r#"avg(node_load1{cluster_name=~"cluster1"}) by (cluster_name,host_name) or max(container_cpu_load_average_10s{cluster_name=~"cluster1"}) by (cluster_name,host_name) * 100 / max(container_spec_cpu_quota{cluster_name=~"cluster1"}) by (cluster_name,host_name)"#;
6158 let table_provider = build_test_table_provider_with_fields(
6159 &[
6160 (DEFAULT_SCHEMA_NAME.to_string(), "node_load1".to_string()),
6161 (
6162 DEFAULT_SCHEMA_NAME.to_string(),
6163 "container_cpu_load_average_10s".to_string(),
6164 ),
6165 (
6166 DEFAULT_SCHEMA_NAME.to_string(),
6167 "container_spec_cpu_quota".to_string(),
6168 ),
6169 ],
6170 &["cluster_name", "host_name"],
6171 )
6172 .await;
6173 eval_stmt.expr = parser::parse(case).unwrap();
6174 let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
6175 .await
6176 .unwrap();
6177 }
6178
6179 #[tokio::test]
6180 async fn value_matcher() {
6181 let mut eval_stmt = EvalStmt {
6183 expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
6184 start: UNIX_EPOCH,
6185 end: UNIX_EPOCH
6186 .checked_add(Duration::from_secs(100_000))
6187 .unwrap(),
6188 interval: Duration::from_secs(5),
6189 lookback_delta: Duration::from_secs(1),
6190 };
6191
6192 let cases = [
6193 (
6195 r#"some_metric{__field__="field_1"}"#,
6196 vec![
6197 "some_metric.field_1",
6198 "some_metric.tag_0",
6199 "some_metric.tag_1",
6200 "some_metric.tag_2",
6201 "some_metric.timestamp",
6202 ],
6203 ),
6204 (
6206 r#"some_metric{__field__="field_1", __field__="field_0"}"#,
6207 vec![
6208 "some_metric.field_0",
6209 "some_metric.field_1",
6210 "some_metric.tag_0",
6211 "some_metric.tag_1",
6212 "some_metric.tag_2",
6213 "some_metric.timestamp",
6214 ],
6215 ),
6216 (
6218 r#"some_metric{__field__!="field_1"}"#,
6219 vec![
6220 "some_metric.field_0",
6221 "some_metric.field_2",
6222 "some_metric.tag_0",
6223 "some_metric.tag_1",
6224 "some_metric.tag_2",
6225 "some_metric.timestamp",
6226 ],
6227 ),
6228 (
6230 r#"some_metric{__field__!="field_1", __field__!="field_2"}"#,
6231 vec![
6232 "some_metric.field_0",
6233 "some_metric.tag_0",
6234 "some_metric.tag_1",
6235 "some_metric.tag_2",
6236 "some_metric.timestamp",
6237 ],
6238 ),
6239 (
6241 r#"some_metric{__field__="field_1", __field__!="field_0"}"#,
6242 vec![
6243 "some_metric.field_1",
6244 "some_metric.tag_0",
6245 "some_metric.tag_1",
6246 "some_metric.tag_2",
6247 "some_metric.timestamp",
6248 ],
6249 ),
6250 (
6252 r#"some_metric{__field__="field_2", __field__!="field_2"}"#,
6253 vec![
6254 "some_metric.tag_0",
6255 "some_metric.tag_1",
6256 "some_metric.tag_2",
6257 "some_metric.timestamp",
6258 ],
6259 ),
6260 (
6262 r#"some_metric{__field__=~"field_1|field_2"}"#,
6263 vec![
6264 "some_metric.field_1",
6265 "some_metric.field_2",
6266 "some_metric.tag_0",
6267 "some_metric.tag_1",
6268 "some_metric.tag_2",
6269 "some_metric.timestamp",
6270 ],
6271 ),
6272 (
6274 r#"some_metric{__field__!~"field_1|field_2"}"#,
6275 vec![
6276 "some_metric.field_0",
6277 "some_metric.tag_0",
6278 "some_metric.tag_1",
6279 "some_metric.tag_2",
6280 "some_metric.timestamp",
6281 ],
6282 ),
6283 ];
6284
6285 for case in cases {
6286 let prom_expr = parser::parse(case.0).unwrap();
6287 eval_stmt.expr = prom_expr;
6288 let table_provider = build_test_table_provider(
6289 &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
6290 3,
6291 3,
6292 )
6293 .await;
6294 let plan =
6295 PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
6296 .await
6297 .unwrap();
6298 let mut fields = plan.schema().field_names();
6299 let mut expected = case.1.into_iter().map(String::from).collect::<Vec<_>>();
6300 fields.sort();
6301 expected.sort();
6302 assert_eq!(fields, expected, "case: {:?}", case.0);
6303 }
6304
6305 let bad_cases = [
6306 r#"some_metric{__field__="nonexistent"}"#,
6307 r#"some_metric{__field__!="nonexistent"}"#,
6308 ];
6309
6310 for case in bad_cases {
6311 let prom_expr = parser::parse(case).unwrap();
6312 eval_stmt.expr = prom_expr;
6313 let table_provider = build_test_table_provider(
6314 &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
6315 3,
6316 3,
6317 )
6318 .await;
6319 let plan =
6320 PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
6321 .await;
6322 assert!(plan.is_err(), "case: {:?}", case);
6323 }
6324 }
6325
6326 #[tokio::test]
6327 async fn custom_schema() {
6328 let query = "some_alt_metric{__schema__=\"greptime_private\"}";
6329 let expected = String::from(
6330 "PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
6331 \n PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
6332 \n Sort: greptime_private.some_alt_metric.tag_0 ASC NULLS FIRST, greptime_private.some_alt_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
6333 \n Filter: greptime_private.some_alt_metric.timestamp >= TimestampMillisecond(-999, None) AND greptime_private.some_alt_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
6334 \n TableScan: greptime_private.some_alt_metric [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]",
6335 );
6336
6337 indie_query_plan_compare(query, expected).await;
6338
6339 let query = "some_alt_metric{__database__=\"greptime_private\"}";
6340 let expected = String::from(
6341 "PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
6342 \n PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
6343 \n Sort: greptime_private.some_alt_metric.tag_0 ASC NULLS FIRST, greptime_private.some_alt_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
6344 \n Filter: greptime_private.some_alt_metric.timestamp >= TimestampMillisecond(-999, None) AND greptime_private.some_alt_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
6345 \n TableScan: greptime_private.some_alt_metric [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]",
6346 );
6347
6348 indie_query_plan_compare(query, expected).await;
6349
6350 let query = "some_alt_metric{__schema__=\"greptime_private\"} / some_metric";
6351 let expected = String::from(
6352 "Projection: some_metric.tag_0, some_metric.timestamp, CAST(greptime_private.some_alt_metric.field_0 AS Float64) / CAST(some_metric.field_0 AS Float64) AS greptime_private.some_alt_metric.field_0 / some_metric.field_0 [tag_0:Utf8, timestamp:Timestamp(ms), greptime_private.some_alt_metric.field_0 / some_metric.field_0:Float64;N]\
6353 \n Inner Join: greptime_private.some_alt_metric.tag_0 = some_metric.tag_0, greptime_private.some_alt_metric.timestamp = some_metric.timestamp [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N, tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
6354 \n SubqueryAlias: greptime_private.some_alt_metric [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
6355 \n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
6356 \n PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
6357 \n Sort: greptime_private.some_alt_metric.tag_0 ASC NULLS FIRST, greptime_private.some_alt_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
6358 \n Filter: greptime_private.some_alt_metric.timestamp >= TimestampMillisecond(-999, None) AND greptime_private.some_alt_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
6359 \n TableScan: greptime_private.some_alt_metric [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
6360 \n SubqueryAlias: some_metric [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
6361 \n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
6362 \n PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
6363 \n Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
6364 \n Filter: some_metric.timestamp >= TimestampMillisecond(-999, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
6365 \n TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]",
6366 );
6367
6368 indie_query_plan_compare(query, expected).await;
6369 }
6370
6371 #[tokio::test]
6372 async fn only_equals_is_supported_for_special_matcher() {
6373 let queries = &[
6374 "some_alt_metric{__schema__!=\"greptime_private\"}",
6375 "some_alt_metric{__schema__=~\"lalala\"}",
6376 "some_alt_metric{__database__!=\"greptime_private\"}",
6377 "some_alt_metric{__database__=~\"lalala\"}",
6378 ];
6379
6380 for query in queries {
6381 let prom_expr = parser::parse(query).unwrap();
6382 let eval_stmt = EvalStmt {
6383 expr: prom_expr,
6384 start: UNIX_EPOCH,
6385 end: UNIX_EPOCH
6386 .checked_add(Duration::from_secs(100_000))
6387 .unwrap(),
6388 interval: Duration::from_secs(5),
6389 lookback_delta: Duration::from_secs(1),
6390 };
6391
6392 let table_provider = build_test_table_provider(
6393 &[
6394 (DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string()),
6395 (
6396 "greptime_private".to_string(),
6397 "some_alt_metric".to_string(),
6398 ),
6399 ],
6400 1,
6401 1,
6402 )
6403 .await;
6404
6405 let plan =
6406 PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
6407 .await;
6408 assert!(plan.is_err(), "query: {:?}", query);
6409 }
6410 }
6411
6412 #[tokio::test]
6413 async fn test_non_ms_precision() {
6414 let catalog_list = MemoryCatalogManager::with_default_setup();
6415 let columns = vec![
6416 ColumnSchema::new(
6417 "tag".to_string(),
6418 ConcreteDataType::string_datatype(),
6419 false,
6420 ),
6421 ColumnSchema::new(
6422 "timestamp".to_string(),
6423 ConcreteDataType::timestamp_nanosecond_datatype(),
6424 false,
6425 )
6426 .with_time_index(true),
6427 ColumnSchema::new(
6428 "field".to_string(),
6429 ConcreteDataType::float64_datatype(),
6430 true,
6431 ),
6432 ];
6433 let schema = Arc::new(Schema::new(columns));
6434 let table_meta = TableMetaBuilder::empty()
6435 .schema(schema)
6436 .primary_key_indices(vec![0])
6437 .value_indices(vec![2])
6438 .next_column_id(1024)
6439 .build()
6440 .unwrap();
6441 let table_info = TableInfoBuilder::default()
6442 .name("metrics".to_string())
6443 .meta(table_meta)
6444 .build()
6445 .unwrap();
6446 let table = EmptyTable::from_table_info(&table_info);
6447 assert!(
6448 catalog_list
6449 .register_table_sync(RegisterTableRequest {
6450 catalog: DEFAULT_CATALOG_NAME.to_string(),
6451 schema: DEFAULT_SCHEMA_NAME.to_string(),
6452 table_name: "metrics".to_string(),
6453 table_id: 1024,
6454 table,
6455 })
6456 .is_ok()
6457 );
6458
6459 let plan = PromPlanner::stmt_to_plan(
6460 DfTableSourceProvider::new(
6461 catalog_list.clone(),
6462 false,
6463 QueryContext::arc(),
6464 DummyDecoder::arc(),
6465 true,
6466 ),
6467 &EvalStmt {
6468 expr: parser::parse("metrics{tag = \"1\"}").unwrap(),
6469 start: UNIX_EPOCH,
6470 end: UNIX_EPOCH
6471 .checked_add(Duration::from_secs(100_000))
6472 .unwrap(),
6473 interval: Duration::from_secs(5),
6474 lookback_delta: Duration::from_secs(1),
6475 },
6476 &build_query_engine_state(),
6477 )
6478 .await
6479 .unwrap();
6480 assert_eq!(
6481 plan.display_indent_schema().to_string(),
6482 "PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [field:Float64;N, tag:Utf8, timestamp:Timestamp(ms)]\
6483 \n PromSeriesDivide: tags=[\"tag\"] [field:Float64;N, tag:Utf8, timestamp:Timestamp(ms)]\
6484 \n Sort: metrics.tag ASC NULLS FIRST, metrics.timestamp ASC NULLS FIRST [field:Float64;N, tag:Utf8, timestamp:Timestamp(ms)]\
6485 \n Filter: metrics.tag = Utf8(\"1\") AND metrics.timestamp >= TimestampMillisecond(-999, None) AND metrics.timestamp <= TimestampMillisecond(100000000, None) [field:Float64;N, tag:Utf8, timestamp:Timestamp(ms)]\
6486 \n Projection: metrics.field, metrics.tag, CAST(metrics.timestamp AS Timestamp(ms)) AS timestamp [field:Float64;N, tag:Utf8, timestamp:Timestamp(ms)]\
6487 \n TableScan: metrics [tag:Utf8, timestamp:Timestamp(ns), field:Float64;N]"
6488 );
6489 let plan = PromPlanner::stmt_to_plan(
6490 DfTableSourceProvider::new(
6491 catalog_list.clone(),
6492 false,
6493 QueryContext::arc(),
6494 DummyDecoder::arc(),
6495 true,
6496 ),
6497 &EvalStmt {
6498 expr: parser::parse("avg_over_time(metrics{tag = \"1\"}[5s])").unwrap(),
6499 start: UNIX_EPOCH,
6500 end: UNIX_EPOCH
6501 .checked_add(Duration::from_secs(100_000))
6502 .unwrap(),
6503 interval: Duration::from_secs(5),
6504 lookback_delta: Duration::from_secs(1),
6505 },
6506 &build_query_engine_state(),
6507 )
6508 .await
6509 .unwrap();
6510 assert_eq!(
6511 plan.display_indent_schema().to_string(),
6512 "Filter: prom_avg_over_time(timestamp_range,field) IS NOT NULL [timestamp:Timestamp(ms), prom_avg_over_time(timestamp_range,field):Float64;N, tag:Utf8]\
6513 \n Projection: metrics.timestamp, prom_avg_over_time(timestamp_range, field) AS prom_avg_over_time(timestamp_range,field), metrics.tag [timestamp:Timestamp(ms), prom_avg_over_time(timestamp_range,field):Float64;N, tag:Utf8]\
6514 \n PromRangeManipulate: req range=[0..100000000], interval=[5000], eval range=[5000], time index=[timestamp], values=[\"field\"] [field:Dictionary(Int64, Float64);N, tag:Utf8, timestamp:Timestamp(ms), timestamp_range:Dictionary(Int64, Timestamp(ms))]\
6515 \n PromSeriesNormalize: offset=[0], time index=[timestamp], filter NaN: [true] [field:Float64;N, tag:Utf8, timestamp:Timestamp(ms)]\
6516 \n PromSeriesDivide: tags=[\"tag\"] [field:Float64;N, tag:Utf8, timestamp:Timestamp(ms)]\
6517 \n Sort: metrics.tag ASC NULLS FIRST, metrics.timestamp ASC NULLS FIRST [field:Float64;N, tag:Utf8, timestamp:Timestamp(ms)]\
6518 \n Filter: metrics.tag = Utf8(\"1\") AND metrics.timestamp >= TimestampMillisecond(-4999, None) AND metrics.timestamp <= TimestampMillisecond(100000000, None) [field:Float64;N, tag:Utf8, timestamp:Timestamp(ms)]\
6519 \n Projection: metrics.field, metrics.tag, CAST(metrics.timestamp AS Timestamp(ms)) AS timestamp [field:Float64;N, tag:Utf8, timestamp:Timestamp(ms)]\
6520 \n TableScan: metrics [tag:Utf8, timestamp:Timestamp(ns), field:Float64;N]"
6521 );
6522 }
6523
6524 #[tokio::test]
6525 async fn test_nonexistent_label() {
6526 let mut eval_stmt = EvalStmt {
6528 expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
6529 start: UNIX_EPOCH,
6530 end: UNIX_EPOCH
6531 .checked_add(Duration::from_secs(100_000))
6532 .unwrap(),
6533 interval: Duration::from_secs(5),
6534 lookback_delta: Duration::from_secs(1),
6535 };
6536
6537 let case = r#"some_metric{nonexistent="hi"}"#;
6538 let prom_expr = parser::parse(case).unwrap();
6539 eval_stmt.expr = prom_expr;
6540 let table_provider = build_test_table_provider(
6541 &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
6542 3,
6543 3,
6544 )
6545 .await;
6546 let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
6548 .await
6549 .unwrap();
6550 }
6551
6552 #[tokio::test]
6553 async fn test_label_join() {
6554 let prom_expr = parser::parse(
6555 "label_join(up{tag_0='api-server'}, 'foo', ',', 'tag_1', 'tag_2', 'tag_3')",
6556 )
6557 .unwrap();
6558 let eval_stmt = EvalStmt {
6559 expr: prom_expr,
6560 start: UNIX_EPOCH,
6561 end: UNIX_EPOCH
6562 .checked_add(Duration::from_secs(100_000))
6563 .unwrap(),
6564 interval: Duration::from_secs(5),
6565 lookback_delta: Duration::from_secs(1),
6566 };
6567
6568 let table_provider =
6569 build_test_table_provider(&[(DEFAULT_SCHEMA_NAME.to_string(), "up".to_string())], 4, 1)
6570 .await;
6571 let plan =
6572 PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
6573 .await
6574 .unwrap();
6575
6576 let expected = r#"
6577Filter: up.field_0 IS NOT NULL [timestamp:Timestamp(ms), field_0:Float64;N, foo:Utf8;N, tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8]
6578 Projection: up.timestamp, up.field_0, concat_ws(Utf8(","), up.tag_1, up.tag_2, up.tag_3) AS foo, up.tag_0, up.tag_1, up.tag_2, up.tag_3 [timestamp:Timestamp(ms), field_0:Float64;N, foo:Utf8;N, tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8]
6579 PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]
6580 PromSeriesDivide: tags=["tag_0", "tag_1", "tag_2", "tag_3"] [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]
6581 Sort: up.tag_0 ASC NULLS FIRST, up.tag_1 ASC NULLS FIRST, up.tag_2 ASC NULLS FIRST, up.tag_3 ASC NULLS FIRST, up.timestamp ASC NULLS FIRST [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]
6582 Filter: up.tag_0 = Utf8("api-server") AND up.timestamp >= TimestampMillisecond(-999, None) AND up.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]
6583 TableScan: up [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]"#;
6584
6585 let ret = plan.display_indent_schema().to_string();
6586 assert_eq!(format!("\n{ret}"), expected, "\n{}", ret);
6587 }
6588
6589 #[tokio::test]
6590 async fn test_label_replace() {
6591 let prom_expr = parser::parse(
6592 "label_replace(up{tag_0=\"a:c\"}, \"foo\", \"$1\", \"tag_0\", \"(.*):.*\")",
6593 )
6594 .unwrap();
6595 let eval_stmt = EvalStmt {
6596 expr: prom_expr,
6597 start: UNIX_EPOCH,
6598 end: UNIX_EPOCH
6599 .checked_add(Duration::from_secs(100_000))
6600 .unwrap(),
6601 interval: Duration::from_secs(5),
6602 lookback_delta: Duration::from_secs(1),
6603 };
6604
6605 let table_provider =
6606 build_test_table_provider(&[(DEFAULT_SCHEMA_NAME.to_string(), "up".to_string())], 1, 1)
6607 .await;
6608 let plan =
6609 PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
6610 .await
6611 .unwrap();
6612
6613 let expected = r#"
6614Filter: up.field_0 IS NOT NULL [timestamp:Timestamp(ms), field_0:Float64;N, foo:Utf8;N, tag_0:Utf8]
6615 Projection: up.timestamp, up.field_0, regexp_replace(up.tag_0, Utf8("^(?s:(.*):.*)$"), Utf8("$1")) AS foo, up.tag_0 [timestamp:Timestamp(ms), field_0:Float64;N, foo:Utf8;N, tag_0:Utf8]
6616 PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]
6617 PromSeriesDivide: tags=["tag_0"] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]
6618 Sort: up.tag_0 ASC NULLS FIRST, up.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]
6619 Filter: up.tag_0 = Utf8("a:c") AND up.timestamp >= TimestampMillisecond(-999, None) AND up.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]
6620 TableScan: up [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]"#;
6621
6622 let ret = plan.display_indent_schema().to_string();
6623 assert_eq!(format!("\n{ret}"), expected, "\n{}", ret);
6624 }
6625
6626 #[tokio::test]
6627 async fn test_matchers_to_expr() {
6628 let mut eval_stmt = EvalStmt {
6629 expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
6630 start: UNIX_EPOCH,
6631 end: UNIX_EPOCH
6632 .checked_add(Duration::from_secs(100_000))
6633 .unwrap(),
6634 interval: Duration::from_secs(5),
6635 lookback_delta: Duration::from_secs(1),
6636 };
6637 let case =
6638 r#"sum(prometheus_tsdb_head_series{tag_1=~"(10.0.160.237:8080|10.0.160.237:9090)"})"#;
6639
6640 let prom_expr = parser::parse(case).unwrap();
6641 eval_stmt.expr = prom_expr;
6642 let table_provider = build_test_table_provider(
6643 &[(
6644 DEFAULT_SCHEMA_NAME.to_string(),
6645 "prometheus_tsdb_head_series".to_string(),
6646 )],
6647 3,
6648 3,
6649 )
6650 .await;
6651 let plan =
6652 PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
6653 .await
6654 .unwrap();
6655 let expected = "Sort: prometheus_tsdb_head_series.timestamp ASC NULLS LAST [timestamp:Timestamp(ms), sum(prometheus_tsdb_head_series.field_0):Float64;N, sum(prometheus_tsdb_head_series.field_1):Float64;N, sum(prometheus_tsdb_head_series.field_2):Float64;N]\
6656 \n Aggregate: groupBy=[[prometheus_tsdb_head_series.timestamp]], aggr=[[sum(prometheus_tsdb_head_series.field_0), sum(prometheus_tsdb_head_series.field_1), sum(prometheus_tsdb_head_series.field_2)]] [timestamp:Timestamp(ms), sum(prometheus_tsdb_head_series.field_0):Float64;N, sum(prometheus_tsdb_head_series.field_1):Float64;N, sum(prometheus_tsdb_head_series.field_2):Float64;N]\
6657 \n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, timestamp:Timestamp(ms), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N]\
6658 \n PromSeriesDivide: tags=[\"tag_0\", \"tag_1\", \"tag_2\"] [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, timestamp:Timestamp(ms), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N]\
6659 \n Sort: prometheus_tsdb_head_series.tag_0 ASC NULLS FIRST, prometheus_tsdb_head_series.tag_1 ASC NULLS FIRST, prometheus_tsdb_head_series.tag_2 ASC NULLS FIRST, prometheus_tsdb_head_series.timestamp ASC NULLS FIRST [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, timestamp:Timestamp(ms), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N]\
6660 \n Filter: prometheus_tsdb_head_series.tag_1 ~ Utf8(\"^(?:(10.0.160.237:8080|10.0.160.237:9090))$\") AND prometheus_tsdb_head_series.timestamp >= TimestampMillisecond(-999, None) AND prometheus_tsdb_head_series.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, timestamp:Timestamp(ms), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N]\
6661 \n TableScan: prometheus_tsdb_head_series [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, timestamp:Timestamp(ms), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N]";
6662 assert_eq!(plan.display_indent_schema().to_string(), expected);
6663 }
6664
6665 #[tokio::test]
6666 async fn test_topk_expr() {
6667 let mut eval_stmt = EvalStmt {
6668 expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
6669 start: UNIX_EPOCH,
6670 end: UNIX_EPOCH
6671 .checked_add(Duration::from_secs(100_000))
6672 .unwrap(),
6673 interval: Duration::from_secs(5),
6674 lookback_delta: Duration::from_secs(1),
6675 };
6676 let case = r#"topk(10, sum(prometheus_tsdb_head_series{ip=~"(10.0.160.237:8080|10.0.160.237:9090)"}) by (ip))"#;
6677
6678 let prom_expr = parser::parse(case).unwrap();
6679 eval_stmt.expr = prom_expr;
6680 let table_provider = build_test_table_provider_with_fields(
6681 &[
6682 (
6683 DEFAULT_SCHEMA_NAME.to_string(),
6684 "prometheus_tsdb_head_series".to_string(),
6685 ),
6686 (
6687 DEFAULT_SCHEMA_NAME.to_string(),
6688 "http_server_requests_seconds_count".to_string(),
6689 ),
6690 ],
6691 &["ip"],
6692 )
6693 .await;
6694
6695 let plan =
6696 PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
6697 .await
6698 .unwrap();
6699 let expected = "Projection: sum(prometheus_tsdb_head_series.greptime_value), prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp [sum(prometheus_tsdb_head_series.greptime_value):Float64;N, ip:Utf8, greptime_timestamp:Timestamp(ms)]\
6700 \n Sort: prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST, row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS FIRST, prometheus_tsdb_head_series.ip DESC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW ASC NULLS LAST [ip:Utf8, greptime_timestamp:Timestamp(ms), sum(prometheus_tsdb_head_series.greptime_value):Float64;N, row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS FIRST, prometheus_tsdb_head_series.ip DESC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:UInt64]\
6701 \n Filter: row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS FIRST, prometheus_tsdb_head_series.ip DESC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW <= Float64(10) [ip:Utf8, greptime_timestamp:Timestamp(ms), sum(prometheus_tsdb_head_series.greptime_value):Float64;N, row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS FIRST, prometheus_tsdb_head_series.ip DESC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:UInt64]\
6702 \n WindowAggr: windowExpr=[[row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS FIRST, prometheus_tsdb_head_series.ip DESC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]] [ip:Utf8, greptime_timestamp:Timestamp(ms), sum(prometheus_tsdb_head_series.greptime_value):Float64;N, row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS FIRST, prometheus_tsdb_head_series.ip DESC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:UInt64]\
6703 \n Sort: prometheus_tsdb_head_series.ip ASC NULLS LAST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST [ip:Utf8, greptime_timestamp:Timestamp(ms), sum(prometheus_tsdb_head_series.greptime_value):Float64;N]\
6704 \n Aggregate: groupBy=[[prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp]], aggr=[[sum(prometheus_tsdb_head_series.greptime_value)]] [ip:Utf8, greptime_timestamp:Timestamp(ms), sum(prometheus_tsdb_head_series.greptime_value):Float64;N]\
6705 \n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp] [ip:Utf8, greptime_timestamp:Timestamp(ms), greptime_value:Float64;N]\
6706 \n PromSeriesDivide: tags=[\"ip\"] [ip:Utf8, greptime_timestamp:Timestamp(ms), greptime_value:Float64;N]\
6707 \n Sort: prometheus_tsdb_head_series.ip ASC NULLS FIRST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS FIRST [ip:Utf8, greptime_timestamp:Timestamp(ms), greptime_value:Float64;N]\
6708 \n Filter: prometheus_tsdb_head_series.ip ~ Utf8(\"^(?:(10.0.160.237:8080|10.0.160.237:9090))$\") AND prometheus_tsdb_head_series.greptime_timestamp >= TimestampMillisecond(-999, None) AND prometheus_tsdb_head_series.greptime_timestamp <= TimestampMillisecond(100000000, None) [ip:Utf8, greptime_timestamp:Timestamp(ms), greptime_value:Float64;N]\
6709 \n TableScan: prometheus_tsdb_head_series [ip:Utf8, greptime_timestamp:Timestamp(ms), greptime_value:Float64;N]";
6710
6711 assert_eq!(plan.display_indent_schema().to_string(), expected);
6712 }
6713
6714 #[tokio::test]
6715 async fn test_count_values_expr() {
6716 let mut eval_stmt = EvalStmt {
6717 expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
6718 start: UNIX_EPOCH,
6719 end: UNIX_EPOCH
6720 .checked_add(Duration::from_secs(100_000))
6721 .unwrap(),
6722 interval: Duration::from_secs(5),
6723 lookback_delta: Duration::from_secs(1),
6724 };
6725 let case = r#"count_values('series', prometheus_tsdb_head_series{ip=~"(10.0.160.237:8080|10.0.160.237:9090)"}) by (ip)"#;
6726
6727 let prom_expr = parser::parse(case).unwrap();
6728 eval_stmt.expr = prom_expr;
6729 let table_provider = build_test_table_provider_with_fields(
6730 &[
6731 (
6732 DEFAULT_SCHEMA_NAME.to_string(),
6733 "prometheus_tsdb_head_series".to_string(),
6734 ),
6735 (
6736 DEFAULT_SCHEMA_NAME.to_string(),
6737 "http_server_requests_seconds_count".to_string(),
6738 ),
6739 ],
6740 &["ip"],
6741 )
6742 .await;
6743
6744 let plan =
6745 PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
6746 .await
6747 .unwrap();
6748 let expected = "Projection: count(prometheus_tsdb_head_series.greptime_value), prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp, series [count(prometheus_tsdb_head_series.greptime_value):Int64, ip:Utf8, greptime_timestamp:Timestamp(ms), series:Float64;N]\
6749 \n Sort: prometheus_tsdb_head_series.ip ASC NULLS LAST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST, prometheus_tsdb_head_series.greptime_value ASC NULLS LAST [count(prometheus_tsdb_head_series.greptime_value):Int64, ip:Utf8, greptime_timestamp:Timestamp(ms), series:Float64;N, greptime_value:Float64;N]\
6750 \n Projection: count(prometheus_tsdb_head_series.greptime_value), prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp, prometheus_tsdb_head_series.greptime_value AS series, prometheus_tsdb_head_series.greptime_value [count(prometheus_tsdb_head_series.greptime_value):Int64, ip:Utf8, greptime_timestamp:Timestamp(ms), series:Float64;N, greptime_value:Float64;N]\
6751 \n Aggregate: groupBy=[[prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp, prometheus_tsdb_head_series.greptime_value]], aggr=[[count(prometheus_tsdb_head_series.greptime_value)]] [ip:Utf8, greptime_timestamp:Timestamp(ms), greptime_value:Float64;N, count(prometheus_tsdb_head_series.greptime_value):Int64]\
6752 \n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp] [ip:Utf8, greptime_timestamp:Timestamp(ms), greptime_value:Float64;N]\
6753 \n PromSeriesDivide: tags=[\"ip\"] [ip:Utf8, greptime_timestamp:Timestamp(ms), greptime_value:Float64;N]\
6754 \n Sort: prometheus_tsdb_head_series.ip ASC NULLS FIRST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS FIRST [ip:Utf8, greptime_timestamp:Timestamp(ms), greptime_value:Float64;N]\
6755 \n Filter: prometheus_tsdb_head_series.ip ~ Utf8(\"^(?:(10.0.160.237:8080|10.0.160.237:9090))$\") AND prometheus_tsdb_head_series.greptime_timestamp >= TimestampMillisecond(-999, None) AND prometheus_tsdb_head_series.greptime_timestamp <= TimestampMillisecond(100000000, None) [ip:Utf8, greptime_timestamp:Timestamp(ms), greptime_value:Float64;N]\
6756 \n TableScan: prometheus_tsdb_head_series [ip:Utf8, greptime_timestamp:Timestamp(ms), greptime_value:Float64;N]";
6757
6758 assert_eq!(plan.display_indent_schema().to_string(), expected);
6759 }
6760
6761 #[tokio::test]
6762 async fn test_value_alias() {
6763 let mut eval_stmt = EvalStmt {
6764 expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
6765 start: UNIX_EPOCH,
6766 end: UNIX_EPOCH
6767 .checked_add(Duration::from_secs(100_000))
6768 .unwrap(),
6769 interval: Duration::from_secs(5),
6770 lookback_delta: Duration::from_secs(1),
6771 };
6772 let case = r#"count_values('series', prometheus_tsdb_head_series{ip=~"(10.0.160.237:8080|10.0.160.237:9090)"}) by (ip)"#;
6773
6774 let prom_expr = parser::parse(case).unwrap();
6775 eval_stmt.expr = prom_expr;
6776 eval_stmt = QueryLanguageParser::apply_alias_extension(eval_stmt, "my_series");
6777 let table_provider = build_test_table_provider_with_fields(
6778 &[
6779 (
6780 DEFAULT_SCHEMA_NAME.to_string(),
6781 "prometheus_tsdb_head_series".to_string(),
6782 ),
6783 (
6784 DEFAULT_SCHEMA_NAME.to_string(),
6785 "http_server_requests_seconds_count".to_string(),
6786 ),
6787 ],
6788 &["ip"],
6789 )
6790 .await;
6791
6792 let plan =
6793 PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
6794 .await
6795 .unwrap();
6796 let expected = r#"
6797Projection: count(prometheus_tsdb_head_series.greptime_value) AS my_series, prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp [my_series:Int64, ip:Utf8, greptime_timestamp:Timestamp(ms)]
6798 Projection: count(prometheus_tsdb_head_series.greptime_value), prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp, series [count(prometheus_tsdb_head_series.greptime_value):Int64, ip:Utf8, greptime_timestamp:Timestamp(ms), series:Float64;N]
6799 Sort: prometheus_tsdb_head_series.ip ASC NULLS LAST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST, prometheus_tsdb_head_series.greptime_value ASC NULLS LAST [count(prometheus_tsdb_head_series.greptime_value):Int64, ip:Utf8, greptime_timestamp:Timestamp(ms), series:Float64;N, greptime_value:Float64;N]
6800 Projection: count(prometheus_tsdb_head_series.greptime_value), prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp, prometheus_tsdb_head_series.greptime_value AS series, prometheus_tsdb_head_series.greptime_value [count(prometheus_tsdb_head_series.greptime_value):Int64, ip:Utf8, greptime_timestamp:Timestamp(ms), series:Float64;N, greptime_value:Float64;N]
6801 Aggregate: groupBy=[[prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp, prometheus_tsdb_head_series.greptime_value]], aggr=[[count(prometheus_tsdb_head_series.greptime_value)]] [ip:Utf8, greptime_timestamp:Timestamp(ms), greptime_value:Float64;N, count(prometheus_tsdb_head_series.greptime_value):Int64]
6802 PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp] [ip:Utf8, greptime_timestamp:Timestamp(ms), greptime_value:Float64;N]
6803 PromSeriesDivide: tags=["ip"] [ip:Utf8, greptime_timestamp:Timestamp(ms), greptime_value:Float64;N]
6804 Sort: prometheus_tsdb_head_series.ip ASC NULLS FIRST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS FIRST [ip:Utf8, greptime_timestamp:Timestamp(ms), greptime_value:Float64;N]
6805 Filter: prometheus_tsdb_head_series.ip ~ Utf8("^(?:(10.0.160.237:8080|10.0.160.237:9090))$") AND prometheus_tsdb_head_series.greptime_timestamp >= TimestampMillisecond(-999, None) AND prometheus_tsdb_head_series.greptime_timestamp <= TimestampMillisecond(100000000, None) [ip:Utf8, greptime_timestamp:Timestamp(ms), greptime_value:Float64;N]
6806 TableScan: prometheus_tsdb_head_series [ip:Utf8, greptime_timestamp:Timestamp(ms), greptime_value:Float64;N]"#;
6807 assert_eq!(format!("\n{}", plan.display_indent_schema()), expected);
6808 }
6809
6810 #[tokio::test]
6811 async fn test_quantile_expr() {
6812 let mut eval_stmt = EvalStmt {
6813 expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
6814 start: UNIX_EPOCH,
6815 end: UNIX_EPOCH
6816 .checked_add(Duration::from_secs(100_000))
6817 .unwrap(),
6818 interval: Duration::from_secs(5),
6819 lookback_delta: Duration::from_secs(1),
6820 };
6821 let case = r#"quantile(0.3, sum(prometheus_tsdb_head_series{ip=~"(10.0.160.237:8080|10.0.160.237:9090)"}) by (ip))"#;
6822
6823 let prom_expr = parser::parse(case).unwrap();
6824 eval_stmt.expr = prom_expr;
6825 let table_provider = build_test_table_provider_with_fields(
6826 &[
6827 (
6828 DEFAULT_SCHEMA_NAME.to_string(),
6829 "prometheus_tsdb_head_series".to_string(),
6830 ),
6831 (
6832 DEFAULT_SCHEMA_NAME.to_string(),
6833 "http_server_requests_seconds_count".to_string(),
6834 ),
6835 ],
6836 &["ip"],
6837 )
6838 .await;
6839
6840 let plan =
6841 PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
6842 .await
6843 .unwrap();
6844 let expected = "Sort: prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST [greptime_timestamp:Timestamp(ms), quantile(Float64(0.3),sum(prometheus_tsdb_head_series.greptime_value)):Float64;N]\
6845 \n Aggregate: groupBy=[[prometheus_tsdb_head_series.greptime_timestamp]], aggr=[[quantile(Float64(0.3), sum(prometheus_tsdb_head_series.greptime_value))]] [greptime_timestamp:Timestamp(ms), quantile(Float64(0.3),sum(prometheus_tsdb_head_series.greptime_value)):Float64;N]\
6846 \n Sort: prometheus_tsdb_head_series.ip ASC NULLS LAST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST [ip:Utf8, greptime_timestamp:Timestamp(ms), sum(prometheus_tsdb_head_series.greptime_value):Float64;N]\
6847 \n Aggregate: groupBy=[[prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp]], aggr=[[sum(prometheus_tsdb_head_series.greptime_value)]] [ip:Utf8, greptime_timestamp:Timestamp(ms), sum(prometheus_tsdb_head_series.greptime_value):Float64;N]\
6848 \n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp] [ip:Utf8, greptime_timestamp:Timestamp(ms), greptime_value:Float64;N]\
6849 \n PromSeriesDivide: tags=[\"ip\"] [ip:Utf8, greptime_timestamp:Timestamp(ms), greptime_value:Float64;N]\
6850 \n Sort: prometheus_tsdb_head_series.ip ASC NULLS FIRST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS FIRST [ip:Utf8, greptime_timestamp:Timestamp(ms), greptime_value:Float64;N]\
6851 \n Filter: prometheus_tsdb_head_series.ip ~ Utf8(\"^(?:(10.0.160.237:8080|10.0.160.237:9090))$\") AND prometheus_tsdb_head_series.greptime_timestamp >= TimestampMillisecond(-999, None) AND prometheus_tsdb_head_series.greptime_timestamp <= TimestampMillisecond(100000000, None) [ip:Utf8, greptime_timestamp:Timestamp(ms), greptime_value:Float64;N]\
6852 \n TableScan: prometheus_tsdb_head_series [ip:Utf8, greptime_timestamp:Timestamp(ms), greptime_value:Float64;N]";
6853
6854 assert_eq!(plan.display_indent_schema().to_string(), expected);
6855 }
6856
6857 #[tokio::test]
6858 async fn test_or_not_exists_table_label() {
6859 let mut eval_stmt = EvalStmt {
6860 expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
6861 start: UNIX_EPOCH,
6862 end: UNIX_EPOCH
6863 .checked_add(Duration::from_secs(100_000))
6864 .unwrap(),
6865 interval: Duration::from_secs(5),
6866 lookback_delta: Duration::from_secs(1),
6867 };
6868 let case = r#"sum by (job, tag0, tag2) (metric_exists) or sum by (job, tag0, tag2) (metric_not_exists)"#;
6869
6870 let prom_expr = parser::parse(case).unwrap();
6871 eval_stmt.expr = prom_expr;
6872 let table_provider = build_test_table_provider_with_fields(
6873 &[(DEFAULT_SCHEMA_NAME.to_string(), "metric_exists".to_string())],
6874 &["job"],
6875 )
6876 .await;
6877
6878 let plan =
6879 PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
6880 .await
6881 .unwrap();
6882 let expected = r#"UnionDistinctOn: on col=[["job"]], ts_col=[greptime_timestamp] [greptime_timestamp:Timestamp(ms), job:Utf8, sum(metric_exists.greptime_value):Float64;N]
6883 SubqueryAlias: metric_exists [greptime_timestamp:Timestamp(ms), job:Utf8, sum(metric_exists.greptime_value):Float64;N]
6884 Projection: metric_exists.greptime_timestamp, metric_exists.job, sum(metric_exists.greptime_value) [greptime_timestamp:Timestamp(ms), job:Utf8, sum(metric_exists.greptime_value):Float64;N]
6885 Sort: metric_exists.job ASC NULLS LAST, metric_exists.greptime_timestamp ASC NULLS LAST [job:Utf8, greptime_timestamp:Timestamp(ms), sum(metric_exists.greptime_value):Float64;N]
6886 Aggregate: groupBy=[[metric_exists.job, metric_exists.greptime_timestamp]], aggr=[[sum(metric_exists.greptime_value)]] [job:Utf8, greptime_timestamp:Timestamp(ms), sum(metric_exists.greptime_value):Float64;N]
6887 PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp] [job:Utf8, greptime_timestamp:Timestamp(ms), greptime_value:Float64;N]
6888 PromSeriesDivide: tags=["job"] [job:Utf8, greptime_timestamp:Timestamp(ms), greptime_value:Float64;N]
6889 Sort: metric_exists.job ASC NULLS FIRST, metric_exists.greptime_timestamp ASC NULLS FIRST [job:Utf8, greptime_timestamp:Timestamp(ms), greptime_value:Float64;N]
6890 Filter: metric_exists.greptime_timestamp >= TimestampMillisecond(-999, None) AND metric_exists.greptime_timestamp <= TimestampMillisecond(100000000, None) [job:Utf8, greptime_timestamp:Timestamp(ms), greptime_value:Float64;N]
6891 TableScan: metric_exists [job:Utf8, greptime_timestamp:Timestamp(ms), greptime_value:Float64;N]
6892 SubqueryAlias: [greptime_timestamp:Timestamp(ms), job:Utf8;N, sum(.value):Float64;N]
6893 Projection: .time AS greptime_timestamp, Utf8(NULL) AS job, sum(.value) [greptime_timestamp:Timestamp(ms), job:Utf8;N, sum(.value):Float64;N]
6894 Sort: .time ASC NULLS LAST [time:Timestamp(ms), sum(.value):Float64;N]
6895 Aggregate: groupBy=[[.time]], aggr=[[sum(.value)]] [time:Timestamp(ms), sum(.value):Float64;N]
6896 EmptyMetric: range=[0..-1], interval=[5000] [time:Timestamp(ms), value:Float64;N]
6897 TableScan: dummy [time:Timestamp(ms), value:Float64;N]"#;
6898
6899 assert_eq!(plan.display_indent_schema().to_string(), expected);
6900 }
6901
6902 #[tokio::test]
6903 async fn test_histogram_quantile_missing_le_column() {
6904 let mut eval_stmt = EvalStmt {
6905 expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
6906 start: UNIX_EPOCH,
6907 end: UNIX_EPOCH
6908 .checked_add(Duration::from_secs(100_000))
6909 .unwrap(),
6910 interval: Duration::from_secs(5),
6911 lookback_delta: Duration::from_secs(1),
6912 };
6913
6914 let case = r#"histogram_quantile(0.99, sum by(pod,instance,le) (rate(non_existent_histogram_bucket{instance=~"xxx"}[1m])))"#;
6916
6917 let prom_expr = parser::parse(case).unwrap();
6918 eval_stmt.expr = prom_expr;
6919
6920 let table_provider = build_test_table_provider_with_fields(
6922 &[(
6923 DEFAULT_SCHEMA_NAME.to_string(),
6924 "non_existent_histogram_bucket".to_string(),
6925 )],
6926 &["pod", "instance"], )
6928 .await;
6929
6930 let result =
6932 PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
6933 .await;
6934
6935 assert!(
6937 result.is_ok(),
6938 "Expected successful plan creation with empty result, but got error: {:?}",
6939 result.err()
6940 );
6941
6942 let plan = result.unwrap();
6944 match plan {
6945 LogicalPlan::EmptyRelation(_) => {
6946 }
6948 _ => panic!("Expected EmptyRelation, but got: {:?}", plan),
6949 }
6950 }
6951}