1use std::collections::{BTreeSet, HashSet, VecDeque};
16use std::sync::Arc;
17use std::time::UNIX_EPOCH;
18
19use arrow::datatypes::IntervalDayTime;
20use async_recursion::async_recursion;
21use catalog::table_source::DfTableSourceProvider;
22use common_error::ext::ErrorExt;
23use common_error::status_code::StatusCode;
24use common_function::function::FunctionContext;
25use common_query::prelude::greptime_value;
26use datafusion::common::DFSchemaRef;
27use datafusion::datasource::DefaultTableSource;
28use datafusion::functions_aggregate::average::avg_udaf;
29use datafusion::functions_aggregate::count::count_udaf;
30use datafusion::functions_aggregate::expr_fn::first_value;
31use datafusion::functions_aggregate::min_max::{max_udaf, min_udaf};
32use datafusion::functions_aggregate::stddev::stddev_pop_udaf;
33use datafusion::functions_aggregate::sum::sum_udaf;
34use datafusion::functions_aggregate::variance::var_pop_udaf;
35use datafusion::functions_window::row_number::RowNumber;
36use datafusion::logical_expr::expr::{Alias, ScalarFunction, WindowFunction};
37use datafusion::logical_expr::expr_rewriter::normalize_cols;
38use datafusion::logical_expr::{
39 BinaryExpr, Cast, Extension, LogicalPlan, LogicalPlanBuilder, Operator,
40 ScalarUDF as ScalarUdfDef, WindowFrame, WindowFunctionDefinition,
41};
42use datafusion::prelude as df_prelude;
43use datafusion::prelude::{Column, Expr as DfExpr, JoinType};
44use datafusion::scalar::ScalarValue;
45use datafusion::sql::TableReference;
46use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRewriter};
47use datafusion_common::{DFSchema, NullEquality};
48use datafusion_expr::expr::WindowFunctionParams;
49use datafusion_expr::utils::conjunction;
50use datafusion_expr::{
51 ExprSchemable, Literal, Projection, SortExpr, TableScan, TableSource, col, lit,
52};
53use datatypes::arrow::datatypes::{DataType as ArrowDataType, TimeUnit as ArrowTimeUnit};
54use datatypes::data_type::ConcreteDataType;
55use itertools::Itertools;
56use once_cell::sync::Lazy;
57use promql::extension_plan::{
58 Absent, EmptyMetric, HistogramFold, InstantManipulate, Millisecond, RangeManipulate,
59 ScalarCalculate, SeriesDivide, SeriesNormalize, UnionDistinctOn, build_special_time_expr,
60};
61use promql::functions::{
62 AbsentOverTime, AvgOverTime, Changes, CountOverTime, Delta, Deriv, DoubleExponentialSmoothing,
63 IDelta, Increase, LastOverTime, MaxOverTime, MinOverTime, PredictLinear, PresentOverTime,
64 QuantileOverTime, Rate, Resets, Round, StddevOverTime, StdvarOverTime, SumOverTime,
65 quantile_udaf,
66};
67use promql_parser::label::{METRIC_NAME, MatchOp, Matcher, Matchers};
68use promql_parser::parser::token::TokenType;
69use promql_parser::parser::value::ValueType;
70use promql_parser::parser::{
71 AggregateExpr, BinModifier, BinaryExpr as PromBinaryExpr, Call, EvalStmt, Expr as PromExpr,
72 Function, FunctionArgs as PromFunctionArgs, LabelModifier, MatrixSelector, NumberLiteral,
73 Offset, ParenExpr, StringLiteral, SubqueryExpr, UnaryExpr, VectorMatchCardinality,
74 VectorSelector, token,
75};
76use regex::{self, Regex};
77use snafu::{OptionExt, ResultExt, ensure};
78use store_api::metric_engine_consts::{
79 DATA_SCHEMA_TABLE_ID_COLUMN_NAME, DATA_SCHEMA_TSID_COLUMN_NAME, LOGICAL_TABLE_METADATA_KEY,
80 METRIC_ENGINE_NAME, is_metric_engine_internal_column,
81};
82use table::table::adapter::DfTableProviderAdapter;
83
84use crate::parser::{
85 ALIAS_NODE_NAME, ANALYZE_NODE_NAME, ANALYZE_VERBOSE_NODE_NAME, AliasExpr, EXPLAIN_NODE_NAME,
86 EXPLAIN_VERBOSE_NODE_NAME,
87};
88use crate::promql::error::{
89 CatalogSnafu, ColumnNotFoundSnafu, CombineTableColumnMismatchSnafu, DataFusionPlanningSnafu,
90 ExpectRangeSelectorSnafu, FunctionInvalidArgumentSnafu, InvalidDestinationLabelNameSnafu,
91 InvalidRegularExpressionSnafu, InvalidTimeRangeSnafu, MultiFieldsNotSupportedSnafu,
92 MultipleMetricMatchersSnafu, MultipleVectorSnafu, NoMetricMatcherSnafu, PromqlPlanNodeSnafu,
93 Result, SameLabelSetSnafu, TableNameNotFoundSnafu, TimeIndexNotFoundSnafu,
94 UnexpectedPlanExprSnafu, UnexpectedTokenSnafu, UnknownTableSnafu, UnsupportedExprSnafu,
95 UnsupportedMatcherOpSnafu, UnsupportedVectorMatchSnafu, ValueNotFoundSnafu,
96 ZeroRangeSelectorSnafu,
97};
98use crate::query_engine::QueryEngineState;
99
/// Name of the special `time()` PromQL function; also used as the synthetic
/// time-index "function" name when building `EmptyMetric` plans.
const SPECIAL_TIME_FUNCTION: &str = "time";
/// Name of the `scalar()` PromQL function, lowered through a dedicated plan.
const SCALAR_FUNCTION: &str = "scalar";
/// Name of the `absent()` PromQL function, lowered through a dedicated plan.
const SPECIAL_ABSENT_FUNCTION: &str = "absent";
/// Name of `histogram_quantile()`, which requires special bucket folding.
const SPECIAL_HISTOGRAM_QUANTILE: &str = "histogram_quantile";
/// Name of the `vector()` PromQL function, lowered through a dedicated plan.
const SPECIAL_VECTOR_FUNCTION: &str = "vector";
/// Prometheus' conventional histogram bucket-boundary label.
const LE_COLUMN_NAME: &str = "le";

/// Matches valid Prometheus label names: `[a-zA-Z_][a-zA-Z0-9_]*`.
static LABEL_NAME_REGEX: Lazy<Regex> =
    Lazy::new(|| Regex::new(r"^[a-zA-Z_][a-zA-Z0-9_]*$").unwrap());

/// Time-index column name used for plans without a backing table (literals etc.).
const DEFAULT_TIME_INDEX_COLUMN: &str = "time";

/// Field (value) column name used for plans without a backing table.
const DEFAULT_FIELD_COLUMN: &str = "value";

/// Special matcher label used to select specific field columns of a table.
const FIELD_COLUMN_MATCHER: &str = "__field__";

/// Special matcher label used to select the schema; `__database__` is an alias.
const SCHEMA_COLUMN_MATCHER: &str = "__schema__";
const DB_COLUMN_MATCHER: &str = "__database__";

// NOTE(review): the two constants below are not referenced in this chunk;
// presumably used by code elsewhere in the file.
const MAX_SCATTER_POINTS: i64 = 400;

/// One hour expressed in milliseconds.
const INTERVAL_1H: i64 = 60 * 60 * 1000;
/// Mutable planning state threaded through the PromQL planner.
///
/// Holds the evaluation window plus per-selector table/column information.
/// The per-selector fields are cleared by [`Self::reset`] before each
/// vector/matrix selector is planned.
#[derive(Default, Debug, Clone)]
struct PromPlannerContext {
    // Evaluation window, all in milliseconds.
    start: Millisecond,
    end: Millisecond,
    interval: Millisecond,
    lookback_delta: Millisecond,

    // Per-selector state (reset between selectors).
    table_name: Option<String>,
    time_index_column: Option<String>,
    field_columns: Vec<String>,
    tag_columns: Vec<String>,
    // Whether the current plan still carries the metric engine's internal
    // TSID (series id) column.
    use_tsid: bool,
    // Matchers on the special `__field__` label, used to pick field columns.
    field_column_matcher: Option<Vec<Matcher>>,
    // All ordinary label matchers collected from the current selector.
    selector_matcher: Vec<Matcher>,
    // Schema/database selected via `__schema__` / `__database__` matchers.
    schema_name: Option<String>,
    // Range (ms) of the active range selector; `None` for instant selectors.
    range: Option<Millisecond>,
}
163
164impl PromPlannerContext {
165 fn from_eval_stmt(stmt: &EvalStmt) -> Self {
166 Self {
167 start: stmt.start.duration_since(UNIX_EPOCH).unwrap().as_millis() as _,
168 end: stmt.end.duration_since(UNIX_EPOCH).unwrap().as_millis() as _,
169 interval: stmt.interval.as_millis() as _,
170 lookback_delta: stmt.lookback_delta.as_millis() as _,
171 ..Default::default()
172 }
173 }
174
175 fn reset(&mut self) {
177 self.table_name = None;
178 self.time_index_column = None;
179 self.field_columns = vec![];
180 self.tag_columns = vec![];
181 self.use_tsid = false;
182 self.field_column_matcher = None;
183 self.selector_matcher.clear();
184 self.schema_name = None;
185 self.range = None;
186 }
187
188 fn reset_table_name_and_schema(&mut self) {
190 self.table_name = Some(String::new());
191 self.schema_name = None;
192 self.use_tsid = false;
193 }
194
195 fn has_le_tag(&self) -> bool {
197 self.tag_columns.iter().any(|c| c.eq(&LE_COLUMN_NAME))
198 }
199}
200
/// Planner that lowers a PromQL [`EvalStmt`] into a DataFusion [`LogicalPlan`].
pub struct PromPlanner {
    /// Resolves table references to DataFusion table sources.
    table_provider: DfTableSourceProvider,
    /// Planning state shared across the recursive expression walk.
    ctx: PromPlannerContext,
}
205
206impl PromPlanner {
    /// Plan a full PromQL evaluation statement into a DataFusion logical plan.
    ///
    /// Main entry point: seeds a planner with the statement's evaluation
    /// window, lowers the expression tree, then post-processes the result via
    /// `strip_tsid_column` (defined elsewhere in this file; presumably removes
    /// the metric engine's internal TSID column from the output).
    pub async fn stmt_to_plan(
        table_provider: DfTableSourceProvider,
        stmt: &EvalStmt,
        query_engine_state: &QueryEngineState,
    ) -> Result<LogicalPlan> {
        let mut planner = Self {
            table_provider,
            ctx: PromPlannerContext::from_eval_stmt(stmt),
        };

        let plan = planner
            .prom_expr_to_plan(&stmt.expr, query_engine_state)
            .await?;

        planner.strip_tsid_column(plan)
    }
224
225 pub async fn prom_expr_to_plan(
226 &mut self,
227 prom_expr: &PromExpr,
228 query_engine_state: &QueryEngineState,
229 ) -> Result<LogicalPlan> {
230 self.prom_expr_to_plan_inner(prom_expr, false, query_engine_state)
231 .await
232 }
233
234 #[async_recursion]
244 async fn prom_expr_to_plan_inner(
245 &mut self,
246 prom_expr: &PromExpr,
247 timestamp_fn: bool,
248 query_engine_state: &QueryEngineState,
249 ) -> Result<LogicalPlan> {
250 let res = match prom_expr {
251 PromExpr::Aggregate(expr) => {
252 self.prom_aggr_expr_to_plan(query_engine_state, expr)
253 .await?
254 }
255 PromExpr::Unary(expr) => {
256 self.prom_unary_expr_to_plan(query_engine_state, expr)
257 .await?
258 }
259 PromExpr::Binary(expr) => {
260 self.prom_binary_expr_to_plan(query_engine_state, expr)
261 .await?
262 }
263 PromExpr::Paren(ParenExpr { expr }) => {
264 self.prom_expr_to_plan_inner(expr, timestamp_fn, query_engine_state)
265 .await?
266 }
267 PromExpr::Subquery(expr) => {
268 self.prom_subquery_expr_to_plan(query_engine_state, expr)
269 .await?
270 }
271 PromExpr::NumberLiteral(lit) => self.prom_number_lit_to_plan(lit)?,
272 PromExpr::StringLiteral(lit) => self.prom_string_lit_to_plan(lit)?,
273 PromExpr::VectorSelector(selector) => {
274 self.prom_vector_selector_to_plan(selector, timestamp_fn)
275 .await?
276 }
277 PromExpr::MatrixSelector(selector) => {
278 self.prom_matrix_selector_to_plan(selector).await?
279 }
280 PromExpr::Call(expr) => {
281 self.prom_call_expr_to_plan(query_engine_state, expr)
282 .await?
283 }
284 PromExpr::Extension(expr) => {
285 self.prom_ext_expr_to_plan(query_engine_state, expr).await?
286 }
287 };
288
289 Ok(res)
290 }
291
    /// Plan a PromQL subquery (`expr[range:step]`).
    ///
    /// Temporarily widens the evaluation window: the inner expression is
    /// planned with `start` moved back by `range - interval` (and `interval`
    /// replaced by `step` when given) so enough points exist for the outer
    /// range, then the original window is restored before wrapping the result
    /// in a `RangeManipulate` node.
    async fn prom_subquery_expr_to_plan(
        &mut self,
        query_engine_state: &QueryEngineState,
        subquery_expr: &SubqueryExpr,
    ) -> Result<LogicalPlan> {
        let SubqueryExpr {
            expr, range, step, ..
        } = subquery_expr;

        // Save the window, adjust it for the inner plan, restore afterwards.
        let current_interval = self.ctx.interval;
        if let Some(step) = step {
            self.ctx.interval = step.as_millis() as _;
        }
        let current_start = self.ctx.start;
        self.ctx.start -= range.as_millis() as i64 - self.ctx.interval;
        let input = self.prom_expr_to_plan(expr, query_engine_state).await?;
        self.ctx.interval = current_interval;
        self.ctx.start = current_start;

        // A zero-length range is invalid, same as for range selectors.
        ensure!(!range.is_zero(), ZeroRangeSelectorSnafu);
        let range_ms = range.as_millis() as _;
        self.ctx.range = Some(range_ms);

        let manipulate = RangeManipulate::new(
            self.ctx.start,
            self.ctx.end,
            self.ctx.interval,
            range_ms,
            self.ctx
                .time_index_column
                .clone()
                .expect("time index should be set in `setup_context`"),
            self.ctx.field_columns.clone(),
            input,
        )
        .context(DataFusionPlanningSnafu)?;

        Ok(LogicalPlan::Extension(Extension {
            node: Arc::new(manipulate),
        }))
    }
333
    /// Plan a PromQL aggregation (`sum by (...)`, `count_values`, `topk`, ...).
    ///
    /// Plans the inner expression, ensures every grouping label exists as a
    /// column in the input schema, then either delegates to the window-based
    /// `topk`/`bottomk` path or builds a DataFusion aggregate followed by a
    /// sort on the grouping keys.
    async fn prom_aggr_expr_to_plan(
        &mut self,
        query_engine_state: &QueryEngineState,
        aggr_expr: &AggregateExpr,
    ) -> Result<LogicalPlan> {
        let AggregateExpr {
            op,
            expr,
            modifier,
            param,
        } = aggr_expr;

        let mut input = self.prom_expr_to_plan(expr, query_engine_state).await?;
        // Whether the input still carries the metric engine's internal TSID
        // column (a UInt64 series identifier).
        let input_has_tsid = input.schema().fields().iter().any(|field| {
            field.name() == DATA_SCHEMA_TSID_COLUMN_NAME
                && field.data_type() == &ArrowDataType::UInt64
        });

        // Labels the aggregation must group by, derived from `by`/`without`.
        let required_group_tags = match modifier {
            None => BTreeSet::new(),
            // `by (...)`: exactly the listed labels, minus internal columns.
            Some(LabelModifier::Include(labels)) => labels
                .labels
                .iter()
                .filter(|label| !is_metric_engine_internal_column(label.as_str()))
                .cloned()
                .collect(),
            // `without (...)`: all row-key tags except the listed ones.
            Some(LabelModifier::Exclude(labels)) => {
                let mut all_tags = self.collect_row_key_tag_columns_from_plan(&input)?;
                for label in &labels.labels {
                    let _ = all_tags.remove(label);
                }
                all_tags
            }
        };

        // If some grouping label is missing from the input schema, extend the
        // plan to make the column available, then re-sync the ctx tag list.
        if !required_group_tags.is_empty()
            && required_group_tags
                .iter()
                .any(|tag| Self::find_case_sensitive_column(input.schema(), tag.as_str()).is_none())
        {
            input = self.ensure_tag_columns_available(input, &required_group_tags)?;
            self.refresh_tag_columns_from_schema(input.schema());
        }

        match (*op).id() {
            // `topk`/`bottomk` keep individual series, so they are implemented
            // with window functions instead of an aggregate.
            token::T_TOPK | token::T_BOTTOMK => {
                self.prom_topk_bottomk_to_plan(aggr_expr, input).await
            }
            _ => {
                let input_tag_columns = if input_has_tsid {
                    self.collect_row_key_tag_columns_from_plan(&input)?
                        .into_iter()
                        .collect::<Vec<_>>()
                } else {
                    self.ctx.tag_columns.clone()
                };
                let mut group_exprs = self.agg_modifier_to_col(input.schema(), modifier, true)?;
                // One aggregate expr per field column; for `count_values` the
                // original field expressions are also returned for reuse.
                let (mut aggr_exprs, prev_field_exprs) =
                    self.create_aggregate_exprs(*op, param, &input)?;

                // The TSID can be preserved only when grouping does not change
                // series identity: same tag set and not `count_values`.
                let keep_tsid = op.id() != token::T_COUNT_VALUES
                    && input_has_tsid
                    && input_tag_columns.iter().collect::<HashSet<_>>()
                        == self.ctx.tag_columns.iter().collect::<HashSet<_>>();

                if keep_tsid {
                    // Carry the TSID through the aggregate via FIRST_VALUE
                    // (any row's TSID works since the group is one series).
                    aggr_exprs.push(
                        first_value(
                            DfExpr::Column(Column::from_name(DATA_SCHEMA_TSID_COLUMN_NAME)),
                            vec![],
                        )
                        .alias(DATA_SCHEMA_TSID_COLUMN_NAME),
                    );
                }
                self.ctx.use_tsid = keep_tsid;

                let builder = LogicalPlanBuilder::from(input);
                let builder = if op.id() == token::T_COUNT_VALUES {
                    let label = Self::get_param_value_as_str(*op, param)?;
                    // `count_values` additionally groups by the value itself
                    // and exposes it under the user-provided label.
                    group_exprs.extend(prev_field_exprs.clone());
                    let project_fields = self
                        .create_field_column_exprs()?
                        .into_iter()
                        .chain(self.create_tag_column_exprs()?)
                        .chain(Some(self.create_time_index_column_expr()?))
                        .chain(prev_field_exprs.into_iter().map(|expr| expr.alias(label)));

                    builder
                        .aggregate(group_exprs.clone(), aggr_exprs)
                        .context(DataFusionPlanningSnafu)?
                        .project(project_fields)
                        .context(DataFusionPlanningSnafu)?
                } else {
                    builder
                        .aggregate(group_exprs.clone(), aggr_exprs)
                        .context(DataFusionPlanningSnafu)?
                };

                // Deterministic output order over the grouping keys.
                let sort_expr = group_exprs.into_iter().map(|expr| expr.sort(true, false));

                builder
                    .sort(sort_expr)
                    .context(DataFusionPlanningSnafu)?
                    .build()
                    .context(DataFusionPlanningSnafu)
            }
        }
    }
453
    /// Plan `topk(k, ...)` / `bottomk(k, ...)` using ranking window functions:
    /// rank rows within each group, keep rows whose rank is `<= k` for any
    /// field column, then sort and re-project the PromQL-visible columns.
    async fn prom_topk_bottomk_to_plan(
        &mut self,
        aggr_expr: &AggregateExpr,
        input: LogicalPlan,
    ) -> Result<LogicalPlan> {
        let AggregateExpr {
            op,
            param,
            modifier,
            ..
        } = aggr_expr;

        // topk/bottomk keep whole series, so a TSID present in the input can
        // be carried through unchanged.
        let input_has_tsid = input.schema().fields().iter().any(|field| {
            field.name() == DATA_SCHEMA_TSID_COLUMN_NAME
                && field.data_type() == &ArrowDataType::UInt64
        });
        self.ctx.use_tsid = input_has_tsid;

        let group_exprs = self.agg_modifier_to_col(input.schema(), modifier, false)?;

        // `k`, coerced to a Float64 literal.
        let val = Self::get_param_as_literal_expr(param, Some(*op), Some(ArrowDataType::Float64))?;

        // One ranking window expression per field column.
        let window_exprs = self.create_window_exprs(*op, group_exprs.clone(), &input)?;

        let rank_columns: Vec<_> = window_exprs
            .iter()
            .map(|expr| expr.schema_name().to_string())
            .collect();

        // Keep a row when ANY of its rank columns is within `k`.
        // NOTE(review): the trailing `unwrap` assumes `rank_columns` is
        // non-empty, i.e. there is at least one field column — confirm.
        let filter: DfExpr = rank_columns
            .iter()
            .fold(None, |expr, rank| {
                let predicate = DfExpr::BinaryExpr(BinaryExpr {
                    left: Box::new(col(rank)),
                    op: Operator::LtEq,
                    right: Box::new(val.clone()),
                });

                match expr {
                    None => Some(predicate),
                    Some(expr) => Some(DfExpr::BinaryExpr(BinaryExpr {
                        left: Box::new(expr),
                        op: Operator::Or,
                        right: Box::new(predicate),
                    })),
                }
            })
            .unwrap();

        let rank_columns: Vec<_> = rank_columns.into_iter().map(col).collect();

        // Sort by the group keys first, then by the rank columns.
        let mut new_group_exprs = group_exprs.clone();
        new_group_exprs.extend(rank_columns);

        let group_sort_expr = new_group_exprs
            .into_iter()
            .map(|expr| expr.sort(true, false));

        // Final projection: fields + tags (+ optional TSID) + time index,
        // dropping the helper rank columns.
        let project_fields = self
            .create_field_column_exprs()?
            .into_iter()
            .chain(self.create_tag_column_exprs()?)
            .chain(
                self.ctx
                    .use_tsid
                    .then_some(DfExpr::Column(Column::from_name(
                        DATA_SCHEMA_TSID_COLUMN_NAME,
                    ))),
            )
            .chain(Some(self.create_time_index_column_expr()?));

        LogicalPlanBuilder::from(input)
            .window(window_exprs)
            .context(DataFusionPlanningSnafu)?
            .filter(filter)
            .context(DataFusionPlanningSnafu)?
            .sort(group_sort_expr)
            .context(DataFusionPlanningSnafu)?
            .project(project_fields)
            .context(DataFusionPlanningSnafu)?
            .build()
            .context(DataFusionPlanningSnafu)
    }
542
543 async fn prom_unary_expr_to_plan(
544 &mut self,
545 query_engine_state: &QueryEngineState,
546 unary_expr: &UnaryExpr,
547 ) -> Result<LogicalPlan> {
548 let UnaryExpr { expr } = unary_expr;
549 let input = self.prom_expr_to_plan(expr, query_engine_state).await?;
551 self.projection_for_each_field_column(input, |col| {
552 Ok(DfExpr::Negative(Box::new(DfExpr::Column(col.into()))))
553 })
554 }
555
    /// Plan a PromQL binary expression.
    ///
    /// Four shapes are handled separately:
    /// - literal OP literal: folded into a single `EmptyMetric` plan;
    /// - literal OP vector / vector OP literal: projection over the vector
    ///   side (or a filter for comparisons without the `bool` modifier);
    /// - vector OP vector: set operation for `and`/`or`/`unless`, otherwise a
    ///   join on the non-field columns followed by combining paired field
    ///   columns.
    async fn prom_binary_expr_to_plan(
        &mut self,
        query_engine_state: &QueryEngineState,
        binary_expr: &PromBinaryExpr,
    ) -> Result<LogicalPlan> {
        let PromBinaryExpr {
            lhs,
            rhs,
            op,
            modifier,
        } = binary_expr;

        // With the `bool` modifier, comparisons return 0/1 instead of
        // filtering rows out.
        let should_return_bool = if let Some(m) = modifier {
            m.return_bool
        } else {
            false
        };
        let is_comparison_op = Self::is_token_a_comparison_op(*op);

        // Dispatch on which sides are constant-foldable literals.
        match (
            Self::try_build_literal_expr(lhs),
            Self::try_build_literal_expr(rhs),
        ) {
            // Both sides literal: evaluate as a synthetic time series.
            (Some(lhs), Some(rhs)) => {
                self.ctx.time_index_column = Some(DEFAULT_TIME_INDEX_COLUMN.to_string());
                self.ctx.field_columns = vec![DEFAULT_FIELD_COLUMN.to_string()];
                self.ctx.reset_table_name_and_schema();
                let field_expr_builder = Self::prom_token_to_binary_expr_builder(*op)?;
                let mut field_expr = field_expr_builder(lhs, rhs)?;

                if is_comparison_op && should_return_bool {
                    // `bool` comparisons yield a numeric 0/1 value.
                    field_expr = DfExpr::Cast(Cast {
                        expr: Box::new(field_expr),
                        data_type: ArrowDataType::Float64,
                    });
                }

                Ok(LogicalPlan::Extension(Extension {
                    node: Arc::new(
                        EmptyMetric::new(
                            self.ctx.start,
                            self.ctx.end,
                            self.ctx.interval,
                            SPECIAL_TIME_FUNCTION.to_string(),
                            DEFAULT_FIELD_COLUMN.to_string(),
                            Some(field_expr),
                        )
                        .context(DataFusionPlanningSnafu)?,
                    ),
                }))
            }
            // Literal on the left, vector on the right.
            (Some(mut expr), None) => {
                let input = self.prom_expr_to_plan(rhs, query_engine_state).await?;
                // `time()` on the literal side must be rebuilt against the
                // vector's actual time index column.
                if let Some(time_expr) = self.try_build_special_time_expr_with_context(lhs) {
                    expr = time_expr
                }
                let bin_expr_builder = |col: &String| {
                    let binary_expr_builder = Self::prom_token_to_binary_expr_builder(*op)?;
                    let mut binary_expr =
                        binary_expr_builder(expr.clone(), DfExpr::Column(col.into()))?;

                    if is_comparison_op && should_return_bool {
                        binary_expr = DfExpr::Cast(Cast {
                            expr: Box::new(binary_expr),
                            data_type: ArrowDataType::Float64,
                        });
                    }
                    Ok(binary_expr)
                };
                // Plain comparisons filter rows; everything else projects.
                if is_comparison_op && !should_return_bool {
                    self.filter_on_field_column(input, bin_expr_builder)
                } else {
                    self.projection_for_each_field_column(input, bin_expr_builder)
                }
            }
            // Vector on the left, literal on the right (mirror of the above).
            (None, Some(mut expr)) => {
                let input = self.prom_expr_to_plan(lhs, query_engine_state).await?;
                if let Some(time_expr) = self.try_build_special_time_expr_with_context(rhs) {
                    expr = time_expr
                }
                let bin_expr_builder = |col: &String| {
                    let binary_expr_builder = Self::prom_token_to_binary_expr_builder(*op)?;
                    let mut binary_expr =
                        binary_expr_builder(DfExpr::Column(col.into()), expr.clone())?;

                    if is_comparison_op && should_return_bool {
                        binary_expr = DfExpr::Cast(Cast {
                            expr: Box::new(binary_expr),
                            data_type: ArrowDataType::Float64,
                        });
                    }
                    Ok(binary_expr)
                };
                if is_comparison_op && !should_return_bool {
                    self.filter_on_field_column(input, bin_expr_builder)
                } else {
                    self.projection_for_each_field_column(input, bin_expr_builder)
                }
            }
            // Vector OP vector.
            (None, None) => {
                // Plan both sides, snapshotting the ctx produced by each since
                // planning the right side overwrites the left's ctx.
                let left_input = self.prom_expr_to_plan(lhs, query_engine_state).await?;
                let left_field_columns = self.ctx.field_columns.clone();
                let left_time_index_column = self.ctx.time_index_column.clone();
                let mut left_table_ref = self
                    .table_ref()
                    .unwrap_or_else(|_| TableReference::bare(""));
                let left_context = self.ctx.clone();

                let right_input = self.prom_expr_to_plan(rhs, query_engine_state).await?;
                let right_field_columns = self.ctx.field_columns.clone();
                let right_time_index_column = self.ctx.time_index_column.clone();
                let mut right_table_ref = self
                    .table_ref()
                    .unwrap_or_else(|_| TableReference::bare(""));
                let right_context = self.ctx.clone();

                // `and` / `or` / `unless` take a different, set-based path.
                if Self::is_token_a_set_op(*op) {
                    return self.set_op_on_non_field_columns(
                        left_input,
                        right_input,
                        left_context,
                        right_context,
                        *op,
                        modifier,
                    );
                }

                // Self-join: disambiguate the two sides with synthetic
                // aliases so column references stay unambiguous.
                if left_table_ref == right_table_ref {
                    left_table_ref = TableReference::bare("lhs");
                    right_table_ref = TableReference::bare("rhs");
                    if self.ctx.tag_columns.is_empty() {
                        self.ctx = left_context.clone();
                        self.ctx.table_name = Some("lhs".to_string());
                    } else {
                        self.ctx.table_name = Some("rhs".to_string());
                    }
                }
                // Field columns are paired positionally between the sides.
                let mut field_columns = left_field_columns.iter().zip(right_field_columns.iter());

                let join_plan = self.join_on_non_field_columns(
                    left_input,
                    right_input,
                    left_table_ref.clone(),
                    right_table_ref.clone(),
                    left_time_index_column,
                    right_time_index_column,
                    left_context.tag_columns.is_empty() || right_context.tag_columns.is_empty(),
                    modifier,
                )?;
                let join_plan_schema = join_plan.schema().clone();

                let bin_expr_builder = |_: &String| {
                    let (left_col_name, right_col_name) = field_columns.next().unwrap();
                    let left_col = join_plan_schema
                        .qualified_field_with_name(Some(&left_table_ref), left_col_name)
                        .context(DataFusionPlanningSnafu)?
                        .into();
                    let right_col = join_plan_schema
                        .qualified_field_with_name(Some(&right_table_ref), right_col_name)
                        .context(DataFusionPlanningSnafu)?
                        .into();

                    let binary_expr_builder = Self::prom_token_to_binary_expr_builder(*op)?;
                    let mut binary_expr =
                        binary_expr_builder(DfExpr::Column(left_col), DfExpr::Column(right_col))?;
                    if is_comparison_op && should_return_bool {
                        binary_expr = DfExpr::Cast(Cast {
                            expr: Box::new(binary_expr),
                            data_type: ArrowDataType::Float64,
                        });
                    }
                    Ok(binary_expr)
                };
                if is_comparison_op && !should_return_bool {
                    // Filtering comparison: keep the vector side's columns
                    // (for scalar-vs-vector shapes, the vector side wins).
                    let filtered = self.filter_on_field_column(join_plan, bin_expr_builder)?;
                    let (project_table_ref, project_context) =
                        match (lhs.value_type(), rhs.value_type()) {
                            (ValueType::Scalar, ValueType::Vector) => {
                                (&right_table_ref, &right_context)
                            }
                            _ => (&left_table_ref, &left_context),
                        };
                    self.project_binary_join_side(filtered, project_table_ref, project_context)
                } else {
                    self.projection_for_each_field_column(join_plan, bin_expr_builder)
                }
            }
        }
    }
772
    /// Project one side of a binary-operator join back to its own columns
    /// (time index, fields, tags, and TSID when present), then adopt that
    /// side's context as the current planner context.
    fn project_binary_join_side(
        &mut self,
        input: LogicalPlan,
        table_ref: &TableReference,
        context: &PromPlannerContext,
    ) -> Result<LogicalPlan> {
        let schema = input.schema();

        // +2: one slot for the time index, one for a possible TSID column.
        let mut project_exprs =
            Vec::with_capacity(context.tag_columns.len() + context.field_columns.len() + 2);

        if let Some(time_index_column) = &context.time_index_column {
            let time_index_col = schema
                .qualified_field_with_name(Some(table_ref), time_index_column)
                .context(DataFusionPlanningSnafu)?
                .into();
            project_exprs.push(DfExpr::Column(time_index_col));
        }

        for field_column in &context.field_columns {
            let field_col = schema
                .qualified_field_with_name(Some(table_ref), field_column)
                .context(DataFusionPlanningSnafu)?
                .into();
            project_exprs.push(DfExpr::Column(field_col));
        }

        for tag_column in &context.tag_columns {
            let tag_col = schema
                .qualified_field_with_name(Some(table_ref), tag_column)
                .context(DataFusionPlanningSnafu)?
                .into();
            project_exprs.push(DfExpr::Column(tag_col));
        }

        // Keep the TSID only if this side's schema actually still has it.
        if context.use_tsid
            && let Ok(tsid_col) =
                schema.qualified_field_with_name(Some(table_ref), DATA_SCHEMA_TSID_COLUMN_NAME)
        {
            project_exprs.push(DfExpr::Column(tsid_col.into()));
        }

        let plan = LogicalPlanBuilder::from(input)
            .project(project_exprs)
            .context(DataFusionPlanningSnafu)?
            .build()
            .context(DataFusionPlanningSnafu)?;

        // The projected plan is no longer tied to a concrete table/schema.
        self.ctx = context.clone();
        self.ctx.table_name = None;
        self.ctx.schema_name = None;

        Ok(plan)
    }
834
835 fn prom_number_lit_to_plan(&mut self, number_literal: &NumberLiteral) -> Result<LogicalPlan> {
836 let NumberLiteral { val } = number_literal;
837 self.ctx.time_index_column = Some(DEFAULT_TIME_INDEX_COLUMN.to_string());
838 self.ctx.field_columns = vec![DEFAULT_FIELD_COLUMN.to_string()];
839 self.ctx.reset_table_name_and_schema();
840 let literal_expr = df_prelude::lit(*val);
841
842 let plan = LogicalPlan::Extension(Extension {
843 node: Arc::new(
844 EmptyMetric::new(
845 self.ctx.start,
846 self.ctx.end,
847 self.ctx.interval,
848 SPECIAL_TIME_FUNCTION.to_string(),
849 DEFAULT_FIELD_COLUMN.to_string(),
850 Some(literal_expr),
851 )
852 .context(DataFusionPlanningSnafu)?,
853 ),
854 });
855 Ok(plan)
856 }
857
858 fn prom_string_lit_to_plan(&mut self, string_literal: &StringLiteral) -> Result<LogicalPlan> {
859 let StringLiteral { val } = string_literal;
860 self.ctx.time_index_column = Some(DEFAULT_TIME_INDEX_COLUMN.to_string());
861 self.ctx.field_columns = vec![DEFAULT_FIELD_COLUMN.to_string()];
862 self.ctx.reset_table_name_and_schema();
863 let literal_expr = df_prelude::lit(val.clone());
864
865 let plan = LogicalPlan::Extension(Extension {
866 node: Arc::new(
867 EmptyMetric::new(
868 self.ctx.start,
869 self.ctx.end,
870 self.ctx.interval,
871 SPECIAL_TIME_FUNCTION.to_string(),
872 DEFAULT_FIELD_COLUMN.to_string(),
873 Some(literal_expr),
874 )
875 .context(DataFusionPlanningSnafu)?,
876 ),
877 });
878 Ok(plan)
879 }
880
    /// Plan an instant vector selector (`metric{label="v"}`).
    ///
    /// Preprocesses the label matchers, sets up the table context (which may
    /// short-circuit to an empty plan), builds the series-normalize scan, then
    /// wraps it in an `InstantManipulate` node that aligns samples to the
    /// evaluation steps. When `timestamp_fn` is set, the timestamp itself is
    /// projected as the value column first.
    async fn prom_vector_selector_to_plan(
        &mut self,
        vector_selector: &VectorSelector,
        timestamp_fn: bool,
    ) -> Result<LogicalPlan> {
        let VectorSelector {
            name,
            offset,
            matchers,
            at: _,
        } = vector_selector;
        let matchers = self.preprocess_label_matchers(matchers, name)?;
        // `setup_context` (defined elsewhere in this file) resolves the table;
        // it returns Some(plan) when the selector matches nothing.
        if let Some(empty_plan) = self.setup_context().await? {
            return Ok(empty_plan);
        }
        let normalize = self
            .selector_to_series_normalize_plan(offset, matchers, false)
            .await?;

        // `timestamp(v)` replaces the value column with the sample timestamp.
        let normalize = if timestamp_fn {
            self.create_timestamp_func_plan(normalize)?
        } else {
            normalize
        };

        let manipulate = InstantManipulate::new(
            self.ctx.start,
            self.ctx.end,
            self.ctx.lookback_delta,
            self.ctx.interval,
            self.ctx
                .time_index_column
                .clone()
                .expect("time index should be set in `setup_context`"),
            self.ctx.field_columns.first().cloned(),
            normalize,
        );
        Ok(LogicalPlan::Extension(Extension {
            node: Arc::new(manipulate),
        }))
    }
924
925 fn create_timestamp_func_plan(&mut self, normalize: LogicalPlan) -> Result<LogicalPlan> {
947 let time_expr = build_special_time_expr(self.ctx.time_index_column.as_ref().unwrap())
948 .alias(DEFAULT_FIELD_COLUMN);
949 self.ctx.field_columns = vec![time_expr.schema_name().to_string()];
950 let mut project_exprs = Vec::with_capacity(self.ctx.tag_columns.len() + 2);
951 project_exprs.push(self.create_time_index_column_expr()?);
952 project_exprs.push(time_expr);
953 project_exprs.extend(self.create_tag_column_exprs()?);
954
955 LogicalPlanBuilder::from(normalize)
956 .project(project_exprs)
957 .context(DataFusionPlanningSnafu)?
958 .build()
959 .context(DataFusionPlanningSnafu)
960 }
961
    /// Plan a range (matrix) selector (`metric[5m]`).
    ///
    /// Like the instant selector, but rejects zero-length ranges and wraps the
    /// normalized scan in a `RangeManipulate` node that materializes the
    /// per-step sample windows.
    async fn prom_matrix_selector_to_plan(
        &mut self,
        matrix_selector: &MatrixSelector,
    ) -> Result<LogicalPlan> {
        let MatrixSelector { vs, range } = matrix_selector;
        let VectorSelector {
            name,
            offset,
            matchers,
            ..
        } = vs;
        let matchers = self.preprocess_label_matchers(matchers, name)?;
        // PromQL forbids `metric[0s]`.
        ensure!(!range.is_zero(), ZeroRangeSelectorSnafu);
        let range_ms = range.as_millis() as _;
        self.ctx.range = Some(range_ms);

        // `setup_context` may short-circuit with an empty plan when the
        // selector matches no table.
        let normalize = match self.setup_context().await? {
            Some(empty_plan) => empty_plan,
            None => {
                self.selector_to_series_normalize_plan(offset, matchers, true)
                    .await?
            }
        };
        let manipulate = RangeManipulate::new(
            self.ctx.start,
            self.ctx.end,
            self.ctx.interval,
            range_ms,
            self.ctx
                .time_index_column
                .clone()
                .expect("time index should be set in `setup_context`"),
            self.ctx.field_columns.clone(),
            normalize,
        )
        .context(DataFusionPlanningSnafu)?;

        Ok(LogicalPlan::Extension(Extension {
            node: Arc::new(manipulate),
        }))
    }
1006
    /// Plan a PromQL function call.
    ///
    /// A few functions (`histogram_quantile`, `vector`, `scalar`, `absent`)
    /// have dedicated plans and return early. Everything else is lowered as a
    /// projection of per-field expressions over the (planned) vector argument,
    /// with optional sorting for the `sort*` family.
    async fn prom_call_expr_to_plan(
        &mut self,
        query_engine_state: &QueryEngineState,
        call_expr: &Call,
    ) -> Result<LogicalPlan> {
        let Call { func, args } = call_expr;
        // Functions that cannot be expressed as a simple projection.
        match func.name {
            SPECIAL_HISTOGRAM_QUANTILE => {
                return self.create_histogram_plan(args, query_engine_state).await;
            }
            SPECIAL_VECTOR_FUNCTION => return self.create_vector_plan(args).await,
            SCALAR_FUNCTION => return self.create_scalar_plan(args, query_engine_state).await,
            SPECIAL_ABSENT_FUNCTION => {
                return self.create_absent_plan(args, query_engine_state).await;
            }
            _ => {}
        }

        // Split the arguments into the (single) vector input and literals.
        let args = self.create_function_args(&args.args)?;
        let input = if let Some(prom_expr) = &args.input {
            self.prom_expr_to_plan_inner(prom_expr, func.name == "timestamp", query_engine_state)
                .await?
        } else {
            // No vector argument (e.g. `time()`): synthesize an empty metric
            // over the evaluation window.
            self.ctx.time_index_column = Some(SPECIAL_TIME_FUNCTION.to_string());
            self.ctx.reset_table_name_and_schema();
            self.ctx.tag_columns = vec![];
            self.ctx.field_columns = vec![DEFAULT_FIELD_COLUMN.to_string()];
            LogicalPlan::Extension(Extension {
                node: Arc::new(
                    EmptyMetric::new(
                        self.ctx.start,
                        self.ctx.end,
                        self.ctx.interval,
                        SPECIAL_TIME_FUNCTION.to_string(),
                        DEFAULT_FIELD_COLUMN.to_string(),
                        None,
                    )
                    .context(DataFusionPlanningSnafu)?,
                ),
            })
        };
        // Per-field function expressions plus any new tag columns the
        // function introduces (e.g. `label_replace`/`label_join`).
        let (mut func_exprs, new_tags) =
            self.create_function_expr(func, args.literals.clone(), query_engine_state)?;
        func_exprs.insert(0, self.create_time_index_column_expr()?);
        func_exprs.extend_from_slice(&self.create_tag_column_exprs()?);

        let builder = LogicalPlanBuilder::from(input)
            .project(func_exprs)
            .context(DataFusionPlanningSnafu)?
            .filter(self.create_empty_values_filter_expr()?)
            .context(DataFusionPlanningSnafu)?;

        // The sort family additionally orders the output.
        let builder = match func.name {
            "sort" => builder
                .sort(self.create_field_columns_sort_exprs(true))
                .context(DataFusionPlanningSnafu)?,
            "sort_desc" => builder
                .sort(self.create_field_columns_sort_exprs(false))
                .context(DataFusionPlanningSnafu)?,
            "sort_by_label" => builder
                .sort(Self::create_sort_exprs_by_tags(
                    func.name,
                    args.literals,
                    true,
                )?)
                .context(DataFusionPlanningSnafu)?,
            "sort_by_label_desc" => builder
                .sort(Self::create_sort_exprs_by_tags(
                    func.name,
                    args.literals,
                    false,
                )?)
                .context(DataFusionPlanningSnafu)?,

            _ => builder,
        };

        // Register tags created by the function so later stages see them.
        for tag in new_tags {
            self.ctx.tag_columns.push(tag);
        }

        let plan = builder.build().context(DataFusionPlanningSnafu)?;
        common_telemetry::debug!("Created PromQL function plan: {plan:?} for {call_expr:?}");

        Ok(plan)
    }
1097
    /// Plan a parser-extension node (EXPLAIN/ANALYZE wrappers and aliases
    /// injected by this crate's parser).
    ///
    /// Plans the single child expression, then wraps it according to the
    /// extension's node name; unknown extensions become an empty plan.
    async fn prom_ext_expr_to_plan(
        &mut self,
        query_engine_state: &QueryEngineState,
        ext_expr: &promql_parser::parser::ast::Extension,
    ) -> Result<LogicalPlan> {
        let expr = &ext_expr.expr;
        let children = expr.children();
        // NOTE(review): indexing assumes every extension node has at least one
        // child; presumably guaranteed by the parser — confirm.
        let plan = self
            .prom_expr_to_plan(&children[0], query_engine_state)
            .await?;
        // `explain(verbose, analyze)` — the two booleans map as noted below.
        match expr.name() {
            ANALYZE_NODE_NAME => LogicalPlanBuilder::from(plan)
                .explain(false, true)
                .unwrap()
                .build()
                .context(DataFusionPlanningSnafu),
            ANALYZE_VERBOSE_NODE_NAME => LogicalPlanBuilder::from(plan)
                .explain(true, true)
                .unwrap()
                .build()
                .context(DataFusionPlanningSnafu),
            EXPLAIN_NODE_NAME => LogicalPlanBuilder::from(plan)
                .explain(false, false)
                .unwrap()
                .build()
                .context(DataFusionPlanningSnafu),
            EXPLAIN_VERBOSE_NODE_NAME => LogicalPlanBuilder::from(plan)
                .explain(true, false)
                .unwrap()
                .build()
                .context(DataFusionPlanningSnafu),
            ALIAS_NODE_NAME => {
                let alias = expr
                    .as_any()
                    .downcast_ref::<AliasExpr>()
                    .context(UnexpectedPlanExprSnafu {
                        desc: "Expected AliasExpr",
                    })?
                    .alias
                    .clone();
                self.apply_alias(plan, alias)
            }
            // Unknown extension: degrade to an empty relation.
            _ => LogicalPlanBuilder::empty(true)
                .build()
                .context(DataFusionPlanningSnafu),
        }
    }
1151
1152 #[allow(clippy::mutable_key_type)]
1162 fn preprocess_label_matchers(
1163 &mut self,
1164 label_matchers: &Matchers,
1165 name: &Option<String>,
1166 ) -> Result<Matchers> {
1167 self.ctx.reset();
1168
1169 let metric_name;
1170 if let Some(name) = name.clone() {
1171 metric_name = Some(name);
1172 ensure!(
1173 label_matchers.find_matchers(METRIC_NAME).is_empty(),
1174 MultipleMetricMatchersSnafu
1175 );
1176 } else {
1177 let mut matches = label_matchers.find_matchers(METRIC_NAME);
1178 ensure!(!matches.is_empty(), NoMetricMatcherSnafu);
1179 ensure!(matches.len() == 1, MultipleMetricMatchersSnafu);
1180 ensure!(
1181 matches[0].op == MatchOp::Equal,
1182 UnsupportedMatcherOpSnafu {
1183 matcher_op: matches[0].op.to_string(),
1184 matcher: METRIC_NAME
1185 }
1186 );
1187 metric_name = matches.pop().map(|m| m.value);
1188 }
1189
1190 self.ctx.table_name = metric_name;
1191
1192 let mut matchers = HashSet::new();
1193 for matcher in &label_matchers.matchers {
1194 if matcher.name == FIELD_COLUMN_MATCHER {
1196 self.ctx
1197 .field_column_matcher
1198 .get_or_insert_default()
1199 .push(matcher.clone());
1200 } else if matcher.name == SCHEMA_COLUMN_MATCHER || matcher.name == DB_COLUMN_MATCHER {
1201 ensure!(
1202 matcher.op == MatchOp::Equal,
1203 UnsupportedMatcherOpSnafu {
1204 matcher: matcher.name.clone(),
1205 matcher_op: matcher.op.to_string(),
1206 }
1207 );
1208 self.ctx.schema_name = Some(matcher.value.clone());
1209 } else if matcher.name != METRIC_NAME {
1210 self.ctx.selector_matcher.push(matcher.clone());
1211 let _ = matchers.insert(matcher.clone());
1212 }
1213 }
1214
1215 Ok(Matchers::new(matchers.into_iter().collect()))
1216 }
1217
    /// Build the scan → filter → (project) → sort → `SeriesDivide`
    /// (→ `SeriesNormalize`) plan for a PromQL vector/matrix selector.
    ///
    /// `offset` is the PromQL `offset` modifier, `label_matchers` the
    /// preprocessed matchers, and `is_range_selector` is true for matrix
    /// selectors (`metric[5m]`), which always get a `SeriesNormalize` node.
    async fn selector_to_series_normalize_plan(
        &mut self,
        offset: &Option<Offset>,
        label_matchers: Matchers,
        is_range_selector: bool,
    ) -> Result<LogicalPlan> {
        let table_ref = self.table_ref()?;
        let mut table_scan = self.create_table_scan_plan(table_ref.clone()).await?;
        let table_schema = table_scan.schema();

        // `offset` shifts queried timestamps; stored as signed milliseconds.
        let offset_duration = match offset {
            Some(Offset::Pos(duration)) => duration.as_millis() as Millisecond,
            Some(Offset::Neg(duration)) => -(duration.as_millis() as Millisecond),
            None => 0,
        };
        // Push label matchers plus the time-range restriction into the scan.
        let mut scan_filters = Self::matchers_to_expr(label_matchers.clone(), table_schema)?;
        if let Some(time_index_filter) = self.build_time_index_filter(offset_duration)? {
            scan_filters.push(time_index_filter);
        }
        // `build_time_index_filter` yields a filter whenever start <= end, so
        // `scan_filters` is non-empty here and `conjunction` returns `Some`.
        table_scan = LogicalPlanBuilder::from(table_scan)
            .filter(conjunction(scan_filters).unwrap())
            .context(DataFusionPlanningSnafu)?
            .build()
            .context(DataFusionPlanningSnafu)?;

        // Apply `__field__` matchers: narrow the projected value columns.
        if let Some(field_matchers) = &self.ctx.field_column_matcher {
            let col_set = self.ctx.field_columns.iter().collect::<HashSet<_>>();
            // Columns selected by positive matchers.
            let mut result_set = HashSet::new();
            // Columns excluded by negative matchers.
            let mut reverse_set = HashSet::new();
            for matcher in field_matchers {
                match &matcher.op {
                    MatchOp::Equal => {
                        if col_set.contains(&matcher.value) {
                            let _ = result_set.insert(matcher.value.clone());
                        } else {
                            // Exact matchers must name a real value column.
                            return Err(ColumnNotFoundSnafu {
                                col: matcher.value.clone(),
                            }
                            .build());
                        }
                    }
                    MatchOp::NotEqual => {
                        if col_set.contains(&matcher.value) {
                            let _ = reverse_set.insert(matcher.value.clone());
                        } else {
                            return Err(ColumnNotFoundSnafu {
                                col: matcher.value.clone(),
                            }
                            .build());
                        }
                    }
                    // Regex matchers silently select whatever matches.
                    MatchOp::Re(regex) => {
                        for col in &self.ctx.field_columns {
                            if regex.is_match(col) {
                                let _ = result_set.insert(col.clone());
                            }
                        }
                    }
                    MatchOp::NotRe(regex) => {
                        for col in &self.ctx.field_columns {
                            if regex.is_match(col) {
                                let _ = reverse_set.insert(col.clone());
                            }
                        }
                    }
                }
            }
            // No positive matcher selected anything: start from all value
            // columns, then subtract the negative matches.
            if result_set.is_empty() {
                result_set = col_set.into_iter().cloned().collect();
            }
            for col in reverse_set {
                let _ = result_set.remove(&col);
            }

            // Keep `ctx.field_columns` in original order, restricted to the
            // surviving columns.
            self.ctx.field_columns = self
                .ctx
                .field_columns
                .drain(..)
                .filter(|col| result_set.contains(col))
                .collect();

            // Re-project: surviving fields + tags (+ __tsid__) + time index.
            let exprs = result_set
                .into_iter()
                .map(|col| DfExpr::Column(Column::new_unqualified(col)))
                .chain(self.create_tag_column_exprs()?)
                .chain(
                    self.ctx
                        .use_tsid
                        .then_some(DfExpr::Column(Column::new_unqualified(
                            DATA_SCHEMA_TSID_COLUMN_NAME,
                        ))),
                )
                .chain(Some(self.create_time_index_column_expr()?))
                .collect::<Vec<_>>();

            table_scan = LogicalPlanBuilder::from(table_scan)
                .project(exprs)
                .context(DataFusionPlanningSnafu)?
                .build()
                .context(DataFusionPlanningSnafu)?;
        }

        // Series identity: `__tsid__` alone when available, otherwise all tags.
        let series_key_columns = if self.ctx.use_tsid {
            vec![DATA_SCHEMA_TSID_COLUMN_NAME.to_string()]
        } else {
            self.ctx.tag_columns.clone()
        };

        // `SeriesDivide` expects input sorted by series key, then time index.
        let sort_exprs = if self.ctx.use_tsid {
            vec![
                DfExpr::Column(Column::from_name(DATA_SCHEMA_TSID_COLUMN_NAME)).sort(true, true),
                self.create_time_index_column_expr()?.sort(true, true),
            ]
        } else {
            self.create_tag_and_time_index_column_sort_exprs()?
        };

        let sort_plan = LogicalPlanBuilder::from(table_scan)
            .sort(sort_exprs)
            .context(DataFusionPlanningSnafu)?
            .build()
            .context(DataFusionPlanningSnafu)?;

        // NOTE(review): this error uses `to_string()` while the one below uses
        // `to_quoted_string()` — probably should be consistent; confirm.
        let time_index_column =
            self.ctx
                .time_index_column
                .clone()
                .with_context(|| TimeIndexNotFoundSnafu {
                    table: table_ref.to_string(),
                })?;
        let divide_plan = LogicalPlan::Extension(Extension {
            node: Arc::new(SeriesDivide::new(
                series_key_columns.clone(),
                time_index_column,
                sort_plan,
            )),
        });

        // Instant selectors without an offset need no normalization step.
        if !is_range_selector && offset_duration == 0 {
            return Ok(divide_plan);
        }
        let series_normalize = SeriesNormalize::new(
            offset_duration,
            self.ctx
                .time_index_column
                .clone()
                .with_context(|| TimeIndexNotFoundSnafu {
                    table: table_ref.to_quoted_string(),
                })?,
            is_range_selector,
            series_key_columns,
            divide_plan,
        );
        let logical_plan = LogicalPlan::Extension(Extension {
            node: Arc::new(series_normalize),
        });

        Ok(logical_plan)
    }
1388
    /// Convert an aggregation modifier (`by` / `without`) into the group-by
    /// column expressions for the aggregate plan. The time index column is
    /// always appended so aggregation stays per-timestamp.
    ///
    /// When `update_ctx` is true, `self.ctx.tag_columns` is rewritten to the
    /// labels that survive the modifier.
    fn agg_modifier_to_col(
        &mut self,
        input_schema: &DFSchemaRef,
        modifier: &Option<LabelModifier>,
        update_ctx: bool,
    ) -> Result<Vec<DfExpr>> {
        match modifier {
            // No modifier: collapse all series; group by time index only.
            None => {
                if update_ctx {
                    self.ctx.tag_columns.clear();
                }
                Ok(vec![self.create_time_index_column_expr()?])
            }
            // `by (labels)`: keep only the listed labels.
            Some(LabelModifier::Include(labels)) => {
                if update_ctx {
                    self.ctx.tag_columns.clear();
                }
                let mut exprs = Vec::with_capacity(labels.labels.len());
                for label in &labels.labels {
                    // Internal metric-engine columns are never exposed as labels.
                    if is_metric_engine_internal_column(label) {
                        continue;
                    }
                    // Labels absent from the input schema are silently dropped.
                    if let Some(column_name) = Self::find_case_sensitive_column(input_schema, label)
                    {
                        exprs.push(DfExpr::Column(Column::from_name(column_name.clone())));

                        if update_ctx {
                            self.ctx.tag_columns.push(column_name);
                        }
                    }
                }
                exprs.push(self.create_time_index_column_expr()?);

                Ok(exprs)
            }
            // `without (labels)`: keep every input column except the listed
            // labels, internal columns, the time index, and value columns.
            Some(LabelModifier::Exclude(labels)) => {
                let mut all_fields = input_schema
                    .fields()
                    .iter()
                    .map(|f| f.name())
                    .collect::<BTreeSet<_>>();

                all_fields.retain(|col| !is_metric_engine_internal_column(col.as_str()));

                for label in &labels.labels {
                    let _ = all_fields.remove(label);
                }

                // The time index and value columns are not labels: the time
                // index is appended separately, values are aggregated over.
                if let Some(time_index) = &self.ctx.time_index_column {
                    let _ = all_fields.remove(time_index);
                }
                for value in &self.ctx.field_columns {
                    let _ = all_fields.remove(value);
                }

                if update_ctx {
                    self.ctx.tag_columns = all_fields.iter().map(|col| (*col).clone()).collect();
                }

                let mut exprs = all_fields
                    .into_iter()
                    .map(|c| DfExpr::Column(Column::from(c)))
                    .collect::<Vec<_>>();

                exprs.push(self.create_time_index_column_expr()?);

                Ok(exprs)
            }
        }
    }
1476
1477 pub fn matchers_to_expr(
1479 label_matchers: Matchers,
1480 table_schema: &DFSchemaRef,
1481 ) -> Result<Vec<DfExpr>> {
1482 let mut exprs = Vec::with_capacity(label_matchers.matchers.len());
1483 for matcher in label_matchers.matchers {
1484 if matcher.name == SCHEMA_COLUMN_MATCHER
1485 || matcher.name == DB_COLUMN_MATCHER
1486 || matcher.name == FIELD_COLUMN_MATCHER
1487 {
1488 continue;
1489 }
1490
1491 let column_name = Self::find_case_sensitive_column(table_schema, matcher.name.as_str());
1492 let col = if let Some(column_name) = column_name {
1493 DfExpr::Column(Column::from_name(column_name))
1494 } else {
1495 DfExpr::Literal(ScalarValue::Utf8(Some(String::new())), None)
1496 .alias(matcher.name.clone())
1497 };
1498 let lit = DfExpr::Literal(ScalarValue::Utf8(Some(matcher.value)), None);
1499 let expr = match matcher.op {
1500 MatchOp::Equal => col.eq(lit),
1501 MatchOp::NotEqual => col.not_eq(lit),
1502 MatchOp::Re(re) => {
1503 if re.as_str() == "^(?:.*)$" {
1509 continue;
1510 }
1511 if re.as_str() == "^(?:.+)$" {
1512 col.not_eq(DfExpr::Literal(
1513 ScalarValue::Utf8(Some(String::new())),
1514 None,
1515 ))
1516 } else {
1517 DfExpr::BinaryExpr(BinaryExpr {
1518 left: Box::new(col),
1519 op: Operator::RegexMatch,
1520 right: Box::new(DfExpr::Literal(
1521 ScalarValue::Utf8(Some(re.as_str().to_string())),
1522 None,
1523 )),
1524 })
1525 }
1526 }
1527 MatchOp::NotRe(re) => {
1528 if re.as_str() == "^(?:.*)$" {
1529 DfExpr::Literal(ScalarValue::Boolean(Some(false)), None)
1530 } else if re.as_str() == "^(?:.+)$" {
1531 col.eq(DfExpr::Literal(
1532 ScalarValue::Utf8(Some(String::new())),
1533 None,
1534 ))
1535 } else {
1536 DfExpr::BinaryExpr(BinaryExpr {
1537 left: Box::new(col),
1538 op: Operator::RegexNotMatch,
1539 right: Box::new(DfExpr::Literal(
1540 ScalarValue::Utf8(Some(re.as_str().to_string())),
1541 None,
1542 )),
1543 })
1544 }
1545 }
1546 };
1547 exprs.push(expr);
1548 }
1549
1550 Ok(exprs)
1551 }
1552
1553 fn find_case_sensitive_column(schema: &DFSchemaRef, column: &str) -> Option<String> {
1554 if is_metric_engine_internal_column(column) {
1555 return None;
1556 }
1557 schema
1558 .fields()
1559 .iter()
1560 .find(|field| field.name() == column)
1561 .map(|field| field.name().clone())
1562 }
1563
    /// Extract the underlying `table::TableRef` from a DataFusion table source.
    ///
    /// Only sources produced by this codebase — a `DefaultTableSource`
    /// wrapping a `DfTableProviderAdapter` — are supported; any other source
    /// yields `UnknownTableSnafu`.
    fn table_from_source(&self, source: &Arc<dyn TableSource>) -> Result<table::TableRef> {
        Ok(source
            .as_any()
            .downcast_ref::<DefaultTableSource>()
            .context(UnknownTableSnafu)?
            .table_provider
            .as_any()
            .downcast_ref::<DfTableProviderAdapter>()
            .context(UnknownTableSnafu)?
            .table())
    }
1575
1576 fn table_ref(&self) -> Result<TableReference> {
1577 let table_name = self
1578 .ctx
1579 .table_name
1580 .clone()
1581 .context(TableNameNotFoundSnafu)?;
1582
1583 let table_ref = if let Some(schema_name) = &self.ctx.schema_name {
1585 TableReference::partial(schema_name.as_str(), table_name.as_str())
1586 } else {
1587 TableReference::bare(table_name.as_str())
1588 };
1589
1590 Ok(table_ref)
1591 }
1592
    /// Build a time-range filter over the time index for the query's
    /// evaluation window(s), shifted left by `offset_duration` (PromQL
    /// `offset`, in milliseconds).
    ///
    /// Two strategies:
    /// * one contiguous `[start - window, end]` range when there are many
    ///   evaluation points or the step interval is small, or
    /// * one small range per evaluation timestamp, OR-ed together, so widely
    ///   spaced points don't force scanning the whole span.
    fn build_time_index_filter(&self, offset_duration: i64) -> Result<Option<DfExpr>> {
        let start = self.ctx.start;
        let end = self.ctx.end;
        if end < start {
            return InvalidTimeRangeSnafu { start, end }.fail();
        }
        let lookback_delta = self.ctx.lookback_delta;
        let range = self.ctx.range.unwrap_or_default();
        let interval = self.ctx.interval;
        let time_index_expr = self.create_time_index_column_expr()?;
        // NOTE(review): assumes `interval > 0`; zero would divide by zero here
        // and panic in `step_by` below — confirm the upstream invariant.
        let num_points = (end - start) / interval;

        // Each evaluation point looks back over the range selector's window,
        // or `lookback_delta` for instant selectors (range == 0).
        let selector_window = if range == 0 { lookback_delta } else { range };
        // The lower bound is exclusive, `(t - window, t]`; implemented as a
        // +1ms adjustment on the >= comparison.
        let lower_exclusive_adjustment = if selector_window > 0 { 1 } else { 0 };

        if (end - start) / interval > MAX_SCATTER_POINTS || interval <= INTERVAL_1H {
            // Dense case: a single contiguous range covers all points.
            let single_time_range = time_index_expr
                .clone()
                .gt_eq(DfExpr::Literal(
                    ScalarValue::TimestampMillisecond(
                        Some(
                            self.ctx.start - offset_duration - selector_window
                                + lower_exclusive_adjustment,
                        ),
                        None,
                    ),
                    None,
                ))
                .and(time_index_expr.lt_eq(DfExpr::Literal(
                    ScalarValue::TimestampMillisecond(Some(self.ctx.end - offset_duration), None),
                    None,
                )));
            return Ok(Some(single_time_range));
        }

        // Sparse case: one window per evaluation timestamp, OR-ed together.
        let mut filters = Vec::with_capacity(num_points as usize + 1);
        for timestamp in (start..=end).step_by(interval as usize) {
            filters.push(
                time_index_expr
                    .clone()
                    .gt_eq(DfExpr::Literal(
                        ScalarValue::TimestampMillisecond(
                            Some(
                                timestamp - offset_duration - selector_window
                                    + lower_exclusive_adjustment,
                            ),
                            None,
                        ),
                        None,
                    ))
                    .and(time_index_expr.clone().lt_eq(DfExpr::Literal(
                        ScalarValue::TimestampMillisecond(Some(timestamp - offset_duration), None),
                        None,
                    ))),
            )
        }

        // `start..=end` is non-empty here, so this is always `Some`.
        Ok(filters.into_iter().reduce(DfExpr::or))
    }
1661
    /// Create the table-scan plan for the current metric.
    ///
    /// For metric-engine logical tables this transparently redirects the scan
    /// to the physical table (filtering by `__table_id__`, identifying series
    /// by `__tsid__`) when the physical table exposes both internal columns.
    /// Non-millisecond time indexes are cast to millisecond precision. Sets
    /// `self.ctx.use_tsid` as a side effect.
    async fn create_table_scan_plan(&mut self, table_ref: TableReference) -> Result<LogicalPlan> {
        let provider = self
            .table_provider
            .resolve_table(table_ref.clone())
            .await
            .context(CatalogSnafu)?;

        let logical_table = self.table_from_source(&provider)?;

        // Defaults: scan the resolved (logical) table as-is.
        let mut maybe_phy_table_ref = table_ref.clone();
        let mut scan_provider = provider;
        let mut table_id_filter: Option<u32> = None;

        // Metric-engine logical tables record their physical table's name in
        // the table options; try to scan the physical table directly.
        if logical_table.table_info().meta.engine == METRIC_ENGINE_NAME
            && let Some(physical_table_name) = logical_table
                .table_info()
                .meta
                .options
                .extra_options
                .get(LOGICAL_TABLE_METADATA_KEY)
        {
            let physical_table_ref = if let Some(schema_name) = &self.ctx.schema_name {
                TableReference::partial(schema_name.as_str(), physical_table_name.as_str())
            } else {
                TableReference::bare(physical_table_name.as_str())
            };

            let physical_provider = match self
                .table_provider
                .resolve_table(physical_table_ref.clone())
                .await
            {
                Ok(provider) => provider,
                // Physical table missing: fall back to the logical table.
                Err(e) if e.status_code() == StatusCode::TableNotFound => {
                    scan_provider.clone()
                }
                Err(e) => return Err(e).context(CatalogSnafu),
            };

            if !Arc::ptr_eq(&physical_provider, &scan_provider) {
                let physical_table = self.table_from_source(&physical_provider)?;

                // The redirect requires both internal columns: `__table_id__`
                // for row filtering and a UInt64 `__tsid__` for series identity.
                let has_table_id = physical_table
                    .schema()
                    .column_schema_by_name(DATA_SCHEMA_TABLE_ID_COLUMN_NAME)
                    .is_some();
                let has_tsid = physical_table
                    .schema()
                    .column_schema_by_name(DATA_SCHEMA_TSID_COLUMN_NAME)
                    .is_some_and(|col| matches!(col.data_type, ConcreteDataType::UInt64(_)));

                if has_table_id && has_tsid {
                    scan_provider = physical_provider;
                    maybe_phy_table_ref = physical_table_ref;
                    table_id_filter = Some(logical_table.table_info().ident.table_id);
                }
            }
        }

        let scan_table = self.table_from_source(&scan_provider)?;

        // `__tsid__`-based series identity only applies on the redirect path.
        let use_tsid = table_id_filter.is_some()
            && scan_table
                .schema()
                .column_schema_by_name(DATA_SCHEMA_TSID_COLUMN_NAME)
                .is_some_and(|col| matches!(col.data_type, ConcreteDataType::UInt64(_)));
        self.ctx.use_tsid = use_tsid;

        let all_table_tags = self.ctx.tag_columns.clone();

        // Tag columns to include in the scan, sorted and deduplicated.
        // NOTE(review): `scan_tags` starts from ALL tag columns, so the
        // matcher loop only re-adds names already present — if the intent was
        // to prune to matcher-referenced tags, it should start empty; confirm.
        let scan_tag_columns = if use_tsid {
            let mut scan_tags = self.ctx.tag_columns.clone();
            for matcher in &self.ctx.selector_matcher {
                if is_metric_engine_internal_column(&matcher.name) {
                    continue;
                }
                if all_table_tags.iter().any(|tag| tag == &matcher.name) {
                    scan_tags.push(matcher.name.clone());
                }
            }
            scan_tags.sort_unstable();
            scan_tags.dedup();
            scan_tags
        } else {
            self.ctx.tag_columns.clone()
        };

        let is_time_index_ms = scan_table
            .schema()
            .timestamp_column()
            .with_context(|| TimeIndexNotFoundSnafu {
                table: maybe_phy_table_ref.to_quoted_string(),
            })?
            .data_type
            == ConcreteDataType::timestamp_millisecond_datatype();

        // On the redirect path, project only the columns the query needs.
        let scan_projection = if table_id_filter.is_some() {
            let mut required_columns = HashSet::new();
            required_columns.insert(DATA_SCHEMA_TABLE_ID_COLUMN_NAME.to_string());
            required_columns.insert(self.ctx.time_index_column.clone().with_context(|| {
                TimeIndexNotFoundSnafu {
                    table: maybe_phy_table_ref.to_quoted_string(),
                }
            })?);
            for col in &scan_tag_columns {
                required_columns.insert(col.clone());
            }
            for col in &self.ctx.field_columns {
                required_columns.insert(col.clone());
            }
            if use_tsid {
                required_columns.insert(DATA_SCHEMA_TSID_COLUMN_NAME.to_string());
            }

            // Translate required names into source-schema column indices.
            let arrow_schema = scan_table.schema().arrow_schema().clone();
            Some(
                arrow_schema
                    .fields()
                    .iter()
                    .enumerate()
                    .filter(|(_, field)| required_columns.contains(field.name().as_str()))
                    .map(|(idx, _)| idx)
                    .collect::<Vec<_>>(),
            )
        } else {
            None
        };

        let mut scan_plan =
            LogicalPlanBuilder::scan(maybe_phy_table_ref.clone(), scan_provider, scan_projection)
                .context(DataFusionPlanningSnafu)?
                .build()
                .context(DataFusionPlanningSnafu)?;

        // Restrict the physical table to this logical table's rows, and alias
        // the subtree back to the logical table name so later qualified column
        // references still resolve.
        if let Some(table_id) = table_id_filter {
            scan_plan = LogicalPlanBuilder::from(scan_plan)
                .filter(
                    DfExpr::Column(Column::from_name(DATA_SCHEMA_TABLE_ID_COLUMN_NAME))
                        .eq(lit(table_id)),
                )
                .context(DataFusionPlanningSnafu)?
                .alias(table_ref.clone())
                .context(DataFusionPlanningSnafu)?
                .build()
                .context(DataFusionPlanningSnafu)?;
        }

        if !is_time_index_ms {
            // Cast the time index to millisecond timestamps, keeping its
            // original name and qualifier so downstream references still work.
            let expr: Vec<_> = self
                .create_field_column_exprs()?
                .into_iter()
                .chain(
                    scan_tag_columns
                        .iter()
                        .map(|tag| DfExpr::Column(Column::from_name(tag))),
                )
                .chain(self.ctx.use_tsid.then_some(DfExpr::Column(Column::new(
                    Some(table_ref.clone()),
                    DATA_SCHEMA_TSID_COLUMN_NAME.to_string(),
                ))))
                .chain(Some(DfExpr::Alias(Alias {
                    expr: Box::new(DfExpr::Cast(Cast {
                        expr: Box::new(self.create_time_index_column_expr()?),
                        data_type: ArrowDataType::Timestamp(ArrowTimeUnit::Millisecond, None),
                    })),
                    relation: Some(table_ref.clone()),
                    name: self
                        .ctx
                        .time_index_column
                        .as_ref()
                        .with_context(|| TimeIndexNotFoundSnafu {
                            table: table_ref.to_quoted_string(),
                        })?
                        .clone(),
                    metadata: None,
                })))
                .collect::<Vec<_>>();
            scan_plan = LogicalPlanBuilder::from(scan_plan)
                .project(expr)
                .context(DataFusionPlanningSnafu)?
                .build()
                .context(DataFusionPlanningSnafu)?;
        } else if table_id_filter.is_some() {
            // Redirect path with a ms time index: project to the expected
            // column order (fields, tags, optional tsid, time index).
            let project_exprs = self
                .create_field_column_exprs()?
                .into_iter()
                .chain(
                    scan_tag_columns
                        .iter()
                        .map(|tag| DfExpr::Column(Column::from_name(tag))),
                )
                .chain(
                    self.ctx
                        .use_tsid
                        .then_some(DfExpr::Column(Column::from_name(
                            DATA_SCHEMA_TSID_COLUMN_NAME,
                        ))),
                )
                .chain(Some(self.create_time_index_column_expr()?))
                .collect::<Vec<_>>();

            scan_plan = LogicalPlanBuilder::from(scan_plan)
                .project(project_exprs)
                .context(DataFusionPlanningSnafu)?
                .build()
                .context(DataFusionPlanningSnafu)?;
        }

        // NOTE(review): this `from(...).build()` round-trip returns the plan
        // unchanged — it appears to be a no-op and could likely be removed.
        let result = LogicalPlanBuilder::from(scan_plan)
            .build()
            .context(DataFusionPlanningSnafu)?;
        Ok(result)
    }
1887
1888 fn collect_row_key_tag_columns_from_plan(
1889 &self,
1890 plan: &LogicalPlan,
1891 ) -> Result<BTreeSet<String>> {
1892 fn walk(
1893 planner: &PromPlanner,
1894 plan: &LogicalPlan,
1895 out: &mut BTreeSet<String>,
1896 ) -> Result<()> {
1897 if let LogicalPlan::TableScan(scan) = plan {
1898 let table = planner.table_from_source(&scan.source)?;
1899 for col in table.table_info().meta.row_key_column_names() {
1900 if col != DATA_SCHEMA_TABLE_ID_COLUMN_NAME
1901 && col != DATA_SCHEMA_TSID_COLUMN_NAME
1902 && !is_metric_engine_internal_column(col)
1903 {
1904 out.insert(col.clone());
1905 }
1906 }
1907 }
1908
1909 for input in plan.inputs() {
1910 walk(planner, input, out)?;
1911 }
1912 Ok(())
1913 }
1914
1915 let mut out = BTreeSet::new();
1916 walk(self, plan, &mut out)?;
1917 Ok(out)
1918 }
1919
    /// Ensure every tag in `required_tags` is present in the plan's output by
    /// widening `TableScan` projections and `Projection` expression lists in
    /// place. Other node kinds are left untouched.
    fn ensure_tag_columns_available(
        &self,
        plan: LogicalPlan,
        required_tags: &BTreeSet<String>,
    ) -> Result<LogicalPlan> {
        if required_tags.is_empty() {
            return Ok(plan);
        }

        struct Rewriter {
            required_tags: BTreeSet<String>,
        }

        impl TreeNodeRewriter for Rewriter {
            type Node = LogicalPlan;

            // Bottom-up rewrite: scans are widened before the projections
            // above them, so parents can pass the new columns through.
            fn f_up(
                &mut self,
                node: Self::Node,
            ) -> datafusion_common::Result<Transformed<Self::Node>> {
                match node {
                    LogicalPlan::TableScan(scan) => {
                        let schema = scan.source.schema();
                        let mut projection = match scan.projection.clone() {
                            Some(p) => p,
                            // No projection: the scan already outputs every
                            // source column, nothing to add.
                            None => {
                                return Ok(Transformed::no(LogicalPlan::TableScan(scan)));
                            }
                        };

                        // Append the source-schema index of each missing tag.
                        let mut changed = false;
                        for tag in &self.required_tags {
                            if let Some((idx, _)) = schema
                                .fields()
                                .iter()
                                .enumerate()
                                .find(|(_, field)| field.name() == tag)
                                && !projection.contains(&idx)
                            {
                                projection.push(idx);
                                changed = true;
                            }
                        }

                        if !changed {
                            return Ok(Transformed::no(LogicalPlan::TableScan(scan)));
                        }

                        projection.sort_unstable();
                        projection.dedup();

                        // Rebuild the scan so its schema reflects the widened
                        // projection.
                        let new_scan = TableScan::try_new(
                            scan.table_name.clone(),
                            scan.source.clone(),
                            Some(projection),
                            scan.filters,
                            scan.fetch,
                        )?;
                        Ok(Transformed::yes(LogicalPlan::TableScan(new_scan)))
                    }
                    LogicalPlan::Projection(proj) => {
                        let input_schema = proj.input.schema();

                        // Column names the projection already outputs.
                        let existing = proj
                            .schema
                            .fields()
                            .iter()
                            .map(|f| f.name().as_str())
                            .collect::<HashSet<_>>();

                        let mut expr = proj.expr.clone();
                        let mut has_changed = false;
                        for tag in &self.required_tags {
                            if existing.contains(tag.as_str()) {
                                continue;
                            }

                            // Pass the tag through only if the (already
                            // rewritten) input actually provides it.
                            if let Some(idx) = input_schema.index_of_column_by_name(None, tag) {
                                expr.push(DfExpr::Column(Column::from(
                                    input_schema.qualified_field(idx),
                                )));
                                has_changed = true;
                            }
                        }

                        if !has_changed {
                            return Ok(Transformed::no(LogicalPlan::Projection(proj)));
                        }

                        let new_proj = Projection::try_new(expr, proj.input)?;
                        Ok(Transformed::yes(LogicalPlan::Projection(new_proj)))
                    }
                    other => Ok(Transformed::no(other)),
                }
            }
        }

        let mut rewriter = Rewriter {
            required_tags: required_tags.clone(),
        };
        let rewritten = plan
            .rewrite(&mut rewriter)
            .context(DataFusionPlanningSnafu)?;
        Ok(rewritten.data)
    }
2026
2027 fn refresh_tag_columns_from_schema(&mut self, schema: &DFSchemaRef) {
2028 let time_index = self.ctx.time_index_column.as_deref();
2029 let field_columns = self.ctx.field_columns.iter().collect::<HashSet<_>>();
2030
2031 let mut tags = schema
2032 .fields()
2033 .iter()
2034 .map(|f| f.name())
2035 .filter(|name| Some(name.as_str()) != time_index)
2036 .filter(|name| !field_columns.contains(name))
2037 .filter(|name| !is_metric_engine_internal_column(name))
2038 .cloned()
2039 .collect::<Vec<_>>();
2040 tags.sort_unstable();
2041 tags.dedup();
2042 self.ctx.tag_columns = tags;
2043 }
2044
    /// Resolve the metric table named in the context and populate the planner
    /// context (time index, field columns, tag columns) from its metadata.
    ///
    /// Returns `Some(plan)` when the table does not exist — an empty-metric
    /// plan so the query yields no rows instead of failing — and `None` on
    /// success.
    async fn setup_context(&mut self) -> Result<Option<LogicalPlan>> {
        let table_ref = self.table_ref()?;
        let source = match self.table_provider.resolve_table(table_ref.clone()).await {
            // A missing metric is not an error in PromQL: answer with an
            // empty result set.
            Err(e) if e.status_code() == StatusCode::TableNotFound => {
                let plan = self.setup_context_for_empty_metric()?;
                return Ok(Some(plan));
            }
            res => res.context(CatalogSnafu)?,
        };
        let table = self.table_from_source(&source)?;

        // Time index column.
        let time_index = table
            .schema()
            .timestamp_column()
            .with_context(|| TimeIndexNotFoundSnafu {
                table: table_ref.to_quoted_string(),
            })?
            .name
            .clone();
        self.ctx.time_index_column = Some(time_index);

        // Value (field) columns.
        let values = table
            .table_info()
            .meta
            .field_column_names()
            .cloned()
            .collect();
        self.ctx.field_columns = values;

        // Tag columns: row-key columns minus metric-engine internal ones.
        let tags = table
            .table_info()
            .meta
            .row_key_column_names()
            .filter(|col| {
                col != &DATA_SCHEMA_TABLE_ID_COLUMN_NAME && col != &DATA_SCHEMA_TSID_COLUMN_NAME
            })
            .cloned()
            .collect();
        self.ctx.tag_columns = tags;

        // `use_tsid` is decided later, in `create_table_scan_plan`.
        self.ctx.use_tsid = false;

        Ok(None)
    }
2096
    /// Configure the planner context for a selector whose metric table does
    /// not exist, and build an `EmptyMetric` plan so the query returns no
    /// rows instead of erroring.
    fn setup_context_for_empty_metric(&mut self) -> Result<LogicalPlan> {
        self.ctx.time_index_column = Some(SPECIAL_TIME_FUNCTION.to_string());
        self.ctx.reset_table_name_and_schema();
        self.ctx.tag_columns = vec![];
        self.ctx.field_columns = vec![DEFAULT_FIELD_COLUMN.to_string()];
        self.ctx.use_tsid = false;

        // An inverted time range (start 0, end -1) produces zero rows.
        let plan = LogicalPlan::Extension(Extension {
            node: Arc::new(
                EmptyMetric::new(
                    0,
                    -1,
                    self.ctx.interval,
                    SPECIAL_TIME_FUNCTION.to_string(),
                    DEFAULT_FIELD_COLUMN.to_string(),
                    Some(lit(0.0f64)),
                )
                .context(DataFusionPlanningSnafu)?,
            ),
        });
        Ok(plan)
    }
2122
2123 fn create_function_args(&self, args: &[Box<PromExpr>]) -> Result<FunctionArgs> {
2125 let mut result = FunctionArgs::default();
2126
2127 for arg in args {
2128 if let Some(expr) = Self::try_build_literal_expr(arg) {
2130 result.literals.push(expr);
2131 } else {
2132 match arg.as_ref() {
2134 PromExpr::Subquery(_)
2135 | PromExpr::VectorSelector(_)
2136 | PromExpr::MatrixSelector(_)
2137 | PromExpr::Extension(_)
2138 | PromExpr::Aggregate(_)
2139 | PromExpr::Paren(_)
2140 | PromExpr::Call(_)
2141 | PromExpr::Binary(_)
2142 | PromExpr::Unary(_) => {
2143 if result.input.replace(*arg.clone()).is_some() {
2144 MultipleVectorSnafu { expr: *arg.clone() }.fail()?;
2145 }
2146 }
2147
2148 _ => {
2149 let expr = Self::get_param_as_literal_expr(&Some(arg.clone()), None, None)?;
2150 result.literals.push(expr);
2151 }
2152 }
2153 }
2154 }
2155
2156 Ok(result)
2157 }
2158
2159 fn create_function_expr(
2165 &mut self,
2166 func: &Function,
2167 other_input_exprs: Vec<DfExpr>,
2168 query_engine_state: &QueryEngineState,
2169 ) -> Result<(Vec<DfExpr>, Vec<String>)> {
2170 let mut other_input_exprs: VecDeque<DfExpr> = other_input_exprs.into();
2172
2173 let field_column_pos = 0;
2175 let mut exprs = Vec::with_capacity(self.ctx.field_columns.len());
2176 let mut new_tags = vec![];
2178 let scalar_func = match func.name {
2179 "increase" => ScalarFunc::ExtrapolateUdf(
2180 Arc::new(Increase::scalar_udf()),
2181 self.ctx.range.context(ExpectRangeSelectorSnafu)?,
2182 ),
2183 "rate" => ScalarFunc::ExtrapolateUdf(
2184 Arc::new(Rate::scalar_udf()),
2185 self.ctx.range.context(ExpectRangeSelectorSnafu)?,
2186 ),
2187 "delta" => ScalarFunc::ExtrapolateUdf(
2188 Arc::new(Delta::scalar_udf()),
2189 self.ctx.range.context(ExpectRangeSelectorSnafu)?,
2190 ),
2191 "idelta" => ScalarFunc::Udf(Arc::new(IDelta::<false>::scalar_udf())),
2192 "irate" => ScalarFunc::Udf(Arc::new(IDelta::<true>::scalar_udf())),
2193 "resets" => ScalarFunc::Udf(Arc::new(Resets::scalar_udf())),
2194 "changes" => ScalarFunc::Udf(Arc::new(Changes::scalar_udf())),
2195 "deriv" => ScalarFunc::Udf(Arc::new(Deriv::scalar_udf())),
2196 "avg_over_time" => ScalarFunc::Udf(Arc::new(AvgOverTime::scalar_udf())),
2197 "min_over_time" => ScalarFunc::Udf(Arc::new(MinOverTime::scalar_udf())),
2198 "max_over_time" => ScalarFunc::Udf(Arc::new(MaxOverTime::scalar_udf())),
2199 "sum_over_time" => ScalarFunc::Udf(Arc::new(SumOverTime::scalar_udf())),
2200 "count_over_time" => ScalarFunc::Udf(Arc::new(CountOverTime::scalar_udf())),
2201 "last_over_time" => ScalarFunc::Udf(Arc::new(LastOverTime::scalar_udf())),
2202 "absent_over_time" => ScalarFunc::Udf(Arc::new(AbsentOverTime::scalar_udf())),
2203 "present_over_time" => ScalarFunc::Udf(Arc::new(PresentOverTime::scalar_udf())),
2204 "stddev_over_time" => ScalarFunc::Udf(Arc::new(StddevOverTime::scalar_udf())),
2205 "stdvar_over_time" => ScalarFunc::Udf(Arc::new(StdvarOverTime::scalar_udf())),
2206 "quantile_over_time" => ScalarFunc::Udf(Arc::new(QuantileOverTime::scalar_udf())),
2207 "predict_linear" => {
2208 other_input_exprs[0] = DfExpr::Cast(Cast {
2209 expr: Box::new(other_input_exprs[0].clone()),
2210 data_type: ArrowDataType::Int64,
2211 });
2212 ScalarFunc::Udf(Arc::new(PredictLinear::scalar_udf()))
2213 }
2214 "double_exponential_smoothing" | "holt_winters" => {
2215 ScalarFunc::Udf(Arc::new(DoubleExponentialSmoothing::scalar_udf()))
2216 }
2217 "time" => {
2218 exprs.push(build_special_time_expr(
2219 self.ctx.time_index_column.as_ref().unwrap(),
2220 ));
2221 ScalarFunc::GeneratedExpr
2222 }
2223 "minute" => {
2224 let expr = self.date_part_on_time_index("minute")?;
2226 exprs.push(expr);
2227 ScalarFunc::GeneratedExpr
2228 }
2229 "hour" => {
2230 let expr = self.date_part_on_time_index("hour")?;
2232 exprs.push(expr);
2233 ScalarFunc::GeneratedExpr
2234 }
2235 "month" => {
2236 let expr = self.date_part_on_time_index("month")?;
2238 exprs.push(expr);
2239 ScalarFunc::GeneratedExpr
2240 }
2241 "year" => {
2242 let expr = self.date_part_on_time_index("year")?;
2244 exprs.push(expr);
2245 ScalarFunc::GeneratedExpr
2246 }
2247 "day_of_month" => {
2248 let expr = self.date_part_on_time_index("day")?;
2250 exprs.push(expr);
2251 ScalarFunc::GeneratedExpr
2252 }
2253 "day_of_week" => {
2254 let expr = self.date_part_on_time_index("dow")?;
2256 exprs.push(expr);
2257 ScalarFunc::GeneratedExpr
2258 }
2259 "day_of_year" => {
2260 let expr = self.date_part_on_time_index("doy")?;
2262 exprs.push(expr);
2263 ScalarFunc::GeneratedExpr
2264 }
2265 "days_in_month" => {
2266 let day_lit_expr = "day".lit();
2271 let month_lit_expr = "month".lit();
2272 let interval_1month_lit_expr =
2273 DfExpr::Literal(ScalarValue::IntervalYearMonth(Some(1)), None);
2274 let interval_1day_lit_expr = DfExpr::Literal(
2275 ScalarValue::IntervalDayTime(Some(IntervalDayTime::new(1, 0))),
2276 None,
2277 );
2278 let the_1month_minus_1day_expr = DfExpr::BinaryExpr(BinaryExpr {
2279 left: Box::new(interval_1month_lit_expr),
2280 op: Operator::Minus,
2281 right: Box::new(interval_1day_lit_expr),
2282 });
2283 let date_trunc_expr = DfExpr::ScalarFunction(ScalarFunction {
2284 func: datafusion_functions::datetime::date_trunc(),
2285 args: vec![month_lit_expr, self.create_time_index_column_expr()?],
2286 });
2287 let date_trunc_plus_interval_expr = DfExpr::BinaryExpr(BinaryExpr {
2288 left: Box::new(date_trunc_expr),
2289 op: Operator::Plus,
2290 right: Box::new(the_1month_minus_1day_expr),
2291 });
2292 let date_part_expr = DfExpr::ScalarFunction(ScalarFunction {
2293 func: datafusion_functions::datetime::date_part(),
2294 args: vec![day_lit_expr, date_trunc_plus_interval_expr],
2295 });
2296
2297 exprs.push(date_part_expr);
2298 ScalarFunc::GeneratedExpr
2299 }
2300
2301 "label_join" => {
2302 let (concat_expr, dst_label) = Self::build_concat_labels_expr(
2303 &mut other_input_exprs,
2304 &self.ctx,
2305 query_engine_state,
2306 )?;
2307
2308 for value in &self.ctx.field_columns {
2310 if *value != dst_label {
2311 let expr = DfExpr::Column(Column::from_name(value));
2312 exprs.push(expr);
2313 }
2314 }
2315
2316 self.ctx.tag_columns.retain(|tag| *tag != dst_label);
2318 new_tags.push(dst_label);
2319 exprs.push(concat_expr);
2321
2322 ScalarFunc::GeneratedExpr
2323 }
2324 "label_replace" => {
2325 if let Some((replace_expr, dst_label)) = self
2326 .build_regexp_replace_label_expr(&mut other_input_exprs, query_engine_state)?
2327 {
2328 for value in &self.ctx.field_columns {
2330 if *value != dst_label {
2331 let expr = DfExpr::Column(Column::from_name(value));
2332 exprs.push(expr);
2333 }
2334 }
2335
2336 ensure!(
2337 !self.ctx.tag_columns.contains(&dst_label),
2338 SameLabelSetSnafu
2339 );
2340 new_tags.push(dst_label);
2341 exprs.push(replace_expr);
2343 } else {
2344 for value in &self.ctx.field_columns {
2346 let expr = DfExpr::Column(Column::from_name(value));
2347 exprs.push(expr);
2348 }
2349 }
2350
2351 ScalarFunc::GeneratedExpr
2352 }
2353 "sort" | "sort_desc" | "sort_by_label" | "sort_by_label_desc" | "timestamp" => {
2354 for value in &self.ctx.field_columns {
2357 let expr = DfExpr::Column(Column::from_name(value));
2358 exprs.push(expr);
2359 }
2360
2361 ScalarFunc::GeneratedExpr
2362 }
2363 "round" => {
2364 if other_input_exprs.is_empty() {
2365 other_input_exprs.push_front(0.0f64.lit());
2366 }
2367 ScalarFunc::DataFusionUdf(Arc::new(Round::scalar_udf()))
2368 }
2369 "rad" => ScalarFunc::DataFusionBuiltin(datafusion::functions::math::radians()),
2370 "deg" => ScalarFunc::DataFusionBuiltin(datafusion::functions::math::degrees()),
2371 "sgn" => ScalarFunc::DataFusionBuiltin(datafusion::functions::math::signum()),
2372 "pi" => {
2373 let fn_expr = DfExpr::ScalarFunction(ScalarFunction {
2375 func: datafusion::functions::math::pi(),
2376 args: vec![],
2377 });
2378 exprs.push(fn_expr);
2379
2380 ScalarFunc::GeneratedExpr
2381 }
2382 _ => {
2383 if let Some(f) = query_engine_state
2384 .session_state()
2385 .scalar_functions()
2386 .get(func.name)
2387 {
2388 ScalarFunc::DataFusionBuiltin(f.clone())
2389 } else if let Some(factory) = query_engine_state.scalar_function(func.name) {
2390 let func_state = query_engine_state.function_state();
2391 let query_ctx = self.table_provider.query_ctx();
2392
2393 ScalarFunc::DataFusionUdf(Arc::new(factory.provide(FunctionContext {
2394 state: func_state,
2395 query_ctx: query_ctx.clone(),
2396 })))
2397 } else if let Some(f) = datafusion_functions::math::functions()
2398 .iter()
2399 .find(|f| f.name() == func.name)
2400 {
2401 ScalarFunc::DataFusionUdf(f.clone())
2402 } else {
2403 return UnsupportedExprSnafu {
2404 name: func.name.to_string(),
2405 }
2406 .fail();
2407 }
2408 }
2409 };
2410
2411 for value in &self.ctx.field_columns {
2412 let col_expr = DfExpr::Column(Column::from_name(value));
2413
2414 match scalar_func.clone() {
2415 ScalarFunc::DataFusionBuiltin(func) => {
2416 other_input_exprs.insert(field_column_pos, col_expr);
2417 let fn_expr = DfExpr::ScalarFunction(ScalarFunction {
2418 func,
2419 args: other_input_exprs.clone().into(),
2420 });
2421 exprs.push(fn_expr);
2422 let _ = other_input_exprs.remove(field_column_pos);
2423 }
2424 ScalarFunc::DataFusionUdf(func) => {
2425 let args = itertools::chain!(
2426 other_input_exprs.iter().take(field_column_pos).cloned(),
2427 std::iter::once(col_expr),
2428 other_input_exprs.iter().skip(field_column_pos).cloned()
2429 )
2430 .collect_vec();
2431 exprs.push(DfExpr::ScalarFunction(ScalarFunction { func, args }))
2432 }
2433 ScalarFunc::Udf(func) => {
2434 let ts_range_expr = DfExpr::Column(Column::from_name(
2435 RangeManipulate::build_timestamp_range_name(
2436 self.ctx.time_index_column.as_ref().unwrap(),
2437 ),
2438 ));
2439 other_input_exprs.insert(field_column_pos, ts_range_expr);
2440 other_input_exprs.insert(field_column_pos + 1, col_expr);
2441 let fn_expr = DfExpr::ScalarFunction(ScalarFunction {
2442 func,
2443 args: other_input_exprs.clone().into(),
2444 });
2445 exprs.push(fn_expr);
2446 let _ = other_input_exprs.remove(field_column_pos + 1);
2447 let _ = other_input_exprs.remove(field_column_pos);
2448 }
2449 ScalarFunc::ExtrapolateUdf(func, range_length) => {
2450 let ts_range_expr = DfExpr::Column(Column::from_name(
2451 RangeManipulate::build_timestamp_range_name(
2452 self.ctx.time_index_column.as_ref().unwrap(),
2453 ),
2454 ));
2455 other_input_exprs.insert(field_column_pos, ts_range_expr);
2456 other_input_exprs.insert(field_column_pos + 1, col_expr);
2457 other_input_exprs
2458 .insert(field_column_pos + 2, self.create_time_index_column_expr()?);
2459 other_input_exprs.push_back(lit(range_length));
2460 let fn_expr = DfExpr::ScalarFunction(ScalarFunction {
2461 func,
2462 args: other_input_exprs.clone().into(),
2463 });
2464 exprs.push(fn_expr);
2465 let _ = other_input_exprs.pop_back();
2466 let _ = other_input_exprs.remove(field_column_pos + 2);
2467 let _ = other_input_exprs.remove(field_column_pos + 1);
2468 let _ = other_input_exprs.remove(field_column_pos);
2469 }
2470 ScalarFunc::GeneratedExpr => {}
2471 }
2472 }
2473
2474 if !matches!(func.name, "label_join" | "label_replace") {
2478 let mut new_field_columns = Vec::with_capacity(exprs.len());
2479
2480 exprs = exprs
2481 .into_iter()
2482 .map(|expr| {
2483 let display_name = expr.schema_name().to_string();
2484 new_field_columns.push(display_name.clone());
2485 Ok(expr.alias(display_name))
2486 })
2487 .collect::<std::result::Result<Vec<_>, _>>()
2488 .context(DataFusionPlanningSnafu)?;
2489
2490 self.ctx.field_columns = new_field_columns;
2491 }
2492
2493 Ok((exprs, new_tags))
2494 }
2495
2496 fn validate_label_name(label_name: &str) -> Result<()> {
2500 if label_name.starts_with("__") {
2502 return InvalidDestinationLabelNameSnafu { label_name }.fail();
2503 }
2504 if !LABEL_NAME_REGEX.is_match(label_name) {
2506 return InvalidDestinationLabelNameSnafu { label_name }.fail();
2507 }
2508
2509 Ok(())
2510 }
2511
2512 fn build_regexp_replace_label_expr(
2514 &self,
2515 other_input_exprs: &mut VecDeque<DfExpr>,
2516 query_engine_state: &QueryEngineState,
2517 ) -> Result<Option<(DfExpr, String)>> {
2518 let dst_label = match other_input_exprs.pop_front() {
2520 Some(DfExpr::Literal(ScalarValue::Utf8(Some(d)), _)) => d,
2521 other => UnexpectedPlanExprSnafu {
2522 desc: format!("expected dst_label string literal, but found {:?}", other),
2523 }
2524 .fail()?,
2525 };
2526
2527 Self::validate_label_name(&dst_label)?;
2529 let replacement = match other_input_exprs.pop_front() {
2530 Some(DfExpr::Literal(ScalarValue::Utf8(Some(r)), _)) => r,
2531 other => UnexpectedPlanExprSnafu {
2532 desc: format!("expected replacement string literal, but found {:?}", other),
2533 }
2534 .fail()?,
2535 };
2536 let src_label = match other_input_exprs.pop_front() {
2537 Some(DfExpr::Literal(ScalarValue::Utf8(Some(s)), None)) => s,
2538 other => UnexpectedPlanExprSnafu {
2539 desc: format!("expected src_label string literal, but found {:?}", other),
2540 }
2541 .fail()?,
2542 };
2543
2544 let regex = match other_input_exprs.pop_front() {
2545 Some(DfExpr::Literal(ScalarValue::Utf8(Some(r)), None)) => r,
2546 other => UnexpectedPlanExprSnafu {
2547 desc: format!("expected regex string literal, but found {:?}", other),
2548 }
2549 .fail()?,
2550 };
2551
2552 regex::Regex::new(®ex).map_err(|_| {
2555 InvalidRegularExpressionSnafu {
2556 regex: regex.clone(),
2557 }
2558 .build()
2559 })?;
2560
2561 if self.ctx.tag_columns.contains(&src_label) && regex.is_empty() {
2563 return Ok(None);
2564 }
2565
2566 if !self.ctx.tag_columns.contains(&src_label) {
2568 if replacement.is_empty() {
2569 return Ok(None);
2571 } else {
2572 return Ok(Some((
2574 lit(replacement).alias(&dst_label),
2576 dst_label,
2577 )));
2578 }
2579 }
2580
2581 let regex = format!("^(?s:{regex})$");
2584
2585 let session_state = query_engine_state.session_state();
2586 let func = session_state
2587 .scalar_functions()
2588 .get("regexp_replace")
2589 .context(UnsupportedExprSnafu {
2590 name: "regexp_replace",
2591 })?;
2592
2593 let args = vec![
2595 if src_label.is_empty() {
2596 DfExpr::Literal(ScalarValue::Utf8(Some(String::new())), None)
2597 } else {
2598 DfExpr::Column(Column::from_name(src_label))
2599 },
2600 DfExpr::Literal(ScalarValue::Utf8(Some(regex)), None),
2601 DfExpr::Literal(ScalarValue::Utf8(Some(replacement)), None),
2602 ];
2603
2604 Ok(Some((
2605 DfExpr::ScalarFunction(ScalarFunction {
2606 func: func.clone(),
2607 args,
2608 })
2609 .alias(&dst_label),
2610 dst_label,
2611 )))
2612 }
2613
2614 fn build_concat_labels_expr(
2616 other_input_exprs: &mut VecDeque<DfExpr>,
2617 ctx: &PromPlannerContext,
2618 query_engine_state: &QueryEngineState,
2619 ) -> Result<(DfExpr, String)> {
2620 let dst_label = match other_input_exprs.pop_front() {
2623 Some(DfExpr::Literal(ScalarValue::Utf8(Some(d)), _)) => d,
2624 other => UnexpectedPlanExprSnafu {
2625 desc: format!("expected dst_label string literal, but found {:?}", other),
2626 }
2627 .fail()?,
2628 };
2629 let separator = match other_input_exprs.pop_front() {
2630 Some(DfExpr::Literal(ScalarValue::Utf8(Some(d)), _)) => d,
2631 other => UnexpectedPlanExprSnafu {
2632 desc: format!("expected separator string literal, but found {:?}", other),
2633 }
2634 .fail()?,
2635 };
2636
2637 let available_columns: HashSet<&str> = ctx
2639 .tag_columns
2640 .iter()
2641 .chain(ctx.field_columns.iter())
2642 .chain(ctx.time_index_column.as_ref())
2643 .map(|s| s.as_str())
2644 .collect();
2645
2646 let src_labels = other_input_exprs
2647 .iter()
2648 .map(|expr| {
2649 match expr {
2651 DfExpr::Literal(ScalarValue::Utf8(Some(label)), None) => {
2652 if label.is_empty() {
2653 Ok(DfExpr::Literal(ScalarValue::Null, None))
2654 } else if available_columns.contains(label.as_str()) {
2655 Ok(DfExpr::Column(Column::from_name(label)))
2657 } else {
2658 Ok(DfExpr::Literal(ScalarValue::Null, None))
2660 }
2661 }
2662 other => UnexpectedPlanExprSnafu {
2663 desc: format!(
2664 "expected source label string literal, but found {:?}",
2665 other
2666 ),
2667 }
2668 .fail(),
2669 }
2670 })
2671 .collect::<Result<Vec<_>>>()?;
2672 ensure!(
2673 !src_labels.is_empty(),
2674 FunctionInvalidArgumentSnafu {
2675 fn_name: "label_join"
2676 }
2677 );
2678
2679 let session_state = query_engine_state.session_state();
2680 let func = session_state
2681 .scalar_functions()
2682 .get("concat_ws")
2683 .context(UnsupportedExprSnafu { name: "concat_ws" })?;
2684
2685 let mut args = Vec::with_capacity(1 + src_labels.len());
2687 args.push(DfExpr::Literal(ScalarValue::Utf8(Some(separator)), None));
2688 args.extend(src_labels);
2689
2690 Ok((
2691 DfExpr::ScalarFunction(ScalarFunction {
2692 func: func.clone(),
2693 args,
2694 })
2695 .alias(&dst_label),
2696 dst_label,
2697 ))
2698 }
2699
2700 fn create_time_index_column_expr(&self) -> Result<DfExpr> {
2701 Ok(DfExpr::Column(Column::from_name(
2702 self.ctx
2703 .time_index_column
2704 .clone()
2705 .with_context(|| TimeIndexNotFoundSnafu { table: "unknown" })?,
2706 )))
2707 }
2708
2709 fn create_tag_column_exprs(&self) -> Result<Vec<DfExpr>> {
2710 let mut result = Vec::with_capacity(self.ctx.tag_columns.len());
2711 for tag in &self.ctx.tag_columns {
2712 let expr = DfExpr::Column(Column::from_name(tag));
2713 result.push(expr);
2714 }
2715 Ok(result)
2716 }
2717
2718 fn create_field_column_exprs(&self) -> Result<Vec<DfExpr>> {
2719 let mut result = Vec::with_capacity(self.ctx.field_columns.len());
2720 for field in &self.ctx.field_columns {
2721 let expr = DfExpr::Column(Column::from_name(field));
2722 result.push(expr);
2723 }
2724 Ok(result)
2725 }
2726
2727 fn create_tag_and_time_index_column_sort_exprs(&self) -> Result<Vec<SortExpr>> {
2728 let mut result = self
2729 .ctx
2730 .tag_columns
2731 .iter()
2732 .map(|col| DfExpr::Column(Column::from_name(col)).sort(true, true))
2733 .collect::<Vec<_>>();
2734 result.push(self.create_time_index_column_expr()?.sort(true, true));
2735 Ok(result)
2736 }
2737
2738 fn create_field_columns_sort_exprs(&self, asc: bool) -> Vec<SortExpr> {
2739 self.ctx
2740 .field_columns
2741 .iter()
2742 .map(|col| DfExpr::Column(Column::from_name(col)).sort(asc, true))
2743 .collect::<Vec<_>>()
2744 }
2745
2746 fn create_sort_exprs_by_tags(
2747 func: &str,
2748 tags: Vec<DfExpr>,
2749 asc: bool,
2750 ) -> Result<Vec<SortExpr>> {
2751 ensure!(
2752 !tags.is_empty(),
2753 FunctionInvalidArgumentSnafu { fn_name: func }
2754 );
2755
2756 tags.iter()
2757 .map(|col| match col {
2758 DfExpr::Literal(ScalarValue::Utf8(Some(label)), _) => {
2759 Ok(DfExpr::Column(Column::from_name(label)).sort(asc, false))
2760 }
2761 other => UnexpectedPlanExprSnafu {
2762 desc: format!("expected label string literal, but found {:?}", other),
2763 }
2764 .fail(),
2765 })
2766 .collect::<Result<Vec<_>>>()
2767 }
2768
2769 fn create_empty_values_filter_expr(&self) -> Result<DfExpr> {
2770 let mut exprs = Vec::with_capacity(self.ctx.field_columns.len());
2771 for value in &self.ctx.field_columns {
2772 let expr = DfExpr::Column(Column::from_name(value)).is_not_null();
2773 exprs.push(expr);
2774 }
2775
2776 conjunction(exprs).with_context(|| ValueNotFoundSnafu {
2781 table: self
2782 .table_ref()
2783 .map(|t| t.to_quoted_string())
2784 .unwrap_or_else(|_| "unknown".to_string()),
2785 })
2786 }
2787
    /// Creates aggregate expressions for a PromQL aggregation operator
    /// (`sum`, `avg`, `count`, `quantile`, ...), one per field column.
    ///
    /// Returns a tuple of:
    /// - the aggregate expressions themselves, and
    /// - for `count_values` only, the original field column expressions that
    ///   must also be preserved by the caller; empty otherwise.
    ///
    /// Side effect: `self.ctx.field_columns` is replaced with the names of
    /// the normalized aggregate output columns.
    fn create_aggregate_exprs(
        &mut self,
        op: TokenType,
        param: &Option<Box<PromExpr>>,
        input_plan: &LogicalPlan,
    ) -> Result<(Vec<DfExpr>, Vec<DfExpr>)> {
        // Extra leading arguments for the aggregate call (e.g. quantile's q).
        let mut non_col_args = Vec::new();
        let is_group_agg = op.id() == token::T_GROUP;
        if is_group_agg {
            ensure!(
                self.ctx.field_columns.len() == 1,
                MultiFieldsNotSupportedSnafu {
                    operator: "group()"
                }
            );
        }
        let aggr = match op.id() {
            token::T_SUM => sum_udaf(),
            token::T_QUANTILE => {
                // quantile takes a literal float q as its first argument.
                let q =
                    Self::get_param_as_literal_expr(param, Some(op), Some(ArrowDataType::Float64))?;
                non_col_args.push(q);
                quantile_udaf()
            }
            token::T_AVG => avg_udaf(),
            token::T_COUNT_VALUES | token::T_COUNT => count_udaf(),
            token::T_MIN => min_udaf(),
            token::T_MAX => max_udaf(),
            // group() aggregates the constant 1 (see the call below), so any
            // value-picking aggregate works; max is used as a stand-in.
            token::T_GROUP => max_udaf(),
            token::T_STDDEV => stddev_pop_udaf(),
            token::T_STDVAR => var_pop_udaf(),
            // topk/bottomk are implemented via window functions, not here.
            token::T_TOPK | token::T_BOTTOMK => UnsupportedExprSnafu {
                name: format!("{op:?}"),
            }
            .fail()?,
            _ => UnexpectedTokenSnafu { token: op }.fail()?,
        };

        // One aggregate expression per field column.
        let exprs: Vec<DfExpr> = self
            .ctx
            .field_columns
            .iter()
            .map(|col| {
                if is_group_agg {
                    // group() maps every present series to the constant 1.
                    aggr.call(vec![lit(1_f64)])
                } else {
                    // Temporarily append this column to the shared argument
                    // list, build the call, then pop so the next field starts
                    // from the same prefix of non-column args.
                    non_col_args.push(DfExpr::Column(Column::from_name(col)));
                    let expr = aggr.call(non_col_args.clone());
                    non_col_args.pop();
                    expr
                }
            })
            .collect::<Vec<_>>();

        // count_values additionally keeps the raw field columns; it is only
        // supported on single-value input.
        let prev_field_exprs = if op.id() == token::T_COUNT_VALUES {
            let prev_field_exprs: Vec<_> = self
                .ctx
                .field_columns
                .iter()
                .map(|col| DfExpr::Column(Column::from_name(col)))
                .collect();

            ensure!(
                self.ctx.field_columns.len() == 1,
                UnsupportedExprSnafu {
                    name: "count_values on multi-value input"
                }
            );

            prev_field_exprs
        } else {
            vec![]
        };

        let mut new_field_columns = Vec::with_capacity(self.ctx.field_columns.len());

        // Normalize against the input plan to obtain the output column names
        // the rest of the planner will refer to.
        let normalized_exprs =
            normalize_cols(exprs.iter().cloned(), input_plan).context(DataFusionPlanningSnafu)?;
        for expr in normalized_exprs {
            new_field_columns.push(expr.schema_name().to_string());
        }
        self.ctx.field_columns = new_field_columns;

        Ok((exprs, prev_field_exprs))
    }
2893
2894 fn get_param_value_as_str(op: TokenType, param: &Option<Box<PromExpr>>) -> Result<&str> {
2895 let param = param
2896 .as_deref()
2897 .with_context(|| FunctionInvalidArgumentSnafu {
2898 fn_name: op.to_string(),
2899 })?;
2900 let PromExpr::StringLiteral(StringLiteral { val }) = param else {
2901 return FunctionInvalidArgumentSnafu {
2902 fn_name: op.to_string(),
2903 }
2904 .fail();
2905 };
2906
2907 Ok(val)
2908 }
2909
2910 fn get_param_as_literal_expr(
2911 param: &Option<Box<PromExpr>>,
2912 op: Option<TokenType>,
2913 expected_type: Option<ArrowDataType>,
2914 ) -> Result<DfExpr> {
2915 let prom_param = param.as_deref().with_context(|| {
2916 if let Some(op) = op {
2917 FunctionInvalidArgumentSnafu {
2918 fn_name: op.to_string(),
2919 }
2920 } else {
2921 FunctionInvalidArgumentSnafu {
2922 fn_name: "unknown".to_string(),
2923 }
2924 }
2925 })?;
2926
2927 let expr = Self::try_build_literal_expr(prom_param).with_context(|| {
2928 if let Some(op) = op {
2929 FunctionInvalidArgumentSnafu {
2930 fn_name: op.to_string(),
2931 }
2932 } else {
2933 FunctionInvalidArgumentSnafu {
2934 fn_name: "unknown".to_string(),
2935 }
2936 }
2937 })?;
2938
2939 if let Some(expected_type) = expected_type {
2941 let expr_type = expr
2943 .get_type(&DFSchema::empty())
2944 .context(DataFusionPlanningSnafu)?;
2945 if expected_type != expr_type {
2946 return FunctionInvalidArgumentSnafu {
2947 fn_name: format!("expected {expected_type:?}, but found {expr_type:?}"),
2948 }
2949 .fail();
2950 }
2951 }
2952
2953 Ok(expr)
2954 }
2955
    /// Creates `row_number()` window expressions used to implement PromQL
    /// `topk`/`bottomk`: rows are numbered within each `group_exprs`
    /// partition, ordered by the field value (ascending for bottomk,
    /// descending for topk) with tag columns as tie-breakers, so the caller
    /// can later filter on the row number.
    fn create_window_exprs(
        &mut self,
        op: TokenType,
        group_exprs: Vec<DfExpr>,
        input_plan: &LogicalPlan,
    ) -> Result<Vec<DfExpr>> {
        ensure!(
            self.ctx.field_columns.len() == 1,
            UnsupportedExprSnafu {
                name: "topk or bottomk on multi-value input"
            }
        );

        // Only topk/bottomk are expected to reach this function.
        assert!(matches!(op.id(), token::T_TOPK | token::T_BOTTOMK));

        // bottomk sorts ascending so the smallest values get row number 1.
        let asc = matches!(op.id(), token::T_BOTTOMK);

        // Tag columns act as deterministic tie-breakers after the value.
        let tag_sort_exprs = self
            .create_tag_column_exprs()?
            .into_iter()
            .map(|expr| expr.sort(asc, true));

        let exprs: Vec<DfExpr> = self
            .ctx
            .field_columns
            .iter()
            .map(|col| {
                // Order by the value column first, then by the tags.
                // NOTE(review): uses `Column::from(col)` here while sibling
                // code uses `Column::from_name`; presumably equivalent for
                // plain column names — confirm.
                let mut sort_exprs = Vec::with_capacity(self.ctx.tag_columns.len() + 1);
                sort_exprs.push(DfExpr::Column(Column::from(col)).sort(asc, true));
                sort_exprs.extend(tag_sort_exprs.clone());

                DfExpr::WindowFunction(Box::new(WindowFunction {
                    fun: WindowFunctionDefinition::WindowUDF(Arc::new(RowNumber::new().into())),
                    params: WindowFunctionParams {
                        args: vec![],
                        partition_by: group_exprs.clone(),
                        order_by: sort_exprs,
                        window_frame: WindowFrame::new(Some(true)),
                        null_treatment: None,
                        distinct: false,
                        filter: None,
                    },
                }))
            })
            .collect();

        // Normalize against the input plan before handing back to the caller.
        let normalized_exprs =
            normalize_cols(exprs.iter().cloned(), input_plan).context(DataFusionPlanningSnafu)?;
        Ok(normalized_exprs)
    }
3012
3013 #[deprecated(
3015 note = "use `Self::get_param_as_literal_expr` instead. This is only for `create_histogram_plan`"
3016 )]
3017 fn try_build_float_literal(expr: &PromExpr) -> Option<f64> {
3018 match expr {
3019 PromExpr::NumberLiteral(NumberLiteral { val }) => Some(*val),
3020 PromExpr::Paren(ParenExpr { expr }) => Self::try_build_float_literal(expr),
3021 PromExpr::Unary(UnaryExpr { expr, .. }) => {
3022 Self::try_build_float_literal(expr).map(|f| -f)
3023 }
3024 PromExpr::StringLiteral(_)
3025 | PromExpr::Binary(_)
3026 | PromExpr::VectorSelector(_)
3027 | PromExpr::MatrixSelector(_)
3028 | PromExpr::Call(_)
3029 | PromExpr::Extension(_)
3030 | PromExpr::Aggregate(_)
3031 | PromExpr::Subquery(_) => None,
3032 }
3033 }
3034
    /// Plans PromQL `histogram_quantile(phi, buckets)` via the
    /// `HistogramFold` plan node.
    ///
    /// The first argument must fold to a float literal (phi); the second is
    /// planned recursively. When the input carries no `le` tag, an empty
    /// relation is returned instead.
    async fn create_histogram_plan(
        &mut self,
        args: &PromFunctionArgs,
        query_engine_state: &QueryEngineState,
    ) -> Result<LogicalPlan> {
        if args.args.len() != 2 {
            return FunctionInvalidArgumentSnafu {
                fn_name: SPECIAL_HISTOGRAM_QUANTILE.to_string(),
            }
            .fail();
        }
        // `try_build_float_literal` is deprecated but explicitly kept for
        // this function (see its deprecation note).
        #[allow(deprecated)]
        let phi = Self::try_build_float_literal(&args.args[0]).with_context(|| {
            FunctionInvalidArgumentSnafu {
                fn_name: SPECIAL_HISTOGRAM_QUANTILE.to_string(),
            }
        })?;

        let input = args.args[1].as_ref().clone();
        let input_plan = self.prom_expr_to_plan(&input, query_engine_state).await?;
        // Drop the tsid column from the subtree and stop using it below.
        let input_plan = self.strip_tsid_column(input_plan)?;
        self.ctx.use_tsid = false;

        if !self.ctx.has_le_tag() {
            // No `le` bucket label: nothing to fold, return an empty result.
            return Ok(LogicalPlan::EmptyRelation(
                datafusion::logical_expr::EmptyRelation {
                    produce_one_row: false,
                    schema: Arc::new(DFSchema::empty()),
                },
            ));
        }
        let time_index_column =
            self.ctx
                .time_index_column
                .clone()
                .with_context(|| TimeIndexNotFoundSnafu {
                    table: self.ctx.table_name.clone().unwrap_or_default(),
                })?;
        // The first field column is the one folded by HistogramFold.
        let field_column = self
            .ctx
            .field_columns
            .first()
            .with_context(|| FunctionInvalidArgumentSnafu {
                fn_name: SPECIAL_HISTOGRAM_QUANTILE.to_string(),
            })?
            .clone();
        // `le` is consumed by the fold and no longer appears in the output.
        self.ctx.tag_columns.retain(|col| col != LE_COLUMN_NAME);

        Ok(LogicalPlan::Extension(Extension {
            node: Arc::new(
                HistogramFold::new(
                    LE_COLUMN_NAME.to_string(),
                    field_column,
                    time_index_column,
                    phi,
                    input_plan,
                )
                .context(DataFusionPlanningSnafu)?,
            ),
        }))
    }
3104
3105 async fn create_vector_plan(&mut self, args: &PromFunctionArgs) -> Result<LogicalPlan> {
3107 if args.args.len() != 1 {
3108 return FunctionInvalidArgumentSnafu {
3109 fn_name: SPECIAL_VECTOR_FUNCTION.to_string(),
3110 }
3111 .fail();
3112 }
3113 let lit = Self::get_param_as_literal_expr(&Some(args.args[0].clone()), None, None)?;
3114
3115 self.ctx.time_index_column = Some(SPECIAL_TIME_FUNCTION.to_string());
3117 self.ctx.reset_table_name_and_schema();
3118 self.ctx.tag_columns = vec![];
3119 self.ctx.field_columns = vec![greptime_value().to_string()];
3120 Ok(LogicalPlan::Extension(Extension {
3121 node: Arc::new(
3122 EmptyMetric::new(
3123 self.ctx.start,
3124 self.ctx.end,
3125 self.ctx.interval,
3126 SPECIAL_TIME_FUNCTION.to_string(),
3127 greptime_value().to_string(),
3128 Some(lit),
3129 )
3130 .context(DataFusionPlanningSnafu)?,
3131 ),
3132 }))
3133 }
3134
3135 async fn create_scalar_plan(
3137 &mut self,
3138 args: &PromFunctionArgs,
3139 query_engine_state: &QueryEngineState,
3140 ) -> Result<LogicalPlan> {
3141 ensure!(
3142 args.len() == 1,
3143 FunctionInvalidArgumentSnafu {
3144 fn_name: SCALAR_FUNCTION
3145 }
3146 );
3147 let input = self
3148 .prom_expr_to_plan(&args.args[0], query_engine_state)
3149 .await?;
3150 ensure!(
3151 self.ctx.field_columns.len() == 1,
3152 MultiFieldsNotSupportedSnafu {
3153 operator: SCALAR_FUNCTION
3154 },
3155 );
3156 let scalar_plan = LogicalPlan::Extension(Extension {
3157 node: Arc::new(
3158 ScalarCalculate::new(
3159 self.ctx.start,
3160 self.ctx.end,
3161 self.ctx.interval,
3162 input,
3163 self.ctx.time_index_column.as_ref().unwrap(),
3164 &self.ctx.tag_columns,
3165 &self.ctx.field_columns[0],
3166 self.ctx.table_name.as_deref(),
3167 )
3168 .context(PromqlPlanNodeSnafu)?,
3169 ),
3170 });
3171 self.ctx.tag_columns.clear();
3173 self.ctx.field_columns.clear();
3174 self.ctx
3175 .field_columns
3176 .push(scalar_plan.schema().field(1).name().clone());
3177 Ok(scalar_plan)
3178 }
3179
    /// Plans PromQL `absent(v)` via the `Absent` plan node.
    ///
    /// The input is first aggregated to one row per timestamp (taking the
    /// first value of a field column) and sorted by time; labels taken from
    /// the selector's equality matchers are handed to the `Absent` node for
    /// the synthetic output series.
    async fn create_absent_plan(
        &mut self,
        args: &PromFunctionArgs,
        query_engine_state: &QueryEngineState,
    ) -> Result<LogicalPlan> {
        if args.args.len() != 1 {
            return FunctionInvalidArgumentSnafu {
                fn_name: SPECIAL_ABSENT_FUNCTION.to_string(),
            }
            .fail();
        }
        let input = self
            .prom_expr_to_plan(&args.args[0], query_engine_state)
            .await?;

        let time_index_expr = self.create_time_index_column_expr()?;
        // `pop()` takes the last field column expression; errors when there
        // are no field columns at all.
        let first_field_expr =
            self.create_field_column_exprs()?
                .pop()
                .with_context(|| ValueNotFoundSnafu {
                    table: self.ctx.table_name.clone().unwrap_or_default(),
                })?;
        let first_value_expr = first_value(first_field_expr, vec![]);

        // Collapse all series into one row per timestamp, ordered by time.
        let ordered_aggregated_input = LogicalPlanBuilder::from(input)
            .aggregate(
                vec![time_index_expr.clone()],
                vec![first_value_expr.clone()],
            )
            .context(DataFusionPlanningSnafu)?
            .sort(vec![time_index_expr.sort(true, false)])
            .context(DataFusionPlanningSnafu)?
            .build()
            .context(DataFusionPlanningSnafu)?;

        // Only `=` matchers contribute labels to the absent() output series.
        let fake_labels = self
            .ctx
            .selector_matcher
            .iter()
            .filter_map(|matcher| match matcher.op {
                MatchOp::Equal => Some((matcher.name.clone(), matcher.value.clone())),
                _ => None,
            })
            .collect::<Vec<_>>();

        let absent_plan = LogicalPlan::Extension(Extension {
            node: Arc::new(
                Absent::try_new(
                    self.ctx.start,
                    self.ctx.end,
                    self.ctx.interval,
                    self.ctx.time_index_column.as_ref().unwrap().clone(),
                    self.ctx.field_columns[0].clone(),
                    fake_labels,
                    ordered_aggregated_input,
                )
                .context(DataFusionPlanningSnafu)?,
            ),
        });

        Ok(absent_plan)
    }
3244
3245 fn try_build_literal_expr(expr: &PromExpr) -> Option<DfExpr> {
3248 match expr {
3249 PromExpr::NumberLiteral(NumberLiteral { val }) => Some(val.lit()),
3250 PromExpr::StringLiteral(StringLiteral { val }) => Some(val.lit()),
3251 PromExpr::VectorSelector(_)
3252 | PromExpr::MatrixSelector(_)
3253 | PromExpr::Extension(_)
3254 | PromExpr::Aggregate(_)
3255 | PromExpr::Subquery(_) => None,
3256 PromExpr::Call(Call { func, .. }) => {
3257 if func.name == SPECIAL_TIME_FUNCTION {
3258 None
3261 } else {
3262 None
3263 }
3264 }
3265 PromExpr::Paren(ParenExpr { expr }) => Self::try_build_literal_expr(expr),
3266 PromExpr::Unary(UnaryExpr { expr, .. }) => Self::try_build_literal_expr(expr),
3268 PromExpr::Binary(PromBinaryExpr {
3269 lhs,
3270 rhs,
3271 op,
3272 modifier,
3273 }) => {
3274 let lhs = Self::try_build_literal_expr(lhs)?;
3275 let rhs = Self::try_build_literal_expr(rhs)?;
3276 let is_comparison_op = Self::is_token_a_comparison_op(*op);
3277 let expr_builder = Self::prom_token_to_binary_expr_builder(*op).ok()?;
3278 let expr = expr_builder(lhs, rhs).ok()?;
3279
3280 let should_return_bool = if let Some(m) = modifier {
3281 m.return_bool
3282 } else {
3283 false
3284 };
3285 if is_comparison_op && should_return_bool {
3286 Some(DfExpr::Cast(Cast {
3287 expr: Box::new(expr),
3288 data_type: ArrowDataType::Float64,
3289 }))
3290 } else {
3291 Some(expr)
3292 }
3293 }
3294 }
3295 }
3296
3297 fn try_build_special_time_expr_with_context(&self, expr: &PromExpr) -> Option<DfExpr> {
3298 match expr {
3299 PromExpr::Call(Call { func, .. }) => {
3300 if func.name == SPECIAL_TIME_FUNCTION
3301 && let Some(time_index_col) = self.ctx.time_index_column.as_ref()
3302 {
3303 Some(build_special_time_expr(time_index_col))
3304 } else {
3305 None
3306 }
3307 }
3308 _ => None,
3309 }
3310 }
3311
    /// Maps a PromQL binary-operator token to a builder closure that produces
    /// the corresponding DataFusion expression from two operands.
    ///
    /// Arithmetic operators (`+ - * / % ^` and `atan2`) cast both operands to
    /// `Float64` first; comparison operators use the operands as-is. Returns
    /// an error for tokens that are not binary operators.
    #[allow(clippy::type_complexity)]
    fn prom_token_to_binary_expr_builder(
        token: TokenType,
    ) -> Result<Box<dyn Fn(DfExpr, DfExpr) -> Result<DfExpr>>> {
        // Wraps an operand in a cast to Float64 unless it already is one:
        // either an existing Float64 cast or a Float64 literal.
        let cast_float = |expr| {
            if matches!(
                &expr,
                DfExpr::Cast(Cast {
                    data_type: ArrowDataType::Float64,
                    ..
                })
            ) || matches!(&expr, DfExpr::Literal(ScalarValue::Float64(_), _))
            {
                expr
            } else {
                DfExpr::Cast(Cast {
                    expr: Box::new(expr),
                    data_type: ArrowDataType::Float64,
                })
            }
        };
        match token.id() {
            token::T_ADD => Ok(Box::new(move |lhs, rhs| {
                Ok(cast_float(lhs) + cast_float(rhs))
            })),
            token::T_SUB => Ok(Box::new(move |lhs, rhs| {
                Ok(cast_float(lhs) - cast_float(rhs))
            })),
            token::T_MUL => Ok(Box::new(move |lhs, rhs| {
                Ok(cast_float(lhs) * cast_float(rhs))
            })),
            token::T_DIV => Ok(Box::new(move |lhs, rhs| {
                Ok(cast_float(lhs) / cast_float(rhs))
            })),
            token::T_MOD => Ok(Box::new(move |lhs: DfExpr, rhs| {
                Ok(cast_float(lhs) % cast_float(rhs))
            })),
            token::T_EQLC => Ok(Box::new(|lhs, rhs| Ok(lhs.eq(rhs)))),
            token::T_NEQ => Ok(Box::new(|lhs, rhs| Ok(lhs.not_eq(rhs)))),
            token::T_GTR => Ok(Box::new(|lhs, rhs| Ok(lhs.gt(rhs)))),
            token::T_LSS => Ok(Box::new(|lhs, rhs| Ok(lhs.lt(rhs)))),
            token::T_GTE => Ok(Box::new(|lhs, rhs| Ok(lhs.gt_eq(rhs)))),
            token::T_LTE => Ok(Box::new(|lhs, rhs| Ok(lhs.lt_eq(rhs)))),
            // `^` maps to the `power` scalar function.
            token::T_POW => Ok(Box::new(move |lhs, rhs| {
                Ok(DfExpr::ScalarFunction(ScalarFunction {
                    func: datafusion_functions::math::power(),
                    args: vec![cast_float(lhs), cast_float(rhs)],
                }))
            })),
            token::T_ATAN2 => Ok(Box::new(move |lhs, rhs| {
                Ok(DfExpr::ScalarFunction(ScalarFunction {
                    func: datafusion_functions::math::atan2(),
                    args: vec![cast_float(lhs), cast_float(rhs)],
                }))
            })),
            _ => UnexpectedTokenSnafu { token }.fail(),
        }
    }
3372
3373 fn is_token_a_comparison_op(token: TokenType) -> bool {
3375 matches!(
3376 token.id(),
3377 token::T_EQLC
3378 | token::T_NEQ
3379 | token::T_GTR
3380 | token::T_LSS
3381 | token::T_GTE
3382 | token::T_LTE
3383 )
3384 }
3385
3386 fn is_token_a_set_op(token: TokenType) -> bool {
3388 matches!(
3389 token.id(),
3390 token::T_LAND | token::T_LOR | token::T_LUNLESS )
3394 }
3395
    /// Inner-joins two plans on their shared tag columns (plus the time
    /// index), implementing vector matching for PromQL binary operations.
    ///
    /// `modifier` adjusts the join keys: `on(...)` intersects with the listed
    /// labels, `ignoring(...)` removes them. When `only_join_time_index` is
    /// true, tag columns are skipped entirely. Both sides are aliased with
    /// their table references so equally named columns stay distinguishable;
    /// nulls compare equal in the join.
    #[allow(clippy::too_many_arguments)]
    fn join_on_non_field_columns(
        &self,
        left: LogicalPlan,
        right: LogicalPlan,
        left_table_ref: TableReference,
        right_table_ref: TableReference,
        left_time_index_column: Option<String>,
        right_time_index_column: Option<String>,
        only_join_time_index: bool,
        modifier: &Option<BinModifier>,
    ) -> Result<LogicalPlan> {
        let mut left_tag_columns = if only_join_time_index {
            BTreeSet::new()
        } else {
            self.ctx
                .tag_columns
                .iter()
                .cloned()
                .collect::<BTreeSet<_>>()
        };
        // Both sides start from the same tag set; the modifier below may
        // narrow them symmetrically.
        let mut right_tag_columns = left_tag_columns.clone();

        // Apply on(...)/ignoring(...) label matching from the modifier.
        if let Some(modifier) = modifier {
            if let Some(matching) = &modifier.matching {
                match matching {
                    // on(...): keep only the listed labels as join keys.
                    LabelModifier::Include(on) => {
                        let mask = on.labels.iter().cloned().collect::<BTreeSet<_>>();
                        left_tag_columns = left_tag_columns.intersection(&mask).cloned().collect();
                        right_tag_columns =
                            right_tag_columns.intersection(&mask).cloned().collect();
                    }
                    // ignoring(...): drop the listed labels from the keys.
                    LabelModifier::Exclude(ignoring) => {
                        for label in &ignoring.labels {
                            let _ = left_tag_columns.remove(label);
                            let _ = right_tag_columns.remove(label);
                        }
                    }
                }
            }
        }

        // Also join on the time index when both sides have one.
        if let (Some(left_time_index_column), Some(right_time_index_column)) =
            (left_time_index_column, right_time_index_column)
        {
            left_tag_columns.insert(left_time_index_column);
            right_tag_columns.insert(right_time_index_column);
        }

        let right = LogicalPlanBuilder::from(right)
            .alias(right_table_ref)
            .context(DataFusionPlanningSnafu)?
            .build()
            .context(DataFusionPlanningSnafu)?;

        LogicalPlanBuilder::from(left)
            .alias(left_table_ref)
            .context(DataFusionPlanningSnafu)?
            .join_detailed(
                right,
                JoinType::Inner,
                (
                    left_tag_columns
                        .into_iter()
                        .map(Column::from_name)
                        .collect::<Vec<_>>(),
                    right_tag_columns
                        .into_iter()
                        .map(Column::from_name)
                        .collect::<Vec<_>>(),
                ),
                None,
                NullEquality::NullEqualsNull,
            )
            .context(DataFusionPlanningSnafu)?
            .build()
            .context(DataFusionPlanningSnafu)
    }
3483
3484 fn set_op_on_non_field_columns(
3486 &mut self,
3487 left: LogicalPlan,
3488 mut right: LogicalPlan,
3489 left_context: PromPlannerContext,
3490 right_context: PromPlannerContext,
3491 op: TokenType,
3492 modifier: &Option<BinModifier>,
3493 ) -> Result<LogicalPlan> {
3494 let mut left_tag_col_set = left_context
3495 .tag_columns
3496 .iter()
3497 .cloned()
3498 .collect::<HashSet<_>>();
3499 let mut right_tag_col_set = right_context
3500 .tag_columns
3501 .iter()
3502 .cloned()
3503 .collect::<HashSet<_>>();
3504
3505 if matches!(op.id(), token::T_LOR) {
3506 return self.or_operator(
3507 left,
3508 right,
3509 left_tag_col_set,
3510 right_tag_col_set,
3511 left_context,
3512 right_context,
3513 modifier,
3514 );
3515 }
3516
3517 if let Some(modifier) = modifier {
3519 ensure!(
3521 matches!(
3522 modifier.card,
3523 VectorMatchCardinality::OneToOne | VectorMatchCardinality::ManyToMany
3524 ),
3525 UnsupportedVectorMatchSnafu {
3526 name: modifier.card.clone(),
3527 },
3528 );
3529 if let Some(matching) = &modifier.matching {
3531 match matching {
3532 LabelModifier::Include(on) => {
3534 let mask = on.labels.iter().cloned().collect::<HashSet<_>>();
3535 left_tag_col_set = left_tag_col_set.intersection(&mask).cloned().collect();
3536 right_tag_col_set =
3537 right_tag_col_set.intersection(&mask).cloned().collect();
3538 }
3539 LabelModifier::Exclude(ignoring) => {
3541 for label in &ignoring.labels {
3543 let _ = left_tag_col_set.remove(label);
3544 let _ = right_tag_col_set.remove(label);
3545 }
3546 }
3547 }
3548 }
3549 }
3550 if !matches!(op.id(), token::T_LOR) {
3552 ensure!(
3553 left_tag_col_set == right_tag_col_set,
3554 CombineTableColumnMismatchSnafu {
3555 left: left_tag_col_set.into_iter().collect::<Vec<_>>(),
3556 right: right_tag_col_set.into_iter().collect::<Vec<_>>(),
3557 }
3558 )
3559 };
3560 let left_time_index = left_context.time_index_column.clone().unwrap();
3561 let right_time_index = right_context.time_index_column.clone().unwrap();
3562 let join_keys = left_tag_col_set
3563 .iter()
3564 .cloned()
3565 .chain([left_time_index.clone()])
3566 .collect::<Vec<_>>();
3567 self.ctx.time_index_column = Some(left_time_index.clone());
3568 self.ctx.use_tsid = left_context.use_tsid;
3569
3570 if left_context.time_index_column != right_context.time_index_column {
3572 let right_project_exprs = right
3573 .schema()
3574 .fields()
3575 .iter()
3576 .map(|field| {
3577 if field.name() == &right_time_index {
3578 DfExpr::Column(Column::from_name(&right_time_index)).alias(&left_time_index)
3579 } else {
3580 DfExpr::Column(Column::from_name(field.name()))
3581 }
3582 })
3583 .collect::<Vec<_>>();
3584
3585 right = LogicalPlanBuilder::from(right)
3586 .project(right_project_exprs)
3587 .context(DataFusionPlanningSnafu)?
3588 .build()
3589 .context(DataFusionPlanningSnafu)?;
3590 }
3591
3592 ensure!(
3593 left_context.field_columns.len() == 1,
3594 MultiFieldsNotSupportedSnafu {
3595 operator: "AND operator"
3596 }
3597 );
3598 let left_field_col = left_context.field_columns.first().unwrap();
3601 self.ctx.field_columns = vec![left_field_col.clone()];
3602
3603 match op.id() {
3606 token::T_LAND => LogicalPlanBuilder::from(left)
3607 .distinct()
3608 .context(DataFusionPlanningSnafu)?
3609 .join_detailed(
3610 right,
3611 JoinType::LeftSemi,
3612 (join_keys.clone(), join_keys),
3613 None,
3614 NullEquality::NullEqualsNull,
3615 )
3616 .context(DataFusionPlanningSnafu)?
3617 .build()
3618 .context(DataFusionPlanningSnafu),
3619 token::T_LUNLESS => LogicalPlanBuilder::from(left)
3620 .distinct()
3621 .context(DataFusionPlanningSnafu)?
3622 .join_detailed(
3623 right,
3624 JoinType::LeftAnti,
3625 (join_keys.clone(), join_keys),
3626 None,
3627 NullEquality::NullEqualsNull,
3628 )
3629 .context(DataFusionPlanningSnafu)?
3630 .build()
3631 .context(DataFusionPlanningSnafu),
3632 token::T_LOR => {
3633 unreachable!()
3636 }
3637 _ => UnexpectedTokenSnafu { token: op }.fail(),
3638 }
3639 }
3640
    /// Plan the `or` set operator between two instant vectors as a
    /// [`UnionDistinctOn`] extension node.
    ///
    /// Both inputs are first projected onto a unified column list; tags that
    /// exist on only one side are padded with NULL string literals on the other
    /// so the two plans share one schema.
    #[allow(clippy::too_many_arguments)]
    fn or_operator(
        &mut self,
        left: LogicalPlan,
        right: LogicalPlan,
        left_tag_cols_set: HashSet<String>,
        right_tag_cols_set: HashSet<String>,
        left_context: PromPlannerContext,
        right_context: PromPlannerContext,
        modifier: &Option<BinModifier>,
    ) -> Result<LogicalPlan> {
        // Both sides must produce the same, single value column.
        ensure!(
            left_context.field_columns.len() == right_context.field_columns.len(),
            CombineTableColumnMismatchSnafu {
                left: left_context.field_columns.clone(),
                right: right_context.field_columns.clone()
            }
        );
        ensure!(
            left_context.field_columns.len() == 1,
            MultiFieldsNotSupportedSnafu {
                operator: "OR operator"
            }
        );

        // Tag superset, and the tags each side is missing (to be NULL-padded).
        let all_tags = left_tag_cols_set
            .union(&right_tag_cols_set)
            .cloned()
            .collect::<HashSet<_>>();
        let tags_not_in_left = all_tags
            .difference(&left_tag_cols_set)
            .cloned()
            .collect::<Vec<_>>();
        let tags_not_in_right = all_tags
            .difference(&right_tag_cols_set)
            .cloned()
            .collect::<Vec<_>>();
        // The qualifier of each side's first field stands in for its table name.
        let left_qualifier = left.schema().qualified_field(0).0.cloned();
        let right_qualifier = right.schema().qualified_field(0).0.cloned();
        let left_qualifier_string = left_qualifier
            .as_ref()
            .map(|l| l.to_string())
            .unwrap_or_default();
        let right_qualifier_string = right_qualifier
            .as_ref()
            .map(|r| r.to_string())
            .unwrap_or_default();
        let left_time_index_column =
            left_context
                .time_index_column
                .clone()
                .with_context(|| TimeIndexNotFoundSnafu {
                    table: left_qualifier_string.clone(),
                })?;
        let right_time_index_column =
            right_context
                .time_index_column
                .clone()
                .with_context(|| TimeIndexNotFoundSnafu {
                    table: right_qualifier_string.clone(),
                })?;
        // Safe: the single-field invariant is ensured above.
        let left_field_col = left_context.field_columns.first().unwrap();
        let right_field_col = right_context.field_columns.first().unwrap();
        let left_has_tsid = left
            .schema()
            .fields()
            .iter()
            .any(|field| field.name() == DATA_SCHEMA_TSID_COLUMN_NAME);
        let right_has_tsid = right
            .schema()
            .fields()
            .iter()
            .any(|field| field.name() == DATA_SCHEMA_TSID_COLUMN_NAME);

        // Build the unified output column set from both schemas.
        let mut all_columns_set = left
            .schema()
            .fields()
            .iter()
            .chain(right.schema().fields().iter())
            .map(|field| field.name().clone())
            .collect::<HashSet<_>>();
        // Keep the internal tsid column only when both inputs carry it.
        if !(left_has_tsid && right_has_tsid) {
            all_columns_set.remove(DATA_SCHEMA_TSID_COLUMN_NAME);
        }
        // Time index columns are handled separately and pinned to the front below.
        all_columns_set.remove(&left_time_index_column);
        all_columns_set.remove(&right_time_index_column);
        // When the value column names differ, the right one is renamed to the
        // left one in the right-side projection, so drop it from the set.
        if left_field_col != right_field_col {
            all_columns_set.remove(right_field_col);
        }
        let mut all_columns = all_columns_set.into_iter().collect::<Vec<_>>();
        // Sort for a deterministic column order, then put the time index first.
        all_columns.sort_unstable();
        all_columns.insert(0, left_time_index_column.clone());

        // Left projection: pass columns through, NULL-pad tags the left lacks.
        let left_proj_exprs = all_columns.iter().map(|col| {
            if tags_not_in_left.contains(col) {
                DfExpr::Literal(ScalarValue::Utf8(None), None).alias(col.clone())
            } else {
                DfExpr::Column(Column::new(None::<String>, col))
            }
        });
        let right_time_index_expr = DfExpr::Column(Column::new(
            right_qualifier.clone(),
            right_time_index_column,
        ))
        .alias(left_time_index_column.clone());
        // Resolve the qualifier that actually owns the right value column from
        // the schema (it may differ from `right_qualifier`).
        let right_qualifier_for_field = right
            .schema()
            .iter()
            .find(|(_, f)| f.name() == right_field_col)
            .map(|(q, _)| q)
            .with_context(|| ColumnNotFoundSnafu {
                col: right_field_col.clone(),
            })?
            .cloned();

        // `skip(1)` skips the time index, which is aliased separately above.
        let right_proj_exprs_without_time_index = all_columns.iter().skip(1).map(|col| {
            if col == left_field_col && left_field_col != right_field_col {
                // Rename the right value column to the left one.
                DfExpr::Column(Column::new(
                    right_qualifier_for_field.clone(),
                    right_field_col,
                ))
            } else if tags_not_in_right.contains(col) {
                // NULL-pad tags the right side lacks.
                DfExpr::Literal(ScalarValue::Utf8(None), None).alias(col.clone())
            } else {
                DfExpr::Column(Column::new(None::<String>, col))
            }
        });
        let right_proj_exprs = [right_time_index_expr]
            .into_iter()
            .chain(right_proj_exprs_without_time_index);

        let left_projected = LogicalPlanBuilder::from(left)
            .project(left_proj_exprs)
            .context(DataFusionPlanningSnafu)?
            .alias(left_qualifier_string.clone())
            .context(DataFusionPlanningSnafu)?
            .build()
            .context(DataFusionPlanningSnafu)?;
        let right_projected = LogicalPlanBuilder::from(right)
            .project(right_proj_exprs)
            .context(DataFusionPlanningSnafu)?
            .alias(right_qualifier_string.clone())
            .context(DataFusionPlanningSnafu)?
            .build()
            .context(DataFusionPlanningSnafu)?;

        // Columns the union compares rows on: honor `on` / `ignoring` when a
        // modifier is present, otherwise match on every tag.
        let mut match_columns = if let Some(modifier) = modifier
            && let Some(matching) = &modifier.matching
        {
            match matching {
                // on(...)
                LabelModifier::Include(on) => on.labels.clone(),
                // ignoring(...)
                LabelModifier::Exclude(ignoring) => {
                    let ignoring = ignoring.labels.iter().cloned().collect::<HashSet<_>>();
                    all_tags.difference(&ignoring).cloned().collect()
                }
            }
        } else {
            all_tags.iter().cloned().collect()
        };
        match_columns.sort_unstable();
        // Both inputs were projected onto the same schema; the left one serves
        // as the output schema of the union node.
        let schema = left_projected.schema().clone();
        let union_distinct_on = UnionDistinctOn::new(
            left_projected,
            right_projected,
            match_columns,
            left_time_index_column.clone(),
            schema,
        );
        let result = LogicalPlan::Extension(Extension {
            node: Arc::new(union_distinct_on),
        });

        // Reflect the merged output in the planner context.
        self.ctx.time_index_column = Some(left_time_index_column);
        self.ctx.tag_columns = all_tags.into_iter().collect();
        self.ctx.field_columns = vec![left_field_col.clone()];
        self.ctx.use_tsid = left_has_tsid && right_has_tsid;

        Ok(result)
    }
3843
3844 fn projection_for_each_field_column<F>(
3852 &mut self,
3853 input: LogicalPlan,
3854 name_to_expr: F,
3855 ) -> Result<LogicalPlan>
3856 where
3857 F: FnMut(&String) -> Result<DfExpr>,
3858 {
3859 let non_field_columns_iter = self
3860 .ctx
3861 .tag_columns
3862 .iter()
3863 .chain(self.ctx.time_index_column.iter())
3864 .map(|col| {
3865 Ok(DfExpr::Column(Column::new(
3866 self.ctx.table_name.clone().map(TableReference::bare),
3867 col,
3868 )))
3869 });
3870
3871 let result_field_columns = self
3873 .ctx
3874 .field_columns
3875 .iter()
3876 .map(name_to_expr)
3877 .collect::<Result<Vec<_>>>()?;
3878
3879 self.ctx.field_columns = result_field_columns
3881 .iter()
3882 .map(|expr| expr.schema_name().to_string())
3883 .collect();
3884 let field_columns_iter = result_field_columns
3885 .into_iter()
3886 .zip(self.ctx.field_columns.iter())
3887 .map(|(expr, name)| Ok(DfExpr::Alias(Alias::new(expr, None::<String>, name))));
3888
3889 let project_fields = non_field_columns_iter
3891 .chain(field_columns_iter)
3892 .collect::<Result<Vec<_>>>()?;
3893
3894 LogicalPlanBuilder::from(input)
3895 .project(project_fields)
3896 .context(DataFusionPlanningSnafu)?
3897 .build()
3898 .context(DataFusionPlanningSnafu)
3899 }
3900
3901 fn filter_on_field_column<F>(
3904 &self,
3905 input: LogicalPlan,
3906 mut name_to_expr: F,
3907 ) -> Result<LogicalPlan>
3908 where
3909 F: FnMut(&String) -> Result<DfExpr>,
3910 {
3911 ensure!(
3912 self.ctx.field_columns.len() == 1,
3913 UnsupportedExprSnafu {
3914 name: "filter on multi-value input"
3915 }
3916 );
3917
3918 let field_column_filter = name_to_expr(&self.ctx.field_columns[0])?;
3919
3920 LogicalPlanBuilder::from(input)
3921 .filter(field_column_filter)
3922 .context(DataFusionPlanningSnafu)?
3923 .build()
3924 .context(DataFusionPlanningSnafu)
3925 }
3926
3927 fn date_part_on_time_index(&self, date_part: &str) -> Result<DfExpr> {
3930 let input_expr = datafusion::logical_expr::col(
3931 self.ctx
3932 .time_index_column
3933 .as_ref()
3934 .with_context(|| TimeIndexNotFoundSnafu {
3936 table: "<doesn't matter>",
3937 })?,
3938 );
3939 let fn_expr = DfExpr::ScalarFunction(ScalarFunction {
3940 func: datafusion_functions::datetime::date_part(),
3941 args: vec![date_part.lit(), input_expr],
3942 });
3943 Ok(fn_expr)
3944 }
3945
3946 fn strip_tsid_column(&self, plan: LogicalPlan) -> Result<LogicalPlan> {
3947 let schema = plan.schema();
3948 if !schema
3949 .fields()
3950 .iter()
3951 .any(|field| field.name() == DATA_SCHEMA_TSID_COLUMN_NAME)
3952 {
3953 return Ok(plan);
3954 }
3955
3956 let project_exprs = schema
3957 .fields()
3958 .iter()
3959 .filter(|field| field.name() != DATA_SCHEMA_TSID_COLUMN_NAME)
3960 .map(|field| Ok(DfExpr::Column(Column::from_name(field.name().clone()))))
3961 .collect::<Result<Vec<_>>>()?;
3962
3963 LogicalPlanBuilder::from(plan)
3964 .project(project_exprs)
3965 .context(DataFusionPlanningSnafu)?
3966 .build()
3967 .context(DataFusionPlanningSnafu)
3968 }
3969
3970 fn apply_alias(&mut self, plan: LogicalPlan, alias_name: String) -> Result<LogicalPlan> {
3972 let fields_expr = self.create_field_column_exprs()?;
3973
3974 ensure!(
3976 fields_expr.len() == 1,
3977 UnsupportedExprSnafu {
3978 name: "alias on multi-value result"
3979 }
3980 );
3981
3982 let project_fields = fields_expr
3983 .into_iter()
3984 .map(|expr| expr.alias(&alias_name))
3985 .chain(self.create_tag_column_exprs()?)
3986 .chain(Some(self.create_time_index_column_expr()?));
3987
3988 LogicalPlanBuilder::from(plan)
3989 .project(project_fields)
3990 .context(DataFusionPlanningSnafu)?
3991 .build()
3992 .context(DataFusionPlanningSnafu)
3993 }
3994}
3995
/// Separated arguments of a PromQL function call.
#[derive(Default, Debug)]
struct FunctionArgs {
    // The (at most one) vector-typed input expression of the call.
    input: Option<PromExpr>,
    // Literal arguments, already converted to DataFusion expressions.
    literals: Vec<DfExpr>,
}
4001
/// Classification of a scalar function during PromQL planning; the variant
/// decides how the function call expression is assembled.
#[derive(Debug, Clone)]
enum ScalarFunc {
    // Backed directly by a DataFusion scalar UDF.
    DataFusionBuiltin(Arc<ScalarUdfDef>),
    // Backed by a DataFusion UDF — presumably with different argument
    // arrangement than `DataFusionBuiltin`; confirm at the call site.
    DataFusionUdf(Arc<ScalarUdfDef>),
    // A custom (PromQL-specific) UDF.
    Udf(Arc<ScalarUdfDef>),
    // A UDF that also receives an `i64` — presumably a range length in
    // milliseconds used for extrapolation (rate-style functions); confirm
    // at the call site.
    ExtrapolateUdf(Arc<ScalarUdfDef>, i64),
    // The expression was already generated during argument processing; no
    // function call needs to be built.
    GeneratedExpr,
}
4031
4032#[cfg(test)]
4033mod test {
4034 use std::time::{Duration, UNIX_EPOCH};
4035
4036 use catalog::RegisterTableRequest;
4037 use catalog::memory::{MemoryCatalogManager, new_memory_catalog_manager};
4038 use common_base::Plugins;
4039 use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
4040 use common_query::prelude::greptime_timestamp;
4041 use common_query::test_util::DummyDecoder;
4042 use datatypes::prelude::ConcreteDataType;
4043 use datatypes::schema::{ColumnSchema, Schema};
4044 use promql_parser::label::Labels;
4045 use promql_parser::parser;
4046 use session::context::QueryContext;
4047 use table::metadata::{TableInfoBuilder, TableMetaBuilder};
4048 use table::test_util::EmptyTable;
4049
4050 use super::*;
4051 use crate::QueryEngineContext;
4052 use crate::options::QueryOptions;
4053 use crate::parser::QueryLanguageParser;
4054
    /// Build a minimal `QueryEngineState` over an empty in-memory catalog for
    /// planner tests. All optional components are disabled.
    fn build_query_engine_state() -> QueryEngineState {
        QueryEngineState::new(
            new_memory_catalog_manager().unwrap(),
            None,
            None,
            None,
            None,
            None,
            false,
            Plugins::default(),
            QueryOptions::default(),
        )
    }
4068
    /// Plan `eval_stmt` with `PromPlanner`, then run the engine's extension
    /// optimizer rules over the raw plan and return the optimized result.
    async fn build_optimized_promql_plan(
        table_provider: DfTableSourceProvider,
        eval_stmt: &EvalStmt,
    ) -> LogicalPlan {
        let state = build_query_engine_state();
        let raw_plan = PromPlanner::stmt_to_plan(table_provider, eval_stmt, &state)
            .await
            .unwrap();
        let context = QueryEngineContext::new(state.session_state(), QueryContext::arc());
        state
            .optimize_by_extension_rules(raw_plan, &context)
            .unwrap()
    }
4082
    /// Plan `query` against a metric-engine table named `some_metric` (which
    /// carries a tsid column) and return the optimized plan rendered via
    /// `display_indent_schema` for substring assertions.
    async fn build_optimized_tsid_plan(
        query: &str,
        num_tag: usize,
        num_field: usize,
        end_secs: u64,
        lookback_secs: u64,
    ) -> String {
        let eval_stmt = EvalStmt {
            expr: parser::parse(query).unwrap(),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(end_secs))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(lookback_secs),
        };
        let table_provider = build_test_table_provider_with_tsid(
            &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
            num_tag,
            num_field,
        )
        .await;

        build_optimized_promql_plan(table_provider, &eval_stmt)
            .await
            .display_indent_schema()
            .to_string()
    }
4111
    /// Assert that the nested-count rewrite applied to `query`: series are
    /// divided by the tsid column, a `Distinct` node is present, the expected
    /// outer aggregation appears, and no tag-based series divide remains.
    async fn assert_nested_count_rewrite_applies(query: &str, expected_outer_agg: &str) {
        let plan_str = build_optimized_tsid_plan(query, 2, 1, 100_000, 1).await;

        assert!(plan_str.contains("PromSeriesDivide: tags=[\"__tsid\"]"));
        assert!(plan_str.contains("Projection: some_metric.timestamp, some_metric.tag_0"));
        assert!(plan_str.contains("Distinct:"));
        assert!(plan_str.contains(expected_outer_agg), "{plan_str}");
        assert!(!plan_str.contains("PromSeriesDivide: tags=[\"tag_0\"]"));
    }
4121
    /// Assert that the nested-count rewrite did NOT apply to `query` (the
    /// optimized plan contains no `Distinct` node).
    async fn assert_nested_count_rewrite_missing(query: &str, num_tag: usize, lookback_secs: u64) {
        let plan_str = build_optimized_tsid_plan(query, num_tag, 1, 100_000, lookback_secs).await;
        assert!(!plan_str.contains("Distinct:"), "{plan_str}");
    }
4126
    /// Register one plain test table per `(schema, name)` tuple — `num_tag`
    /// string tags (`tag_0`..), a millisecond time index named `timestamp`,
    /// and `num_field` nullable float64 fields (`field_0`..) — then wrap the
    /// catalog in a `DfTableSourceProvider`.
    async fn build_test_table_provider(
        table_name_tuples: &[(String, String)],
        num_tag: usize,
        num_field: usize,
    ) -> DfTableSourceProvider {
        let catalog_list = MemoryCatalogManager::with_default_setup();
        for (schema_name, table_name) in table_name_tuples {
            let mut columns = vec![];
            // Tags come first so the primary key indices are simply 0..num_tag.
            for i in 0..num_tag {
                columns.push(ColumnSchema::new(
                    format!("tag_{i}"),
                    ConcreteDataType::string_datatype(),
                    false,
                ));
            }
            columns.push(
                ColumnSchema::new(
                    "timestamp".to_string(),
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                )
                .with_time_index(true),
            );
            for i in 0..num_field {
                columns.push(ColumnSchema::new(
                    format!("field_{i}"),
                    ConcreteDataType::float64_datatype(),
                    true,
                ));
            }
            let schema = Arc::new(Schema::new(columns));
            let table_meta = TableMetaBuilder::empty()
                .schema(schema)
                .primary_key_indices((0..num_tag).collect())
                // Field columns sit right after the time index.
                .value_indices((num_tag + 1..num_tag + 1 + num_field).collect())
                .next_column_id(1024)
                .build()
                .unwrap();
            let table_info = TableInfoBuilder::default()
                .name(table_name.clone())
                .meta(table_meta)
                .build()
                .unwrap();
            let table = EmptyTable::from_table_info(&table_info);

            assert!(
                catalog_list
                    .register_table_sync(RegisterTableRequest {
                        catalog: DEFAULT_CATALOG_NAME.to_string(),
                        schema: schema_name.clone(),
                        table_name: table_name.clone(),
                        table_id: 1024,
                        table,
                    })
                    .is_ok()
            );
        }

        DfTableSourceProvider::new(
            catalog_list,
            false,
            QueryContext::arc(),
            DummyDecoder::arc(),
            false,
        )
    }
4193
    /// Register metric-engine style test tables: one physical table (`phy`)
    /// whose schema carries the internal table-id and tsid columns, plus one
    /// logical table per `(schema, name)` tuple that points at it via table
    /// options. Wraps the catalog in a `DfTableSourceProvider`.
    async fn build_test_table_provider_with_tsid(
        table_name_tuples: &[(String, String)],
        num_tag: usize,
        num_field: usize,
    ) -> DfTableSourceProvider {
        let catalog_list = MemoryCatalogManager::with_default_setup();

        let physical_table_name = "phy";
        let physical_table_id = 999u32;

        // Register the physical table: [__table_id, __tsid, tags..., timestamp, fields...].
        {
            let mut columns = vec![
                ColumnSchema::new(
                    DATA_SCHEMA_TABLE_ID_COLUMN_NAME.to_string(),
                    ConcreteDataType::uint32_datatype(),
                    false,
                ),
                ColumnSchema::new(
                    DATA_SCHEMA_TSID_COLUMN_NAME.to_string(),
                    ConcreteDataType::uint64_datatype(),
                    false,
                ),
            ];
            for i in 0..num_tag {
                columns.push(ColumnSchema::new(
                    format!("tag_{i}"),
                    ConcreteDataType::string_datatype(),
                    false,
                ));
            }
            columns.push(
                ColumnSchema::new(
                    "timestamp".to_string(),
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                )
                .with_time_index(true),
            );
            for i in 0..num_field {
                columns.push(ColumnSchema::new(
                    format!("field_{i}"),
                    ConcreteDataType::float64_datatype(),
                    true,
                ));
            }

            let schema = Arc::new(Schema::new(columns));
            // The two internal columns plus every tag form the primary key.
            let primary_key_indices = (0..(2 + num_tag)).collect::<Vec<_>>();
            let table_meta = TableMetaBuilder::empty()
                .schema(schema)
                .primary_key_indices(primary_key_indices)
                .value_indices((2 + num_tag..2 + num_tag + 1 + num_field).collect())
                .engine(METRIC_ENGINE_NAME.to_string())
                .next_column_id(1024)
                .build()
                .unwrap();
            let table_info = TableInfoBuilder::default()
                .table_id(physical_table_id)
                .name(physical_table_name)
                .meta(table_meta)
                .build()
                .unwrap();
            let table = EmptyTable::from_table_info(&table_info);

            assert!(
                catalog_list
                    .register_table_sync(RegisterTableRequest {
                        catalog: DEFAULT_CATALOG_NAME.to_string(),
                        schema: DEFAULT_SCHEMA_NAME.to_string(),
                        table_name: physical_table_name.to_string(),
                        table_id: physical_table_id,
                        table,
                    })
                    .is_ok()
            );
        }

        // Register the logical tables; each references the physical table
        // through the logical-table metadata option.
        for (idx, (schema_name, table_name)) in table_name_tuples.iter().enumerate() {
            let mut columns = vec![];
            for i in 0..num_tag {
                columns.push(ColumnSchema::new(
                    format!("tag_{i}"),
                    ConcreteDataType::string_datatype(),
                    false,
                ));
            }
            columns.push(
                ColumnSchema::new(
                    "timestamp".to_string(),
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                )
                .with_time_index(true),
            );
            for i in 0..num_field {
                columns.push(ColumnSchema::new(
                    format!("field_{i}"),
                    ConcreteDataType::float64_datatype(),
                    true,
                ));
            }

            let schema = Arc::new(Schema::new(columns));
            let mut options = table::requests::TableOptions::default();
            options.extra_options.insert(
                LOGICAL_TABLE_METADATA_KEY.to_string(),
                physical_table_name.to_string(),
            );
            // Give each logical table a distinct id.
            let table_id = 1024u32 + idx as u32;
            let table_meta = TableMetaBuilder::empty()
                .schema(schema)
                .primary_key_indices((0..num_tag).collect())
                .value_indices((num_tag + 1..num_tag + 1 + num_field).collect())
                .engine(METRIC_ENGINE_NAME.to_string())
                .options(options)
                .next_column_id(1024)
                .build()
                .unwrap();
            let table_info = TableInfoBuilder::default()
                .table_id(table_id)
                .name(table_name.clone())
                .meta(table_meta)
                .build()
                .unwrap();
            let table = EmptyTable::from_table_info(&table_info);

            assert!(
                catalog_list
                    .register_table_sync(RegisterTableRequest {
                        catalog: DEFAULT_CATALOG_NAME.to_string(),
                        schema: schema_name.clone(),
                        table_name: table_name.clone(),
                        table_id,
                        table,
                    })
                    .is_ok()
            );
        }

        DfTableSourceProvider::new(
            catalog_list,
            false,
            QueryContext::arc(),
            DummyDecoder::arc(),
            false,
        )
    }
4343
    /// Register test tables whose tag columns use the caller-provided names,
    /// with the default greptime timestamp and value column names for the time
    /// index and the single float64 field.
    async fn build_test_table_provider_with_fields(
        table_name_tuples: &[(String, String)],
        tags: &[&str],
    ) -> DfTableSourceProvider {
        let catalog_list = MemoryCatalogManager::with_default_setup();
        for (schema_name, table_name) in table_name_tuples {
            let mut columns = vec![];
            let num_tag = tags.len();
            for tag in tags {
                columns.push(ColumnSchema::new(
                    tag.to_string(),
                    ConcreteDataType::string_datatype(),
                    false,
                ));
            }
            columns.push(
                ColumnSchema::new(
                    greptime_timestamp().to_string(),
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                )
                .with_time_index(true),
            );
            columns.push(ColumnSchema::new(
                greptime_value().to_string(),
                ConcreteDataType::float64_datatype(),
                true,
            ));
            let schema = Arc::new(Schema::new(columns));
            let table_meta = TableMetaBuilder::empty()
                .schema(schema)
                .primary_key_indices((0..num_tag).collect())
                .next_column_id(1024)
                .build()
                .unwrap();
            let table_info = TableInfoBuilder::default()
                .name(table_name.clone())
                .meta(table_meta)
                .build()
                .unwrap();
            let table = EmptyTable::from_table_info(&table_info);

            assert!(
                catalog_list
                    .register_table_sync(RegisterTableRequest {
                        catalog: DEFAULT_CATALOG_NAME.to_string(),
                        schema: schema_name.clone(),
                        table_name: table_name.clone(),
                        table_id: 1024,
                        table,
                    })
                    .is_ok()
            );
        }

        DfTableSourceProvider::new(
            catalog_list,
            false,
            QueryContext::arc(),
            DummyDecoder::arc(),
            false,
        )
    }
4407
    /// Plan `<fn_name>(some_metric{tag_0!="bar"})` over a one-tag/one-field
    /// table and assert the full plan, with `plan_name` substituted for the
    /// `TEMPLATE` placeholder in the expected string. Unsupported functions
    /// make the planner return an error, so callers mark those tests
    /// `#[should_panic]` (the result is unwrapped here).
    async fn do_single_instant_function_call(fn_name: &'static str, plan_name: &str) {
        let prom_expr =
            parser::parse(&format!("{fn_name}(some_metric{{tag_0!=\"bar\"}})")).unwrap();
        let eval_stmt = EvalStmt {
            expr: prom_expr,
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let table_provider = build_test_table_provider(
            &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
            1,
            1,
        )
        .await;
        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();

        let expected = String::from(
            "Filter: TEMPLATE(field_0) IS NOT NULL [timestamp:Timestamp(ms), TEMPLATE(field_0):Float64;N, tag_0:Utf8]\
            \n  Projection: some_metric.timestamp, TEMPLATE(some_metric.field_0) AS TEMPLATE(field_0), some_metric.tag_0 [timestamp:Timestamp(ms), TEMPLATE(field_0):Float64;N, tag_0:Utf8]\
            \n    PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n      PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n        Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n          Filter: some_metric.tag_0 != Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(-999, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n            TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]"
        ).replace("TEMPLATE", plan_name);

        assert_eq!(plan.display_indent_schema().to_string(), expected);
    }
4459
    // --- Single instant-function call tests -------------------------------
    // Supported functions assert the exact plan via the helper; functions the
    // planner rejects are marked `#[should_panic]` because the helper unwraps
    // the planning result.

    #[tokio::test]
    async fn single_abs() {
        do_single_instant_function_call("abs", "abs").await;
    }

    #[tokio::test]
    #[should_panic]
    async fn single_absent() {
        do_single_instant_function_call("absent", "").await;
    }

    #[tokio::test]
    async fn single_ceil() {
        do_single_instant_function_call("ceil", "ceil").await;
    }

    #[tokio::test]
    async fn single_exp() {
        do_single_instant_function_call("exp", "exp").await;
    }

    #[tokio::test]
    async fn single_ln() {
        do_single_instant_function_call("ln", "ln").await;
    }

    #[tokio::test]
    async fn single_log2() {
        do_single_instant_function_call("log2", "log2").await;
    }

    #[tokio::test]
    async fn single_log10() {
        do_single_instant_function_call("log10", "log10").await;
    }

    #[tokio::test]
    #[should_panic]
    async fn single_scalar() {
        do_single_instant_function_call("scalar", "").await;
    }

    #[tokio::test]
    #[should_panic]
    async fn single_sgn() {
        do_single_instant_function_call("sgn", "").await;
    }

    #[tokio::test]
    #[should_panic]
    async fn single_sort() {
        do_single_instant_function_call("sort", "").await;
    }

    #[tokio::test]
    #[should_panic]
    async fn single_sort_desc() {
        do_single_instant_function_call("sort_desc", "").await;
    }

    #[tokio::test]
    async fn single_sqrt() {
        do_single_instant_function_call("sqrt", "sqrt").await;
    }

    #[tokio::test]
    #[should_panic]
    async fn single_timestamp() {
        do_single_instant_function_call("timestamp", "").await;
    }

    #[tokio::test]
    async fn single_acos() {
        do_single_instant_function_call("acos", "acos").await;
    }

    #[tokio::test]
    #[should_panic]
    async fn single_acosh() {
        do_single_instant_function_call("acosh", "").await;
    }

    #[tokio::test]
    async fn single_asin() {
        do_single_instant_function_call("asin", "asin").await;
    }

    #[tokio::test]
    #[should_panic]
    async fn single_asinh() {
        do_single_instant_function_call("asinh", "").await;
    }

    #[tokio::test]
    async fn single_atan() {
        do_single_instant_function_call("atan", "atan").await;
    }

    #[tokio::test]
    #[should_panic]
    async fn single_atanh() {
        do_single_instant_function_call("atanh", "").await;
    }

    #[tokio::test]
    async fn single_cos() {
        do_single_instant_function_call("cos", "cos").await;
    }

    #[tokio::test]
    #[should_panic]
    async fn single_cosh() {
        do_single_instant_function_call("cosh", "").await;
    }

    #[tokio::test]
    async fn single_sin() {
        do_single_instant_function_call("sin", "sin").await;
    }

    #[tokio::test]
    #[should_panic]
    async fn single_sinh() {
        do_single_instant_function_call("sinh", "").await;
    }

    #[tokio::test]
    async fn single_tan() {
        do_single_instant_function_call("tan", "tan").await;
    }

    #[tokio::test]
    #[should_panic]
    async fn single_tanh() {
        do_single_instant_function_call("tanh", "").await;
    }

    #[tokio::test]
    #[should_panic]
    async fn single_deg() {
        do_single_instant_function_call("deg", "").await;
    }

    #[tokio::test]
    #[should_panic]
    async fn single_rad() {
        do_single_instant_function_call("rad", "").await;
    }
4608
    /// Plan `<fn_name> by (tag_1)(some_metric{tag_0!="bar"})` over a
    /// two-tag/two-field table and assert the full plan (with `plan_name`
    /// substituted for `TEMPLATE`), then swap the modifier to
    /// `without (tag_1)` and assert the plan groups by the remaining tag.
    async fn do_aggregate_expr_plan(fn_name: &str, plan_name: &str) {
        let prom_expr = parser::parse(&format!(
            "{fn_name} by (tag_1)(some_metric{{tag_0!=\"bar\"}})",
        ))
        .unwrap();
        let mut eval_stmt = EvalStmt {
            expr: prom_expr,
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        // `by (tag_1)`: aggregation grouped on tag_1 and the time index.
        let table_provider = build_test_table_provider(
            &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
            2,
            2,
        )
        .await;
        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();
        let expected_no_without = String::from(
            "Sort: some_metric.tag_1 ASC NULLS LAST, some_metric.timestamp ASC NULLS LAST [tag_1:Utf8, timestamp:Timestamp(ms), TEMPLATE(some_metric.field_0):Float64;N, TEMPLATE(some_metric.field_1):Float64;N]\
            \n  Aggregate: groupBy=[[some_metric.tag_1, some_metric.timestamp]], aggr=[[TEMPLATE(some_metric.field_0), TEMPLATE(some_metric.field_1)]] [tag_1:Utf8, timestamp:Timestamp(ms), TEMPLATE(some_metric.field_0):Float64;N, TEMPLATE(some_metric.field_1):Float64;N]\
            \n    PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(ms), field_0:Float64;N, field_1:Float64;N]\
            \n      PromSeriesDivide: tags=[\"tag_0\", \"tag_1\"] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(ms), field_0:Float64;N, field_1:Float64;N]\
            \n        Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.tag_1 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(ms), field_0:Float64;N, field_1:Float64;N]\
            \n          Filter: some_metric.tag_0 != Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(-999, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(ms), field_0:Float64;N, field_1:Float64;N]\
            \n            TableScan: some_metric [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(ms), field_0:Float64;N, field_1:Float64;N]"
        ).replace("TEMPLATE", plan_name);
        assert_eq!(
            plan.display_indent_schema().to_string(),
            expected_no_without
        );

        // `without (tag_1)`: rewrite the modifier in place and re-plan.
        if let PromExpr::Aggregate(AggregateExpr { modifier, .. }) = &mut eval_stmt.expr {
            *modifier = Some(LabelModifier::Exclude(Labels {
                labels: vec![String::from("tag_1")].into_iter().collect(),
            }));
        }
        let table_provider = build_test_table_provider(
            &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
            2,
            2,
        )
        .await;
        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();
        let expected_without = String::from(
            "Sort: some_metric.tag_0 ASC NULLS LAST, some_metric.timestamp ASC NULLS LAST [tag_0:Utf8, timestamp:Timestamp(ms), TEMPLATE(some_metric.field_0):Float64;N, TEMPLATE(some_metric.field_1):Float64;N]\
            \n  Aggregate: groupBy=[[some_metric.tag_0, some_metric.timestamp]], aggr=[[TEMPLATE(some_metric.field_0), TEMPLATE(some_metric.field_1)]] [tag_0:Utf8, timestamp:Timestamp(ms), TEMPLATE(some_metric.field_0):Float64;N, TEMPLATE(some_metric.field_1):Float64;N]\
            \n    PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(ms), field_0:Float64;N, field_1:Float64;N]\
            \n      PromSeriesDivide: tags=[\"tag_0\", \"tag_1\"] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(ms), field_0:Float64;N, field_1:Float64;N]\
            \n        Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.tag_1 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(ms), field_0:Float64;N, field_1:Float64;N]\
            \n          Filter: some_metric.tag_0 != Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(-999, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(ms), field_0:Float64;N, field_1:Float64;N]\
            \n            TableScan: some_metric [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(ms), field_0:Float64;N, field_1:Float64;N]"
        ).replace("TEMPLATE", plan_name);
        assert_eq!(plan.display_indent_schema().to_string(), expected_without);
    }
4697
    #[tokio::test]
    async fn aggregate_sum() {
        // PromQL `sum` should map to the DataFusion `sum` UDAF in the plan.
        do_aggregate_expr_plan("sum", "sum").await;
    }
4702
4703 #[tokio::test]
4704 async fn tsid_is_used_for_series_divide_when_available() {
4705 let prom_expr = parser::parse("some_metric").unwrap();
4706 let eval_stmt = EvalStmt {
4707 expr: prom_expr,
4708 start: UNIX_EPOCH,
4709 end: UNIX_EPOCH
4710 .checked_add(Duration::from_secs(100_000))
4711 .unwrap(),
4712 interval: Duration::from_secs(5),
4713 lookback_delta: Duration::from_secs(1),
4714 };
4715
4716 let table_provider = build_test_table_provider_with_tsid(
4717 &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
4718 1,
4719 1,
4720 )
4721 .await;
4722 let plan =
4723 PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
4724 .await
4725 .unwrap();
4726
4727 let plan_str = plan.display_indent_schema().to_string();
4728 assert!(plan_str.contains("PromSeriesDivide: tags=[\"__tsid\"]"));
4729 assert!(plan_str.contains("__tsid ASC NULLS FIRST"));
4730 assert!(
4731 !plan
4732 .schema()
4733 .fields()
4734 .iter()
4735 .any(|field| field.name() == DATA_SCHEMA_TSID_COLUMN_NAME)
4736 );
4737 }
4738
4739 #[tokio::test]
4740 async fn scalar_count_count_range_keeps_full_window() {
4741 let plan_str = build_optimized_tsid_plan(
4742 "scalar(count(count(some_metric) by (tag_0)))",
4743 1,
4744 1,
4745 100_000,
4746 1,
4747 )
4748 .await;
4749 assert!(plan_str.contains("ScalarCalculate: tags=[]"));
4750 assert!(plan_str.contains("PromInstantManipulate: range=[0..100000000]"));
4751 assert!(!plan_str.contains("PromInstantManipulate: range=[99999000..99999000]"));
4752 }
4753
4754 #[tokio::test]
4755 async fn scalar_count_count_rewrite_applies_inside_binary_expr_for_tsid_input() {
4756 let plan_str = build_optimized_tsid_plan(
4757 "sum(irate(some_metric[1h])) / scalar(count(count(some_metric) by (tag_0)))",
4758 2,
4759 1,
4760 10,
4761 300,
4762 )
4763 .await;
4764 assert!(plan_str.contains("Distinct:"), "{plan_str}");
4765 }
4766
    #[tokio::test]
    async fn nested_count_rewrite_keeps_full_series_key_with_tsid_input() {
        // After the rewrite the outer count groups only on the timestamp and
        // counts a constant, aliased back to the original expression name.
        assert_nested_count_rewrite_applies(
            "count(count(some_metric) by (tag_0))",
            "Aggregate: groupBy=[[some_metric.timestamp]], aggr=[[count(Int64(1)) AS count(count(some_metric.field_0))]]"
        )
        .await;
    }
4775
    #[tokio::test]
    async fn nested_sum_count_rewrite_keeps_full_series_key_with_tsid_input() {
        // Same rewrite shape with `sum` as the inner aggregation.
        assert_nested_count_rewrite_applies(
            "count(sum(some_metric) by (tag_0))",
            "Aggregate: groupBy=[[some_metric.timestamp]], aggr=[[count(Int64(1)) AS count(sum(some_metric.field_0))]]"
        )
        .await;
    }
4784
4785 #[tokio::test]
4786 async fn nested_supported_inner_aggs_rewrite_apply_for_tsid_input() {
4787 for (query, expected_outer_agg) in [
4788 (
4789 "count(avg(some_metric) by (tag_0))",
4790 "Aggregate: groupBy=[[some_metric.timestamp]], aggr=[[count(Int64(1)) AS count(avg(some_metric.field_0))]]",
4791 ),
4792 (
4793 "count(min(some_metric) by (tag_0))",
4794 "Aggregate: groupBy=[[some_metric.timestamp]], aggr=[[count(Int64(1)) AS count(min(some_metric.field_0))]]",
4795 ),
4796 (
4797 "count(max(some_metric) by (tag_0))",
4798 "Aggregate: groupBy=[[some_metric.timestamp]], aggr=[[count(Int64(1)) AS count(max(some_metric.field_0))]]",
4799 ),
4800 (
4801 "count(stddev(some_metric) by (tag_0))",
4802 "Aggregate: groupBy=[[some_metric.timestamp]], aggr=[[count(Int64(1)) AS count(stddev_pop(some_metric.field_0))]]",
4803 ),
4804 (
4805 "count(stdvar(some_metric) by (tag_0))",
4806 "Aggregate: groupBy=[[some_metric.timestamp]], aggr=[[count(Int64(1)) AS count(var_pop(some_metric.field_0))]]",
4807 ),
4808 ] {
4809 assert_nested_count_rewrite_applies(query, expected_outer_agg).await;
4810 }
4811 }
4812
4813 #[tokio::test]
4814 async fn nested_non_count_inner_aggs_rewrite_filter_null_values_for_tsid_input() {
4815 let count_plan =
4816 build_optimized_tsid_plan("count(count(some_metric) by (tag_0))", 2, 1, 100_000, 1)
4817 .await;
4818 assert!(
4819 !count_plan.contains("some_metric.field_0 IS NOT NULL"),
4820 "{count_plan}"
4821 );
4822
4823 for query in [
4824 "count(sum(some_metric) by (tag_0))",
4825 "count(avg(some_metric) by (tag_0))",
4826 "count(min(some_metric) by (tag_0))",
4827 "count(max(some_metric) by (tag_0))",
4828 "count(stddev(some_metric) by (tag_0))",
4829 "count(stdvar(some_metric) by (tag_0))",
4830 ] {
4831 let plan_str = build_optimized_tsid_plan(query, 2, 1, 100_000, 1).await;
4832 assert!(
4833 plan_str.contains("Filter: some_metric.field_0 IS NOT NULL"),
4834 "{query}: {plan_str}"
4835 );
4836 }
4837 }
4838
    #[tokio::test]
    async fn nested_unsupported_or_non_direct_inner_aggs_do_not_rewrite() {
        // `group` is not a supported inner aggregation for the rewrite.
        assert_nested_count_rewrite_missing("count(group(some_metric) by (tag_0))", 2, 1).await;
        // Inner aggregate over a range function (not a direct selector) must
        // not be rewritten either.
        assert_nested_count_rewrite_missing(
            "count(sum(irate(some_metric[1h])) by (tag_0))",
            2,
            300,
        )
        .await;
    }
4849
4850 #[tokio::test]
4851 async fn physical_table_name_is_not_leaked_in_plan() {
4852 let prom_expr = parser::parse("some_metric").unwrap();
4853 let eval_stmt = EvalStmt {
4854 expr: prom_expr,
4855 start: UNIX_EPOCH,
4856 end: UNIX_EPOCH
4857 .checked_add(Duration::from_secs(100_000))
4858 .unwrap(),
4859 interval: Duration::from_secs(5),
4860 lookback_delta: Duration::from_secs(1),
4861 };
4862
4863 let table_provider = build_test_table_provider_with_tsid(
4864 &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
4865 1,
4866 1,
4867 )
4868 .await;
4869 let plan =
4870 PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
4871 .await
4872 .unwrap();
4873
4874 let plan_str = plan.display_indent_schema().to_string();
4875 assert!(plan_str.contains("TableScan: phy"), "{plan}");
4876 assert!(plan_str.contains("SubqueryAlias: some_metric"));
4877 assert!(plan_str.contains("Filter: phy.__table_id = UInt32(1024)"));
4878 assert!(!plan_str.contains("TableScan: some_metric"));
4879 }
4880
4881 #[tokio::test]
4882 async fn sum_without_does_not_group_by_tsid() {
4883 let prom_expr = parser::parse("sum without (tag_0) (some_metric)").unwrap();
4884 let eval_stmt = EvalStmt {
4885 expr: prom_expr,
4886 start: UNIX_EPOCH,
4887 end: UNIX_EPOCH
4888 .checked_add(Duration::from_secs(100_000))
4889 .unwrap(),
4890 interval: Duration::from_secs(5),
4891 lookback_delta: Duration::from_secs(1),
4892 };
4893
4894 let table_provider = build_test_table_provider_with_tsid(
4895 &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
4896 1,
4897 1,
4898 )
4899 .await;
4900 let plan =
4901 PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
4902 .await
4903 .unwrap();
4904
4905 let plan_str = plan.display_indent_schema().to_string();
4906 assert!(plan_str.contains("PromSeriesDivide: tags=[\"__tsid\"]"));
4907
4908 let aggr_line = plan_str
4909 .lines()
4910 .find(|line| line.contains("Aggregate: groupBy="))
4911 .unwrap();
4912 assert!(!aggr_line.contains(DATA_SCHEMA_TSID_COLUMN_NAME));
4913 }
4914
4915 #[tokio::test]
4916 async fn topk_without_does_not_partition_by_tsid() {
4917 let prom_expr = parser::parse("topk without (tag_0) (1, some_metric)").unwrap();
4918 let eval_stmt = EvalStmt {
4919 expr: prom_expr,
4920 start: UNIX_EPOCH,
4921 end: UNIX_EPOCH
4922 .checked_add(Duration::from_secs(100_000))
4923 .unwrap(),
4924 interval: Duration::from_secs(5),
4925 lookback_delta: Duration::from_secs(1),
4926 };
4927
4928 let table_provider = build_test_table_provider_with_tsid(
4929 &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
4930 1,
4931 1,
4932 )
4933 .await;
4934 let plan =
4935 PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
4936 .await
4937 .unwrap();
4938
4939 let plan_str = plan.display_indent_schema().to_string();
4940 assert!(plan_str.contains("PromSeriesDivide: tags=[\"__tsid\"]"));
4941
4942 let window_line = plan_str
4943 .lines()
4944 .find(|line| line.contains("WindowAggr: windowExpr=[[row_number()"))
4945 .unwrap();
4946 let partition_by = window_line
4947 .split("PARTITION BY [")
4948 .nth(1)
4949 .and_then(|s| s.split("] ORDER BY").next())
4950 .unwrap();
4951 assert!(!partition_by.contains(DATA_SCHEMA_TSID_COLUMN_NAME));
4952 }
4953
4954 #[tokio::test]
4955 async fn sum_by_does_not_group_by_tsid() {
4956 let prom_expr = parser::parse("sum by (__tsid) (some_metric)").unwrap();
4957 let eval_stmt = EvalStmt {
4958 expr: prom_expr,
4959 start: UNIX_EPOCH,
4960 end: UNIX_EPOCH
4961 .checked_add(Duration::from_secs(100_000))
4962 .unwrap(),
4963 interval: Duration::from_secs(5),
4964 lookback_delta: Duration::from_secs(1),
4965 };
4966
4967 let table_provider = build_test_table_provider_with_tsid(
4968 &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
4969 1,
4970 1,
4971 )
4972 .await;
4973 let plan =
4974 PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
4975 .await
4976 .unwrap();
4977
4978 let plan_str = plan.display_indent_schema().to_string();
4979 assert!(plan_str.contains("PromSeriesDivide: tags=[\"__tsid\"]"));
4980
4981 let aggr_line = plan_str
4982 .lines()
4983 .find(|line| line.contains("Aggregate: groupBy="))
4984 .unwrap();
4985 assert!(!aggr_line.contains(DATA_SCHEMA_TSID_COLUMN_NAME));
4986 }
4987
4988 #[tokio::test]
4989 async fn topk_by_does_not_partition_by_tsid() {
4990 let prom_expr = parser::parse("topk by (__tsid) (1, some_metric)").unwrap();
4991 let eval_stmt = EvalStmt {
4992 expr: prom_expr,
4993 start: UNIX_EPOCH,
4994 end: UNIX_EPOCH
4995 .checked_add(Duration::from_secs(100_000))
4996 .unwrap(),
4997 interval: Duration::from_secs(5),
4998 lookback_delta: Duration::from_secs(1),
4999 };
5000
5001 let table_provider = build_test_table_provider_with_tsid(
5002 &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
5003 1,
5004 1,
5005 )
5006 .await;
5007 let plan =
5008 PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
5009 .await
5010 .unwrap();
5011
5012 let plan_str = plan.display_indent_schema().to_string();
5013 assert!(plan_str.contains("PromSeriesDivide: tags=[\"__tsid\"]"));
5014
5015 let window_line = plan_str
5016 .lines()
5017 .find(|line| line.contains("WindowAggr: windowExpr=[[row_number()"))
5018 .unwrap();
5019 let partition_by = window_line
5020 .split("PARTITION BY [")
5021 .nth(1)
5022 .and_then(|s| s.split("] ORDER BY").next())
5023 .unwrap();
5024 assert!(!partition_by.contains(DATA_SCHEMA_TSID_COLUMN_NAME));
5025 }
5026
5027 #[tokio::test]
5028 async fn selector_matcher_on_tsid_does_not_use_internal_column() {
5029 let prom_expr = parser::parse(r#"some_metric{__tsid="123"}"#).unwrap();
5030 let eval_stmt = EvalStmt {
5031 expr: prom_expr,
5032 start: UNIX_EPOCH,
5033 end: UNIX_EPOCH
5034 .checked_add(Duration::from_secs(100_000))
5035 .unwrap(),
5036 interval: Duration::from_secs(5),
5037 lookback_delta: Duration::from_secs(1),
5038 };
5039
5040 let table_provider = build_test_table_provider_with_tsid(
5041 &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
5042 1,
5043 1,
5044 )
5045 .await;
5046 let plan =
5047 PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
5048 .await
5049 .unwrap();
5050
5051 fn collect_filter_cols(plan: &LogicalPlan, out: &mut HashSet<Column>) {
5052 if let LogicalPlan::Filter(filter) = plan {
5053 datafusion_expr::utils::expr_to_columns(&filter.predicate, out).unwrap();
5054 }
5055 for input in plan.inputs() {
5056 collect_filter_cols(input, out);
5057 }
5058 }
5059
5060 let mut filter_cols = HashSet::new();
5061 collect_filter_cols(&plan, &mut filter_cols);
5062 assert!(
5063 !filter_cols
5064 .iter()
5065 .any(|c| c.name == DATA_SCHEMA_TSID_COLUMN_NAME)
5066 );
5067 }
5068
5069 #[tokio::test]
5070 async fn tsid_is_not_used_when_physical_table_is_missing() {
5071 let prom_expr = parser::parse("some_metric").unwrap();
5072 let eval_stmt = EvalStmt {
5073 expr: prom_expr,
5074 start: UNIX_EPOCH,
5075 end: UNIX_EPOCH
5076 .checked_add(Duration::from_secs(100_000))
5077 .unwrap(),
5078 interval: Duration::from_secs(5),
5079 lookback_delta: Duration::from_secs(1),
5080 };
5081
5082 let catalog_list = MemoryCatalogManager::with_default_setup();
5083
5084 let mut columns = vec![ColumnSchema::new(
5086 "tag_0".to_string(),
5087 ConcreteDataType::string_datatype(),
5088 false,
5089 )];
5090 columns.push(
5091 ColumnSchema::new(
5092 "timestamp".to_string(),
5093 ConcreteDataType::timestamp_millisecond_datatype(),
5094 false,
5095 )
5096 .with_time_index(true),
5097 );
5098 columns.push(ColumnSchema::new(
5099 "field_0".to_string(),
5100 ConcreteDataType::float64_datatype(),
5101 true,
5102 ));
5103 let schema = Arc::new(Schema::new(columns));
5104 let mut options = table::requests::TableOptions::default();
5105 options
5106 .extra_options
5107 .insert(LOGICAL_TABLE_METADATA_KEY.to_string(), "phy".to_string());
5108 let table_meta = TableMetaBuilder::empty()
5109 .schema(schema)
5110 .primary_key_indices(vec![0])
5111 .value_indices(vec![2])
5112 .engine(METRIC_ENGINE_NAME.to_string())
5113 .options(options)
5114 .next_column_id(1024)
5115 .build()
5116 .unwrap();
5117 let table_info = TableInfoBuilder::default()
5118 .table_id(1024)
5119 .name("some_metric")
5120 .meta(table_meta)
5121 .build()
5122 .unwrap();
5123 let table = EmptyTable::from_table_info(&table_info);
5124 catalog_list
5125 .register_table_sync(RegisterTableRequest {
5126 catalog: DEFAULT_CATALOG_NAME.to_string(),
5127 schema: DEFAULT_SCHEMA_NAME.to_string(),
5128 table_name: "some_metric".to_string(),
5129 table_id: 1024,
5130 table,
5131 })
5132 .unwrap();
5133
5134 let table_provider = DfTableSourceProvider::new(
5135 catalog_list,
5136 false,
5137 QueryContext::arc(),
5138 DummyDecoder::arc(),
5139 false,
5140 );
5141
5142 let plan =
5143 PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
5144 .await
5145 .unwrap();
5146
5147 let plan_str = plan.display_indent_schema().to_string();
5148 assert!(plan_str.contains("PromSeriesDivide: tags=[\"tag_0\"]"));
5149 assert!(!plan_str.contains("PromSeriesDivide: tags=[\"__tsid\"]"));
5150 }
5151
5152 #[tokio::test]
5153 async fn tsid_is_carried_only_when_aggregate_preserves_label_set() {
5154 let prom_expr = parser::parse("sum by (tag_0) (some_metric)").unwrap();
5155 let eval_stmt = EvalStmt {
5156 expr: prom_expr,
5157 start: UNIX_EPOCH,
5158 end: UNIX_EPOCH
5159 .checked_add(Duration::from_secs(100_000))
5160 .unwrap(),
5161 interval: Duration::from_secs(5),
5162 lookback_delta: Duration::from_secs(1),
5163 };
5164
5165 let table_provider = build_test_table_provider_with_tsid(
5166 &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
5167 1,
5168 1,
5169 )
5170 .await;
5171 let plan =
5172 PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
5173 .await
5174 .unwrap();
5175
5176 let plan_str = plan.display_indent_schema().to_string();
5177 assert!(plan_str.contains("first_value") && plan_str.contains("__tsid"));
5178 assert!(
5179 !plan
5180 .schema()
5181 .fields()
5182 .iter()
5183 .any(|field| field.name() == DATA_SCHEMA_TSID_COLUMN_NAME)
5184 );
5185
5186 let prom_expr = parser::parse("sum(some_metric)").unwrap();
5188 let eval_stmt = EvalStmt {
5189 expr: prom_expr,
5190 start: UNIX_EPOCH,
5191 end: UNIX_EPOCH
5192 .checked_add(Duration::from_secs(100_000))
5193 .unwrap(),
5194 interval: Duration::from_secs(5),
5195 lookback_delta: Duration::from_secs(1),
5196 };
5197 let table_provider = build_test_table_provider_with_tsid(
5198 &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
5199 1,
5200 1,
5201 )
5202 .await;
5203 let plan =
5204 PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
5205 .await
5206 .unwrap();
5207 let plan_str = plan.display_indent_schema().to_string();
5208 assert!(!plan_str.contains("first_value"));
5209 }
5210
5211 #[tokio::test]
5212 async fn or_operator_with_unknown_metric_does_not_require_tsid() {
5213 let prom_expr = parser::parse("unknown_metric or some_metric").unwrap();
5214 let eval_stmt = EvalStmt {
5215 expr: prom_expr,
5216 start: UNIX_EPOCH,
5217 end: UNIX_EPOCH
5218 .checked_add(Duration::from_secs(100_000))
5219 .unwrap(),
5220 interval: Duration::from_secs(5),
5221 lookback_delta: Duration::from_secs(1),
5222 };
5223
5224 let table_provider = build_test_table_provider_with_tsid(
5225 &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
5226 1,
5227 1,
5228 )
5229 .await;
5230
5231 let plan =
5232 PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
5233 .await
5234 .unwrap();
5235
5236 assert!(
5237 !plan
5238 .schema()
5239 .fields()
5240 .iter()
5241 .any(|field| field.name() == DATA_SCHEMA_TSID_COLUMN_NAME)
5242 );
5243 }
5244
    #[tokio::test]
    async fn aggregate_avg() {
        // PromQL `avg` should map to the DataFusion `avg` UDAF in the plan.
        do_aggregate_expr_plan("avg", "avg").await;
    }
5249
5250 #[tokio::test]
5251 #[should_panic] async fn aggregate_count() {
5253 do_aggregate_expr_plan("count", "count").await;
5254 }
5255
    #[tokio::test]
    async fn aggregate_min() {
        // PromQL `min` should map to the DataFusion `min` UDAF in the plan.
        do_aggregate_expr_plan("min", "min").await;
    }
5260
    #[tokio::test]
    async fn aggregate_max() {
        // PromQL `max` should map to the DataFusion `max` UDAF in the plan.
        do_aggregate_expr_plan("max", "max").await;
    }
5265
5266 #[tokio::test]
5267 async fn aggregate_group() {
5268 let prom_expr = parser::parse(
5272 "sum(group by (cluster)(kubernetes_build_info{service=\"kubernetes\",job=\"apiserver\"}))",
5273 )
5274 .unwrap();
5275 let eval_stmt = EvalStmt {
5276 expr: prom_expr,
5277 start: UNIX_EPOCH,
5278 end: UNIX_EPOCH
5279 .checked_add(Duration::from_secs(100_000))
5280 .unwrap(),
5281 interval: Duration::from_secs(5),
5282 lookback_delta: Duration::from_secs(1),
5283 };
5284
5285 let table_provider = build_test_table_provider_with_fields(
5286 &[(
5287 DEFAULT_SCHEMA_NAME.to_string(),
5288 "kubernetes_build_info".to_string(),
5289 )],
5290 &["cluster", "service", "job"],
5291 )
5292 .await;
5293 let plan =
5294 PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
5295 .await
5296 .unwrap();
5297
5298 let plan_str = plan.display_indent_schema().to_string();
5299 assert!(plan_str.contains("max(Float64(1"));
5300 }
5301
    #[tokio::test]
    async fn aggregate_stddev() {
        // PromQL `stddev` maps to the population UDAF `stddev_pop`.
        do_aggregate_expr_plan("stddev", "stddev_pop").await;
    }
5306
    #[tokio::test]
    async fn aggregate_stdvar() {
        // PromQL `stdvar` maps to the population UDAF `var_pop`.
        do_aggregate_expr_plan("stdvar", "var_pop").await;
    }
5311
    #[tokio::test]
    async fn binary_op_column_column() {
        // vector-op-vector: both selectors are planned independently, aliased
        // as lhs/rhs, inner-joined on (tag_0, timestamp), then combined in a
        // projection that casts both fields to Float64.
        let prom_expr =
            parser::parse(r#"some_metric{tag_0="foo"} + some_metric{tag_0="bar"}"#).unwrap();
        let eval_stmt = EvalStmt {
            expr: prom_expr,
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let table_provider = build_test_table_provider(
            &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
            1,
            1,
        )
        .await;
        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();

        // Exact expected rendering of the plan tree with schemas.
        let expected = String::from(
            "Projection: rhs.tag_0, rhs.timestamp, CAST(lhs.field_0 AS Float64) + CAST(rhs.field_0 AS Float64) AS lhs.field_0 + rhs.field_0 [tag_0:Utf8, timestamp:Timestamp(ms), lhs.field_0 + rhs.field_0:Float64;N]\
            \n  Inner Join: lhs.tag_0 = rhs.tag_0, lhs.timestamp = rhs.timestamp [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N, tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n    SubqueryAlias: lhs [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n      PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n        PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n          Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n            Filter: some_metric.tag_0 = Utf8(\"foo\") AND some_metric.timestamp >= TimestampMillisecond(-999, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n              TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n    SubqueryAlias: rhs [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n      PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n        PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n          Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n            Filter: some_metric.tag_0 = Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(-999, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n              TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]",
        );

        assert_eq!(plan.display_indent_schema().to_string(), expected);
    }
5379
5380 async fn indie_query_plan_compare<T: AsRef<str>>(query: &str, expected: T) {
5381 let prom_expr = parser::parse(query).unwrap();
5382 let eval_stmt = EvalStmt {
5383 expr: prom_expr,
5384 start: UNIX_EPOCH,
5385 end: UNIX_EPOCH
5386 .checked_add(Duration::from_secs(100_000))
5387 .unwrap(),
5388 interval: Duration::from_secs(5),
5389 lookback_delta: Duration::from_secs(1),
5390 };
5391
5392 let table_provider = build_test_table_provider(
5393 &[
5394 (DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string()),
5395 (
5396 "greptime_private".to_string(),
5397 "some_alt_metric".to_string(),
5398 ),
5399 ],
5400 1,
5401 1,
5402 )
5403 .await;
5404 let plan =
5405 PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
5406 .await
5407 .unwrap();
5408
5409 assert_eq!(plan.display_indent_schema().to_string(), expected.as_ref());
5410 }
5411
    #[tokio::test]
    async fn binary_op_literal_column() {
        // literal-op-vector: the scalar is folded into a projection over the
        // instant-manipulated selector; no join is needed.
        let query = r#"1 + some_metric{tag_0="bar"}"#;
        let expected = String::from(
            "Projection: some_metric.tag_0, some_metric.timestamp, Float64(1) + CAST(some_metric.field_0 AS Float64) AS Float64(1) + field_0 [tag_0:Utf8, timestamp:Timestamp(ms), Float64(1) + field_0:Float64;N]\
            \n  PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n    PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n      Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n        Filter: some_metric.tag_0 = Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(-999, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n          TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]",
        );

        indie_query_plan_compare(query, expected).await;
    }
5426
5427 #[tokio::test]
5428 async fn binary_op_literal_literal() {
5429 let query = r#"1 + 1"#;
5430 let expected = r#"EmptyMetric: range=[0..100000000], interval=[5000] [time:Timestamp(ms), value:Float64;N]
5431 TableScan: dummy [time:Timestamp(ms), value:Float64;N]"#;
5432 indie_query_plan_compare(query, expected).await;
5433 }
5434
    #[tokio::test]
    async fn simple_bool_grammar() {
        // `bool` modifier: comparison result is cast back to Float64 (0/1)
        // instead of filtering samples.
        let query = "some_metric != bool 1.2345";
        let expected = String::from(
            "Projection: some_metric.tag_0, some_metric.timestamp, CAST(some_metric.field_0 != Float64(1.2345) AS Float64) AS field_0 != Float64(1.2345) [tag_0:Utf8, timestamp:Timestamp(ms), field_0 != Float64(1.2345):Float64;N]\
            \n  PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n    PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n      Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n        Filter: some_metric.timestamp >= TimestampMillisecond(-999, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n          TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]",
        );

        indie_query_plan_compare(query, expected).await;
    }
5449
    #[tokio::test]
    async fn bool_with_additional_arithmetic() {
        // A bool-modified comparison inside arithmetic: the 0/1 result of the
        // literal comparison is cast to Float64 and added to the field.
        let query = "some_metric + (1 == bool 2)";
        let expected = String::from(
            "Projection: some_metric.tag_0, some_metric.timestamp, CAST(some_metric.field_0 AS Float64) + CAST(Float64(1) = Float64(2) AS Float64) AS field_0 + Float64(1) = Float64(2) [tag_0:Utf8, timestamp:Timestamp(ms), field_0 + Float64(1) = Float64(2):Float64;N]\
            \n  PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n    PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n      Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n        Filter: some_metric.timestamp >= TimestampMillisecond(-999, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n          TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]",
        );

        indie_query_plan_compare(query, expected).await;
    }
5464
    #[tokio::test]
    async fn simple_unary() {
        // Unary minus becomes a negation projection over the selector.
        let query = "-some_metric";
        let expected = String::from(
            "Projection: some_metric.tag_0, some_metric.timestamp, (- some_metric.field_0) AS (- field_0) [tag_0:Utf8, timestamp:Timestamp(ms), (- field_0):Float64;N]\
            \n  PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n    PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n      Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n        Filter: some_metric.timestamp >= TimestampMillisecond(-999, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n          TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]",
        );

        indie_query_plan_compare(query, expected).await;
    }
5479
    #[tokio::test]
    async fn increase_aggr() {
        // Range function: the 5m window widens the scan filter's lower bound
        // by the range (-299999) and plans through PromRangeManipulate with a
        // 300000 ms eval range; NULL results are filtered out at the top.
        let query = "increase(some_metric[5m])";
        let expected = String::from(
            "Filter: prom_increase(timestamp_range,field_0,timestamp,Int64(300000)) IS NOT NULL [timestamp:Timestamp(ms), prom_increase(timestamp_range,field_0,timestamp,Int64(300000)):Float64;N, tag_0:Utf8]\
            \n  Projection: some_metric.timestamp, prom_increase(timestamp_range, field_0, some_metric.timestamp, Int64(300000)) AS prom_increase(timestamp_range,field_0,timestamp,Int64(300000)), some_metric.tag_0 [timestamp:Timestamp(ms), prom_increase(timestamp_range,field_0,timestamp,Int64(300000)):Float64;N, tag_0:Utf8]\
            \n    PromRangeManipulate: req range=[0..100000000], interval=[5000], eval range=[300000], time index=[timestamp], values=[\"field_0\"] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Dictionary(Int64, Float64);N, timestamp_range:Dictionary(Int64, Timestamp(ms))]\
            \n      PromSeriesNormalize: offset=[0], time index=[timestamp], filter NaN: [true] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n        PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n          Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n            Filter: some_metric.timestamp >= TimestampMillisecond(-299999, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n              TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]",
        );

        indie_query_plan_compare(query, expected).await;
    }
5496
    #[tokio::test]
    async fn less_filter_on_value() {
        // A (non-bool) comparison against a literal filters samples rather
        // than projecting a 0/1 value.
        let query = "some_metric < 1.2345";
        let expected = String::from(
            "Filter: some_metric.field_0 < Float64(1.2345) [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n  PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n    PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n      Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n        Filter: some_metric.timestamp >= TimestampMillisecond(-999, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n          TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]",
        );

        indie_query_plan_compare(query, expected).await;
    }
5511
    #[tokio::test]
    async fn count_over_time() {
        // Same shape as `increase_aggr` but with the prom_count_over_time
        // range UDF (no extra range argument in its signature).
        let query = "count_over_time(some_metric[5m])";
        let expected = String::from(
            "Filter: prom_count_over_time(timestamp_range,field_0) IS NOT NULL [timestamp:Timestamp(ms), prom_count_over_time(timestamp_range,field_0):Float64;N, tag_0:Utf8]\
            \n  Projection: some_metric.timestamp, prom_count_over_time(timestamp_range, field_0) AS prom_count_over_time(timestamp_range,field_0), some_metric.tag_0 [timestamp:Timestamp(ms), prom_count_over_time(timestamp_range,field_0):Float64;N, tag_0:Utf8]\
            \n    PromRangeManipulate: req range=[0..100000000], interval=[5000], eval range=[300000], time index=[timestamp], values=[\"field_0\"] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Dictionary(Int64, Float64);N, timestamp_range:Dictionary(Int64, Timestamp(ms))]\
            \n      PromSeriesNormalize: offset=[0], time index=[timestamp], filter NaN: [true] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n        PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n          Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n            Filter: some_metric.timestamp >= TimestampMillisecond(-299999, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n              TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]",
        );

        indie_query_plan_compare(query, expected).await;
    }
5528
    // Binary op between two metrics with `ignoring(...)`: the planner must emit
    // an inner join keyed on the remaining shared label (`uri`) plus the time
    // index, with the ignored labels excluded from the join keys.
    #[tokio::test]
    async fn test_hash_join() {
        let mut eval_stmt = EvalStmt {
            // Placeholder expression; replaced with the parsed query below.
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        // `ignoring(kubernetes_pod_name, kubernetes_namespace)` leaves only `uri`
        // (and the time index) to match the two sides on.
        let case = r#"http_server_requests_seconds_sum{uri="/accounts/login"} / ignoring(kubernetes_pod_name,kubernetes_namespace) http_server_requests_seconds_count{uri="/accounts/login"}"#;

        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;
        let table_provider = build_test_table_provider_with_fields(
            &[
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "http_server_requests_seconds_sum".to_string(),
                ),
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "http_server_requests_seconds_count".to_string(),
                ),
            ],
            &["uri", "kubernetes_namespace", "kubernetes_pod_name"],
        )
        .await;
        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();
        // Both join inputs get the full instant-vector pipeline; the final
        // Projection computes lhs.value / rhs.value.
        let expected = "Projection: http_server_requests_seconds_count.uri, http_server_requests_seconds_count.kubernetes_namespace, http_server_requests_seconds_count.kubernetes_pod_name, http_server_requests_seconds_count.greptime_timestamp, CAST(http_server_requests_seconds_sum.greptime_value AS Float64) / CAST(http_server_requests_seconds_count.greptime_value AS Float64) AS http_server_requests_seconds_sum.greptime_value / http_server_requests_seconds_count.greptime_value\
        \n  Inner Join: http_server_requests_seconds_sum.greptime_timestamp = http_server_requests_seconds_count.greptime_timestamp, http_server_requests_seconds_sum.uri = http_server_requests_seconds_count.uri\
        \n    SubqueryAlias: http_server_requests_seconds_sum\
        \n      PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp]\
        \n        PromSeriesDivide: tags=[\"uri\", \"kubernetes_namespace\", \"kubernetes_pod_name\"]\
        \n          Sort: http_server_requests_seconds_sum.uri ASC NULLS FIRST, http_server_requests_seconds_sum.kubernetes_namespace ASC NULLS FIRST, http_server_requests_seconds_sum.kubernetes_pod_name ASC NULLS FIRST, http_server_requests_seconds_sum.greptime_timestamp ASC NULLS FIRST\
        \n            Filter: http_server_requests_seconds_sum.uri = Utf8(\"/accounts/login\") AND http_server_requests_seconds_sum.greptime_timestamp >= TimestampMillisecond(-999, None) AND http_server_requests_seconds_sum.greptime_timestamp <= TimestampMillisecond(100000000, None)\
        \n              TableScan: http_server_requests_seconds_sum\
        \n    SubqueryAlias: http_server_requests_seconds_count\
        \n      PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp]\
        \n        PromSeriesDivide: tags=[\"uri\", \"kubernetes_namespace\", \"kubernetes_pod_name\"]\
        \n          Sort: http_server_requests_seconds_count.uri ASC NULLS FIRST, http_server_requests_seconds_count.kubernetes_namespace ASC NULLS FIRST, http_server_requests_seconds_count.kubernetes_pod_name ASC NULLS FIRST, http_server_requests_seconds_count.greptime_timestamp ASC NULLS FIRST\
        \n            Filter: http_server_requests_seconds_count.uri = Utf8(\"/accounts/login\") AND http_server_requests_seconds_count.greptime_timestamp >= TimestampMillisecond(-999, None) AND http_server_requests_seconds_count.greptime_timestamp <= TimestampMillisecond(100000000, None)\
        \n              TableScan: http_server_requests_seconds_count";
        assert_eq!(plan.to_string(), expected);
    }
5580
5581 #[tokio::test]
5582 async fn test_nested_histogram_quantile() {
5583 let mut eval_stmt = EvalStmt {
5584 expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
5585 start: UNIX_EPOCH,
5586 end: UNIX_EPOCH
5587 .checked_add(Duration::from_secs(100_000))
5588 .unwrap(),
5589 interval: Duration::from_secs(5),
5590 lookback_delta: Duration::from_secs(1),
5591 };
5592
5593 let case = r#"label_replace(histogram_quantile(0.99, sum by(pod, le, path, code) (rate(greptime_servers_grpc_requests_elapsed_bucket{container="frontend"}[1m0s]))), "pod_new", "$1", "pod", "greptimedb-frontend-[0-9a-z]*-(.*)")"#;
5594
5595 let prom_expr = parser::parse(case).unwrap();
5596 eval_stmt.expr = prom_expr;
5597 let table_provider = build_test_table_provider_with_fields(
5598 &[(
5599 DEFAULT_SCHEMA_NAME.to_string(),
5600 "greptime_servers_grpc_requests_elapsed_bucket".to_string(),
5601 )],
5602 &["pod", "le", "path", "code", "container"],
5603 )
5604 .await;
5605 let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
5607 .await
5608 .unwrap();
5609 }
5610
5611 #[tokio::test]
5612 async fn test_parse_and_operator() {
5613 let mut eval_stmt = EvalStmt {
5614 expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
5615 start: UNIX_EPOCH,
5616 end: UNIX_EPOCH
5617 .checked_add(Duration::from_secs(100_000))
5618 .unwrap(),
5619 interval: Duration::from_secs(5),
5620 lookback_delta: Duration::from_secs(1),
5621 };
5622
5623 let cases = [
5624 r#"count (max by (persistentvolumeclaim,namespace) (kubelet_volume_stats_used_bytes{namespace=~".+"} ) and (max by (persistentvolumeclaim,namespace) (kubelet_volume_stats_used_bytes{namespace=~".+"} )) / (max by (persistentvolumeclaim,namespace) (kubelet_volume_stats_capacity_bytes{namespace=~".+"} )) >= (80 / 100)) or vector (0)"#,
5625 r#"count (max by (persistentvolumeclaim,namespace) (kubelet_volume_stats_used_bytes{namespace=~".+"} ) unless (max by (persistentvolumeclaim,namespace) (kubelet_volume_stats_used_bytes{namespace=~".+"} )) / (max by (persistentvolumeclaim,namespace) (kubelet_volume_stats_capacity_bytes{namespace=~".+"} )) >= (80 / 100)) or vector (0)"#,
5626 ];
5627
5628 for case in cases {
5629 let prom_expr = parser::parse(case).unwrap();
5630 eval_stmt.expr = prom_expr;
5631 let table_provider = build_test_table_provider_with_fields(
5632 &[
5633 (
5634 DEFAULT_SCHEMA_NAME.to_string(),
5635 "kubelet_volume_stats_used_bytes".to_string(),
5636 ),
5637 (
5638 DEFAULT_SCHEMA_NAME.to_string(),
5639 "kubelet_volume_stats_capacity_bytes".to_string(),
5640 ),
5641 ],
5642 &["namespace", "persistentvolumeclaim"],
5643 )
5644 .await;
5645 let _ =
5647 PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
5648 .await
5649 .unwrap();
5650 }
5651 }
5652
5653 #[tokio::test]
5654 async fn test_nested_binary_op() {
5655 let mut eval_stmt = EvalStmt {
5656 expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
5657 start: UNIX_EPOCH,
5658 end: UNIX_EPOCH
5659 .checked_add(Duration::from_secs(100_000))
5660 .unwrap(),
5661 interval: Duration::from_secs(5),
5662 lookback_delta: Duration::from_secs(1),
5663 };
5664
5665 let case = r#"sum(rate(nginx_ingress_controller_requests{job=~".*"}[2m])) -
5666 (
5667 sum(rate(nginx_ingress_controller_requests{namespace=~".*"}[2m]))
5668 or
5669 vector(0)
5670 )"#;
5671
5672 let prom_expr = parser::parse(case).unwrap();
5673 eval_stmt.expr = prom_expr;
5674 let table_provider = build_test_table_provider_with_fields(
5675 &[(
5676 DEFAULT_SCHEMA_NAME.to_string(),
5677 "nginx_ingress_controller_requests".to_string(),
5678 )],
5679 &["namespace", "job"],
5680 )
5681 .await;
5682 let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
5684 .await
5685 .unwrap();
5686 }
5687
    // `or` between vector expressions in several real-world shapes: each case
    // must plan without error (only plannability is asserted, not plan shape).
    #[tokio::test]
    async fn test_parse_or_operator() {
        let mut eval_stmt = EvalStmt {
            // Placeholder expression; replaced with each parsed case below.
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        // `or` joining a rate/aggregate ratio with a scaled aggregate ratio.
        let case = r#"
        sum(rate(sysstat{tenant_name=~"tenant1",cluster_name=~"cluster1"}[120s])) by (cluster_name,tenant_name) /
        (sum(sysstat{tenant_name=~"tenant1",cluster_name=~"cluster1"}) by (cluster_name,tenant_name) * 100)
        or
        200 * sum(sysstat{tenant_name=~"tenant1",cluster_name=~"cluster1"}) by (cluster_name,tenant_name) /
        sum(sysstat{tenant_name=~"tenant1",cluster_name=~"cluster1"}) by (cluster_name,tenant_name)"#;

        let table_provider = build_test_table_provider_with_fields(
            &[(DEFAULT_SCHEMA_NAME.to_string(), "sysstat".to_string())],
            &["tenant_name", "cluster_name"],
        )
        .await;
        eval_stmt.expr = parser::parse(case).unwrap();
        let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
            .await
            .unwrap();

        // Chained `or` between comparison-filtered (`>= 0`) delta ratios.
        let case = r#"sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) /
        (sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) *1000) +
        sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) /
        (sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) *1000) >= 0
        or
        sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) /
        (sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) *1000) >= 0
        or
        sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) /
        (sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) *1000) >= 0"#;
        let table_provider = build_test_table_provider_with_fields(
            &[(DEFAULT_SCHEMA_NAME.to_string(), "sysstat".to_string())],
            &["tenant_name", "cluster_name"],
        )
        .await;
        eval_stmt.expr = parser::parse(case).unwrap();
        let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
            .await
            .unwrap();

        // `or` across aggregates of two different metrics (and their sum).
        let case = r#"(sum(background_waitevent_cnt{tenant_name=~"sys",cluster_name=~"cluster1"}) by (cluster_name,tenant_name) +
        sum(foreground_waitevent_cnt{tenant_name=~"sys",cluster_name=~"cluster1"}) by (cluster_name,tenant_name)) or
        (sum(background_waitevent_cnt{tenant_name=~"sys",cluster_name=~"cluster1"}) by (cluster_name,tenant_name)) or
        (sum(foreground_waitevent_cnt{tenant_name=~"sys",cluster_name=~"cluster1"}) by (cluster_name,tenant_name))"#;
        let table_provider = build_test_table_provider_with_fields(
            &[
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "background_waitevent_cnt".to_string(),
                ),
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "foreground_waitevent_cnt".to_string(),
                ),
            ],
            &["tenant_name", "cluster_name"],
        )
        .await;
        eval_stmt.expr = parser::parse(case).unwrap();
        let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
            .await
            .unwrap();

        // `or` where each side aggregates a different metric with arithmetic.
        let case = r#"avg(node_load1{cluster_name=~"cluster1"}) by (cluster_name,host_name) or max(container_cpu_load_average_10s{cluster_name=~"cluster1"}) by (cluster_name,host_name) * 100 / max(container_spec_cpu_quota{cluster_name=~"cluster1"}) by (cluster_name,host_name)"#;
        let table_provider = build_test_table_provider_with_fields(
            &[
                (DEFAULT_SCHEMA_NAME.to_string(), "node_load1".to_string()),
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "container_cpu_load_average_10s".to_string(),
                ),
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "container_spec_cpu_quota".to_string(),
                ),
            ],
            &["cluster_name", "host_name"],
        )
        .await;
        eval_stmt.expr = parser::parse(case).unwrap();
        let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
            .await
            .unwrap();
    }
5781
5782 #[tokio::test]
5783 async fn value_matcher() {
5784 let mut eval_stmt = EvalStmt {
5786 expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
5787 start: UNIX_EPOCH,
5788 end: UNIX_EPOCH
5789 .checked_add(Duration::from_secs(100_000))
5790 .unwrap(),
5791 interval: Duration::from_secs(5),
5792 lookback_delta: Duration::from_secs(1),
5793 };
5794
5795 let cases = [
5796 (
5798 r#"some_metric{__field__="field_1"}"#,
5799 vec![
5800 "some_metric.field_1",
5801 "some_metric.tag_0",
5802 "some_metric.tag_1",
5803 "some_metric.tag_2",
5804 "some_metric.timestamp",
5805 ],
5806 ),
5807 (
5809 r#"some_metric{__field__="field_1", __field__="field_0"}"#,
5810 vec![
5811 "some_metric.field_0",
5812 "some_metric.field_1",
5813 "some_metric.tag_0",
5814 "some_metric.tag_1",
5815 "some_metric.tag_2",
5816 "some_metric.timestamp",
5817 ],
5818 ),
5819 (
5821 r#"some_metric{__field__!="field_1"}"#,
5822 vec![
5823 "some_metric.field_0",
5824 "some_metric.field_2",
5825 "some_metric.tag_0",
5826 "some_metric.tag_1",
5827 "some_metric.tag_2",
5828 "some_metric.timestamp",
5829 ],
5830 ),
5831 (
5833 r#"some_metric{__field__!="field_1", __field__!="field_2"}"#,
5834 vec![
5835 "some_metric.field_0",
5836 "some_metric.tag_0",
5837 "some_metric.tag_1",
5838 "some_metric.tag_2",
5839 "some_metric.timestamp",
5840 ],
5841 ),
5842 (
5844 r#"some_metric{__field__="field_1", __field__!="field_0"}"#,
5845 vec![
5846 "some_metric.field_1",
5847 "some_metric.tag_0",
5848 "some_metric.tag_1",
5849 "some_metric.tag_2",
5850 "some_metric.timestamp",
5851 ],
5852 ),
5853 (
5855 r#"some_metric{__field__="field_2", __field__!="field_2"}"#,
5856 vec![
5857 "some_metric.tag_0",
5858 "some_metric.tag_1",
5859 "some_metric.tag_2",
5860 "some_metric.timestamp",
5861 ],
5862 ),
5863 (
5865 r#"some_metric{__field__=~"field_1|field_2"}"#,
5866 vec![
5867 "some_metric.field_1",
5868 "some_metric.field_2",
5869 "some_metric.tag_0",
5870 "some_metric.tag_1",
5871 "some_metric.tag_2",
5872 "some_metric.timestamp",
5873 ],
5874 ),
5875 (
5877 r#"some_metric{__field__!~"field_1|field_2"}"#,
5878 vec![
5879 "some_metric.field_0",
5880 "some_metric.tag_0",
5881 "some_metric.tag_1",
5882 "some_metric.tag_2",
5883 "some_metric.timestamp",
5884 ],
5885 ),
5886 ];
5887
5888 for case in cases {
5889 let prom_expr = parser::parse(case.0).unwrap();
5890 eval_stmt.expr = prom_expr;
5891 let table_provider = build_test_table_provider(
5892 &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
5893 3,
5894 3,
5895 )
5896 .await;
5897 let plan =
5898 PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
5899 .await
5900 .unwrap();
5901 let mut fields = plan.schema().field_names();
5902 let mut expected = case.1.into_iter().map(String::from).collect::<Vec<_>>();
5903 fields.sort();
5904 expected.sort();
5905 assert_eq!(fields, expected, "case: {:?}", case.0);
5906 }
5907
5908 let bad_cases = [
5909 r#"some_metric{__field__="nonexistent"}"#,
5910 r#"some_metric{__field__!="nonexistent"}"#,
5911 ];
5912
5913 for case in bad_cases {
5914 let prom_expr = parser::parse(case).unwrap();
5915 eval_stmt.expr = prom_expr;
5916 let table_provider = build_test_table_provider(
5917 &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
5918 3,
5919 3,
5920 )
5921 .await;
5922 let plan =
5923 PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
5924 .await;
5925 assert!(plan.is_err(), "case: {:?}", case);
5926 }
5927 }
5928
    // `__schema__` / `__database__` pseudo-labels redirect the selector to a
    // different schema: the table reference becomes schema-qualified everywhere
    // in the plan, including when joined against a default-schema metric.
    #[tokio::test]
    async fn custom_schema() {
        let query = "some_alt_metric{__schema__=\"greptime_private\"}";
        let expected = String::from(
            "PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n  PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n    Sort: greptime_private.some_alt_metric.tag_0 ASC NULLS FIRST, greptime_private.some_alt_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n      Filter: greptime_private.some_alt_metric.timestamp >= TimestampMillisecond(-999, None) AND greptime_private.some_alt_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n        TableScan: greptime_private.some_alt_metric [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]",
        );

        indie_query_plan_compare(query, expected).await;

        // `__database__` is an alias for `__schema__` and must produce the same plan.
        let query = "some_alt_metric{__database__=\"greptime_private\"}";
        let expected = String::from(
            "PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n  PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n    Sort: greptime_private.some_alt_metric.tag_0 ASC NULLS FIRST, greptime_private.some_alt_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n      Filter: greptime_private.some_alt_metric.timestamp >= TimestampMillisecond(-999, None) AND greptime_private.some_alt_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n        TableScan: greptime_private.some_alt_metric [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]",
        );

        indie_query_plan_compare(query, expected).await;

        // Cross-schema binary op: the qualified metric joins the default-schema one.
        let query = "some_alt_metric{__schema__=\"greptime_private\"} / some_metric";
        let expected = String::from(
            "Projection: some_metric.tag_0, some_metric.timestamp, CAST(greptime_private.some_alt_metric.field_0 AS Float64) / CAST(some_metric.field_0 AS Float64) AS greptime_private.some_alt_metric.field_0 / some_metric.field_0 [tag_0:Utf8, timestamp:Timestamp(ms), greptime_private.some_alt_metric.field_0 / some_metric.field_0:Float64;N]\
            \n  Inner Join: greptime_private.some_alt_metric.tag_0 = some_metric.tag_0, greptime_private.some_alt_metric.timestamp = some_metric.timestamp [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N, tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n    SubqueryAlias: greptime_private.some_alt_metric [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n      PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n        PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n          Sort: greptime_private.some_alt_metric.tag_0 ASC NULLS FIRST, greptime_private.some_alt_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n            Filter: greptime_private.some_alt_metric.timestamp >= TimestampMillisecond(-999, None) AND greptime_private.some_alt_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n              TableScan: greptime_private.some_alt_metric [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n    SubqueryAlias: some_metric [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n      PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n        PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n          Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n            Filter: some_metric.timestamp >= TimestampMillisecond(-999, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n              TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]",
        );

        indie_query_plan_compare(query, expected).await;
    }
5973
5974 #[tokio::test]
5975 async fn only_equals_is_supported_for_special_matcher() {
5976 let queries = &[
5977 "some_alt_metric{__schema__!=\"greptime_private\"}",
5978 "some_alt_metric{__schema__=~\"lalala\"}",
5979 "some_alt_metric{__database__!=\"greptime_private\"}",
5980 "some_alt_metric{__database__=~\"lalala\"}",
5981 ];
5982
5983 for query in queries {
5984 let prom_expr = parser::parse(query).unwrap();
5985 let eval_stmt = EvalStmt {
5986 expr: prom_expr,
5987 start: UNIX_EPOCH,
5988 end: UNIX_EPOCH
5989 .checked_add(Duration::from_secs(100_000))
5990 .unwrap(),
5991 interval: Duration::from_secs(5),
5992 lookback_delta: Duration::from_secs(1),
5993 };
5994
5995 let table_provider = build_test_table_provider(
5996 &[
5997 (DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string()),
5998 (
5999 "greptime_private".to_string(),
6000 "some_alt_metric".to_string(),
6001 ),
6002 ],
6003 1,
6004 1,
6005 )
6006 .await;
6007
6008 let plan =
6009 PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
6010 .await;
6011 assert!(plan.is_err(), "query: {:?}", query);
6012 }
6013 }
6014
    // A table whose time index is nanosecond-precision must get a Projection
    // that casts it to Timestamp(ms) directly above the TableScan, since the
    // Prom* plan nodes operate on millisecond timestamps.
    #[tokio::test]
    async fn test_non_ms_precision() {
        let catalog_list = MemoryCatalogManager::with_default_setup();
        // Schema: one string tag, a *nanosecond* time index, one float field.
        let columns = vec![
            ColumnSchema::new(
                "tag".to_string(),
                ConcreteDataType::string_datatype(),
                false,
            ),
            ColumnSchema::new(
                "timestamp".to_string(),
                ConcreteDataType::timestamp_nanosecond_datatype(),
                false,
            )
            .with_time_index(true),
            ColumnSchema::new(
                "field".to_string(),
                ConcreteDataType::float64_datatype(),
                true,
            ),
        ];
        let schema = Arc::new(Schema::new(columns));
        let table_meta = TableMetaBuilder::empty()
            .schema(schema)
            .primary_key_indices(vec![0])
            .value_indices(vec![2])
            .next_column_id(1024)
            .build()
            .unwrap();
        let table_info = TableInfoBuilder::default()
            .name("metrics".to_string())
            .meta(table_meta)
            .build()
            .unwrap();
        let table = EmptyTable::from_table_info(&table_info);
        // Register the empty table so the planner can resolve `metrics`.
        assert!(
            catalog_list
                .register_table_sync(RegisterTableRequest {
                    catalog: DEFAULT_CATALOG_NAME.to_string(),
                    schema: DEFAULT_SCHEMA_NAME.to_string(),
                    table_name: "metrics".to_string(),
                    table_id: 1024,
                    table,
                })
                .is_ok()
        );

        // Case 1: instant selector — the ms-cast Projection sits right above
        // the scan, below the whole instant-vector pipeline.
        let plan = PromPlanner::stmt_to_plan(
            DfTableSourceProvider::new(
                catalog_list.clone(),
                false,
                QueryContext::arc(),
                DummyDecoder::arc(),
                true,
            ),
            &EvalStmt {
                expr: parser::parse("metrics{tag = \"1\"}").unwrap(),
                start: UNIX_EPOCH,
                end: UNIX_EPOCH
                    .checked_add(Duration::from_secs(100_000))
                    .unwrap(),
                interval: Duration::from_secs(5),
                lookback_delta: Duration::from_secs(1),
            },
            &build_query_engine_state(),
        )
        .await
        .unwrap();
        assert_eq!(
            plan.display_indent_schema().to_string(),
            "PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [field:Float64;N, tag:Utf8, timestamp:Timestamp(ms)]\
            \n  PromSeriesDivide: tags=[\"tag\"] [field:Float64;N, tag:Utf8, timestamp:Timestamp(ms)]\
            \n    Sort: metrics.tag ASC NULLS FIRST, metrics.timestamp ASC NULLS FIRST [field:Float64;N, tag:Utf8, timestamp:Timestamp(ms)]\
            \n      Filter: metrics.tag = Utf8(\"1\") AND metrics.timestamp >= TimestampMillisecond(-999, None) AND metrics.timestamp <= TimestampMillisecond(100000000, None) [field:Float64;N, tag:Utf8, timestamp:Timestamp(ms)]\
            \n        Projection: metrics.field, metrics.tag, CAST(metrics.timestamp AS Timestamp(ms)) AS timestamp [field:Float64;N, tag:Utf8, timestamp:Timestamp(ms)]\
            \n          TableScan: metrics [tag:Utf8, timestamp:Timestamp(ns), field:Float64;N]"
        );
        // Case 2: range function (`avg_over_time`) — the same cast Projection
        // must appear below the range-manipulation pipeline.
        let plan = PromPlanner::stmt_to_plan(
            DfTableSourceProvider::new(
                catalog_list.clone(),
                false,
                QueryContext::arc(),
                DummyDecoder::arc(),
                true,
            ),
            &EvalStmt {
                expr: parser::parse("avg_over_time(metrics{tag = \"1\"}[5s])").unwrap(),
                start: UNIX_EPOCH,
                end: UNIX_EPOCH
                    .checked_add(Duration::from_secs(100_000))
                    .unwrap(),
                interval: Duration::from_secs(5),
                lookback_delta: Duration::from_secs(1),
            },
            &build_query_engine_state(),
        )
        .await
        .unwrap();
        assert_eq!(
            plan.display_indent_schema().to_string(),
            "Filter: prom_avg_over_time(timestamp_range,field) IS NOT NULL [timestamp:Timestamp(ms), prom_avg_over_time(timestamp_range,field):Float64;N, tag:Utf8]\
            \n  Projection: metrics.timestamp, prom_avg_over_time(timestamp_range, field) AS prom_avg_over_time(timestamp_range,field), metrics.tag [timestamp:Timestamp(ms), prom_avg_over_time(timestamp_range,field):Float64;N, tag:Utf8]\
            \n    PromRangeManipulate: req range=[0..100000000], interval=[5000], eval range=[5000], time index=[timestamp], values=[\"field\"] [field:Dictionary(Int64, Float64);N, tag:Utf8, timestamp:Timestamp(ms), timestamp_range:Dictionary(Int64, Timestamp(ms))]\
            \n      PromSeriesNormalize: offset=[0], time index=[timestamp], filter NaN: [true] [field:Float64;N, tag:Utf8, timestamp:Timestamp(ms)]\
            \n        PromSeriesDivide: tags=[\"tag\"] [field:Float64;N, tag:Utf8, timestamp:Timestamp(ms)]\
            \n          Sort: metrics.tag ASC NULLS FIRST, metrics.timestamp ASC NULLS FIRST [field:Float64;N, tag:Utf8, timestamp:Timestamp(ms)]\
            \n            Filter: metrics.tag = Utf8(\"1\") AND metrics.timestamp >= TimestampMillisecond(-4999, None) AND metrics.timestamp <= TimestampMillisecond(100000000, None) [field:Float64;N, tag:Utf8, timestamp:Timestamp(ms)]\
            \n              Projection: metrics.field, metrics.tag, CAST(metrics.timestamp AS Timestamp(ms)) AS timestamp [field:Float64;N, tag:Utf8, timestamp:Timestamp(ms)]\
            \n                TableScan: metrics [tag:Utf8, timestamp:Timestamp(ns), field:Float64;N]"
        );
    }
6126
6127 #[tokio::test]
6128 async fn test_nonexistent_label() {
6129 let mut eval_stmt = EvalStmt {
6131 expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
6132 start: UNIX_EPOCH,
6133 end: UNIX_EPOCH
6134 .checked_add(Duration::from_secs(100_000))
6135 .unwrap(),
6136 interval: Duration::from_secs(5),
6137 lookback_delta: Duration::from_secs(1),
6138 };
6139
6140 let case = r#"some_metric{nonexistent="hi"}"#;
6141 let prom_expr = parser::parse(case).unwrap();
6142 eval_stmt.expr = prom_expr;
6143 let table_provider = build_test_table_provider(
6144 &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
6145 3,
6146 3,
6147 )
6148 .await;
6149 let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
6151 .await
6152 .unwrap();
6153 }
6154
    // `label_join`: a new label `foo` is computed with `concat_ws(",", tag_1,
    // tag_2, tag_3)` while all original tags are kept in the output schema.
    #[tokio::test]
    async fn test_label_join() {
        let prom_expr = parser::parse(
            "label_join(up{tag_0='api-server'}, 'foo', ',', 'tag_1', 'tag_2', 'tag_3')",
        )
        .unwrap();
        let eval_stmt = EvalStmt {
            expr: prom_expr,
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        // Table `up` with 4 tags and 1 field.
        let table_provider =
            build_test_table_provider(&[(DEFAULT_SCHEMA_NAME.to_string(), "up".to_string())], 4, 1)
                .await;
        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();

        // Raw string starts with a newline so the comparison below can prefix
        // the rendered plan with "\n".
        let expected = r#"
Filter: up.field_0 IS NOT NULL [timestamp:Timestamp(ms), field_0:Float64;N, foo:Utf8;N, tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8]
  Projection: up.timestamp, up.field_0, concat_ws(Utf8(","), up.tag_1, up.tag_2, up.tag_3) AS foo, up.tag_0, up.tag_1, up.tag_2, up.tag_3 [timestamp:Timestamp(ms), field_0:Float64;N, foo:Utf8;N, tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8]
    PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]
      PromSeriesDivide: tags=["tag_0", "tag_1", "tag_2", "tag_3"] [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]
        Sort: up.tag_0 ASC NULLS FIRST, up.tag_1 ASC NULLS FIRST, up.tag_2 ASC NULLS FIRST, up.tag_3 ASC NULLS FIRST, up.timestamp ASC NULLS FIRST [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]
          Filter: up.tag_0 = Utf8("api-server") AND up.timestamp >= TimestampMillisecond(-999, None) AND up.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]
            TableScan: up [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]"#;

        let ret = plan.display_indent_schema().to_string();
        assert_eq!(format!("\n{ret}"), expected, "\n{}", ret);
    }
6191
    // `label_replace`: label `foo` is derived from `tag_0` via `regexp_replace`
    // with the PromQL pattern anchored as `^(?s:...)$`; `tag_0` itself is kept.
    #[tokio::test]
    async fn test_label_replace() {
        let prom_expr = parser::parse(
            "label_replace(up{tag_0=\"a:c\"}, \"foo\", \"$1\", \"tag_0\", \"(.*):.*\")",
        )
        .unwrap();
        let eval_stmt = EvalStmt {
            expr: prom_expr,
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        // Table `up` with a single tag and field.
        let table_provider =
            build_test_table_provider(&[(DEFAULT_SCHEMA_NAME.to_string(), "up".to_string())], 1, 1)
                .await;
        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();

        // Raw string starts with a newline so the comparison below can prefix
        // the rendered plan with "\n".
        let expected = r#"
Filter: up.field_0 IS NOT NULL [timestamp:Timestamp(ms), field_0:Float64;N, foo:Utf8;N, tag_0:Utf8]
  Projection: up.timestamp, up.field_0, regexp_replace(up.tag_0, Utf8("^(?s:(.*):.*)$"), Utf8("$1")) AS foo, up.tag_0 [timestamp:Timestamp(ms), field_0:Float64;N, foo:Utf8;N, tag_0:Utf8]
    PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]
      PromSeriesDivide: tags=["tag_0"] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]
        Sort: up.tag_0 ASC NULLS FIRST, up.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]
          Filter: up.tag_0 = Utf8("a:c") AND up.timestamp >= TimestampMillisecond(-999, None) AND up.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]
            TableScan: up [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]"#;

        let ret = plan.display_indent_schema().to_string();
        assert_eq!(format!("\n{ret}"), expected, "\n{}", ret);
    }
6228
    // A regex matcher containing `|` and `:` is translated into a single `~`
    // filter with the pattern anchored as `^(?:...)$`; `sum(...)` without a `by`
    // clause aggregates every field grouped only by the time index.
    #[tokio::test]
    async fn test_matchers_to_expr() {
        let mut eval_stmt = EvalStmt {
            // Placeholder expression; replaced with the parsed query below.
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };
        let case =
            r#"sum(prometheus_tsdb_head_series{tag_1=~"(10.0.160.237:8080|10.0.160.237:9090)"})"#;

        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;
        let table_provider = build_test_table_provider(
            &[(
                DEFAULT_SCHEMA_NAME.to_string(),
                "prometheus_tsdb_head_series".to_string(),
            )],
            3,
            3,
        )
        .await;
        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();
        let expected = "Sort: prometheus_tsdb_head_series.timestamp ASC NULLS LAST [timestamp:Timestamp(ms), sum(prometheus_tsdb_head_series.field_0):Float64;N, sum(prometheus_tsdb_head_series.field_1):Float64;N, sum(prometheus_tsdb_head_series.field_2):Float64;N]\
        \n  Aggregate: groupBy=[[prometheus_tsdb_head_series.timestamp]], aggr=[[sum(prometheus_tsdb_head_series.field_0), sum(prometheus_tsdb_head_series.field_1), sum(prometheus_tsdb_head_series.field_2)]] [timestamp:Timestamp(ms), sum(prometheus_tsdb_head_series.field_0):Float64;N, sum(prometheus_tsdb_head_series.field_1):Float64;N, sum(prometheus_tsdb_head_series.field_2):Float64;N]\
        \n    PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, timestamp:Timestamp(ms), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N]\
        \n      PromSeriesDivide: tags=[\"tag_0\", \"tag_1\", \"tag_2\"] [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, timestamp:Timestamp(ms), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N]\
        \n        Sort: prometheus_tsdb_head_series.tag_0 ASC NULLS FIRST, prometheus_tsdb_head_series.tag_1 ASC NULLS FIRST, prometheus_tsdb_head_series.tag_2 ASC NULLS FIRST, prometheus_tsdb_head_series.timestamp ASC NULLS FIRST [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, timestamp:Timestamp(ms), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N]\
        \n          Filter: prometheus_tsdb_head_series.tag_1 ~ Utf8(\"^(?:(10.0.160.237:8080|10.0.160.237:9090))$\") AND prometheus_tsdb_head_series.timestamp >= TimestampMillisecond(-999, None) AND prometheus_tsdb_head_series.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, timestamp:Timestamp(ms), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N]\
        \n            TableScan: prometheus_tsdb_head_series [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, timestamp:Timestamp(ms), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N]";
        assert_eq!(plan.display_indent_schema().to_string(), expected);
    }
6267
    #[tokio::test]
    async fn test_topk_expr() {
        // Verifies the plan for `topk(10, ...)`: a `row_number()` window function
        // partitioned by timestamp, a filter `row_number <= 10`, and a final
        // projection/sort that restores the output column order.
        let mut eval_stmt = EvalStmt {
            // Placeholder expression; replaced with the parsed query below.
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };
        let case = r#"topk(10, sum(prometheus_tsdb_head_series{ip=~"(10.0.160.237:8080|10.0.160.237:9090)"}) by (ip))"#;

        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;
        // Two tables are registered but only `prometheus_tsdb_head_series` is
        // referenced by the query; both carry a single `ip` tag column.
        let table_provider = build_test_table_provider_with_fields(
            &[
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "prometheus_tsdb_head_series".to_string(),
                ),
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "http_server_requests_seconds_count".to_string(),
                ),
            ],
            &["ip"],
        )
        .await;

        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();
        let expected = "Projection: sum(prometheus_tsdb_head_series.greptime_value), prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp [sum(prometheus_tsdb_head_series.greptime_value):Float64;N, ip:Utf8, greptime_timestamp:Timestamp(ms)]\
        \n  Sort: prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST, row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS FIRST, prometheus_tsdb_head_series.ip DESC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW ASC NULLS LAST [ip:Utf8, greptime_timestamp:Timestamp(ms), sum(prometheus_tsdb_head_series.greptime_value):Float64;N, row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS FIRST, prometheus_tsdb_head_series.ip DESC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:UInt64]\
        \n    Filter: row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS FIRST, prometheus_tsdb_head_series.ip DESC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW <= Float64(10) [ip:Utf8, greptime_timestamp:Timestamp(ms), sum(prometheus_tsdb_head_series.greptime_value):Float64;N, row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS FIRST, prometheus_tsdb_head_series.ip DESC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:UInt64]\
        \n      WindowAggr: windowExpr=[[row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS FIRST, prometheus_tsdb_head_series.ip DESC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]] [ip:Utf8, greptime_timestamp:Timestamp(ms), sum(prometheus_tsdb_head_series.greptime_value):Float64;N, row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS FIRST, prometheus_tsdb_head_series.ip DESC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:UInt64]\
        \n        Sort: prometheus_tsdb_head_series.ip ASC NULLS LAST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST [ip:Utf8, greptime_timestamp:Timestamp(ms), sum(prometheus_tsdb_head_series.greptime_value):Float64;N]\
        \n          Aggregate: groupBy=[[prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp]], aggr=[[sum(prometheus_tsdb_head_series.greptime_value)]] [ip:Utf8, greptime_timestamp:Timestamp(ms), sum(prometheus_tsdb_head_series.greptime_value):Float64;N]\
        \n            PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp] [ip:Utf8, greptime_timestamp:Timestamp(ms), greptime_value:Float64;N]\
        \n              PromSeriesDivide: tags=[\"ip\"] [ip:Utf8, greptime_timestamp:Timestamp(ms), greptime_value:Float64;N]\
        \n                Sort: prometheus_tsdb_head_series.ip ASC NULLS FIRST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS FIRST [ip:Utf8, greptime_timestamp:Timestamp(ms), greptime_value:Float64;N]\
        \n                  Filter: prometheus_tsdb_head_series.ip ~ Utf8(\"^(?:(10.0.160.237:8080|10.0.160.237:9090))$\") AND prometheus_tsdb_head_series.greptime_timestamp >= TimestampMillisecond(-999, None) AND prometheus_tsdb_head_series.greptime_timestamp <= TimestampMillisecond(100000000, None) [ip:Utf8, greptime_timestamp:Timestamp(ms), greptime_value:Float64;N]\
        \n                    TableScan: prometheus_tsdb_head_series [ip:Utf8, greptime_timestamp:Timestamp(ms), greptime_value:Float64;N]";

        assert_eq!(plan.display_indent_schema().to_string(), expected);
    }
6316
    #[tokio::test]
    async fn test_count_values_expr() {
        // Verifies the plan for `count_values('series', ...) by (ip)`: the value
        // column joins the group-by keys, and a projection re-exposes it under the
        // user-supplied label name `series`.
        let mut eval_stmt = EvalStmt {
            // Placeholder expression; replaced with the parsed query below.
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };
        let case = r#"count_values('series', prometheus_tsdb_head_series{ip=~"(10.0.160.237:8080|10.0.160.237:9090)"}) by (ip)"#;

        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;
        // Two tables are registered but only `prometheus_tsdb_head_series` is
        // referenced by the query; both carry a single `ip` tag column.
        let table_provider = build_test_table_provider_with_fields(
            &[
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "prometheus_tsdb_head_series".to_string(),
                ),
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "http_server_requests_seconds_count".to_string(),
                ),
            ],
            &["ip"],
        )
        .await;

        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();
        let expected = "Projection: count(prometheus_tsdb_head_series.greptime_value), prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp, series [count(prometheus_tsdb_head_series.greptime_value):Int64, ip:Utf8, greptime_timestamp:Timestamp(ms), series:Float64;N]\
        \n  Sort: prometheus_tsdb_head_series.ip ASC NULLS LAST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST, prometheus_tsdb_head_series.greptime_value ASC NULLS LAST [count(prometheus_tsdb_head_series.greptime_value):Int64, ip:Utf8, greptime_timestamp:Timestamp(ms), series:Float64;N, greptime_value:Float64;N]\
        \n    Projection: count(prometheus_tsdb_head_series.greptime_value), prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp, prometheus_tsdb_head_series.greptime_value AS series, prometheus_tsdb_head_series.greptime_value [count(prometheus_tsdb_head_series.greptime_value):Int64, ip:Utf8, greptime_timestamp:Timestamp(ms), series:Float64;N, greptime_value:Float64;N]\
        \n      Aggregate: groupBy=[[prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp, prometheus_tsdb_head_series.greptime_value]], aggr=[[count(prometheus_tsdb_head_series.greptime_value)]] [ip:Utf8, greptime_timestamp:Timestamp(ms), greptime_value:Float64;N, count(prometheus_tsdb_head_series.greptime_value):Int64]\
        \n        PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp] [ip:Utf8, greptime_timestamp:Timestamp(ms), greptime_value:Float64;N]\
        \n          PromSeriesDivide: tags=[\"ip\"] [ip:Utf8, greptime_timestamp:Timestamp(ms), greptime_value:Float64;N]\
        \n            Sort: prometheus_tsdb_head_series.ip ASC NULLS FIRST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS FIRST [ip:Utf8, greptime_timestamp:Timestamp(ms), greptime_value:Float64;N]\
        \n              Filter: prometheus_tsdb_head_series.ip ~ Utf8(\"^(?:(10.0.160.237:8080|10.0.160.237:9090))$\") AND prometheus_tsdb_head_series.greptime_timestamp >= TimestampMillisecond(-999, None) AND prometheus_tsdb_head_series.greptime_timestamp <= TimestampMillisecond(100000000, None) [ip:Utf8, greptime_timestamp:Timestamp(ms), greptime_value:Float64;N]\
        \n                TableScan: prometheus_tsdb_head_series [ip:Utf8, greptime_timestamp:Timestamp(ms), greptime_value:Float64;N]";

        assert_eq!(plan.display_indent_schema().to_string(), expected);
    }
6363
    #[tokio::test]
    async fn test_value_alias() {
        // Same `count_values` query as `test_count_values_expr`, but with an alias
        // extension applied: the final projection must rename the count column to
        // `my_series` (visible as `AS my_series` in the expected plan).
        let mut eval_stmt = EvalStmt {
            // Placeholder expression; replaced with the parsed query below.
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };
        let case = r#"count_values('series', prometheus_tsdb_head_series{ip=~"(10.0.160.237:8080|10.0.160.237:9090)"}) by (ip)"#;

        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;
        // Attach the value alias `my_series` to the statement before planning.
        eval_stmt = QueryLanguageParser::apply_alias_extension(eval_stmt, "my_series");
        let table_provider = build_test_table_provider_with_fields(
            &[
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "prometheus_tsdb_head_series".to_string(),
                ),
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "http_server_requests_seconds_count".to_string(),
                ),
            ],
            &["ip"],
        )
        .await;

        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();
        let expected = r#"
Projection: count(prometheus_tsdb_head_series.greptime_value) AS my_series, prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp [my_series:Int64, ip:Utf8, greptime_timestamp:Timestamp(ms)]
  Projection: count(prometheus_tsdb_head_series.greptime_value), prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp, series [count(prometheus_tsdb_head_series.greptime_value):Int64, ip:Utf8, greptime_timestamp:Timestamp(ms), series:Float64;N]
    Sort: prometheus_tsdb_head_series.ip ASC NULLS LAST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST, prometheus_tsdb_head_series.greptime_value ASC NULLS LAST [count(prometheus_tsdb_head_series.greptime_value):Int64, ip:Utf8, greptime_timestamp:Timestamp(ms), series:Float64;N, greptime_value:Float64;N]
      Projection: count(prometheus_tsdb_head_series.greptime_value), prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp, prometheus_tsdb_head_series.greptime_value AS series, prometheus_tsdb_head_series.greptime_value [count(prometheus_tsdb_head_series.greptime_value):Int64, ip:Utf8, greptime_timestamp:Timestamp(ms), series:Float64;N, greptime_value:Float64;N]
        Aggregate: groupBy=[[prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp, prometheus_tsdb_head_series.greptime_value]], aggr=[[count(prometheus_tsdb_head_series.greptime_value)]] [ip:Utf8, greptime_timestamp:Timestamp(ms), greptime_value:Float64;N, count(prometheus_tsdb_head_series.greptime_value):Int64]
          PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp] [ip:Utf8, greptime_timestamp:Timestamp(ms), greptime_value:Float64;N]
            PromSeriesDivide: tags=["ip"] [ip:Utf8, greptime_timestamp:Timestamp(ms), greptime_value:Float64;N]
              Sort: prometheus_tsdb_head_series.ip ASC NULLS FIRST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS FIRST [ip:Utf8, greptime_timestamp:Timestamp(ms), greptime_value:Float64;N]
                Filter: prometheus_tsdb_head_series.ip ~ Utf8("^(?:(10.0.160.237:8080|10.0.160.237:9090))$") AND prometheus_tsdb_head_series.greptime_timestamp >= TimestampMillisecond(-999, None) AND prometheus_tsdb_head_series.greptime_timestamp <= TimestampMillisecond(100000000, None) [ip:Utf8, greptime_timestamp:Timestamp(ms), greptime_value:Float64;N]
                  TableScan: prometheus_tsdb_head_series [ip:Utf8, greptime_timestamp:Timestamp(ms), greptime_value:Float64;N]"#;
        assert_eq!(format!("\n{}", plan.display_indent_schema()), expected);
    }
6412
    #[tokio::test]
    async fn test_quantile_expr() {
        // Verifies the plan for `quantile(0.3, sum(...) by (ip))`: an outer
        // `quantile` aggregation over the inner per-`ip` sum, grouped only by the
        // time index.
        let mut eval_stmt = EvalStmt {
            // Placeholder expression; replaced with the parsed query below.
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };
        let case = r#"quantile(0.3, sum(prometheus_tsdb_head_series{ip=~"(10.0.160.237:8080|10.0.160.237:9090)"}) by (ip))"#;

        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;
        // Two tables are registered but only `prometheus_tsdb_head_series` is
        // referenced by the query; both carry a single `ip` tag column.
        let table_provider = build_test_table_provider_with_fields(
            &[
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "prometheus_tsdb_head_series".to_string(),
                ),
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "http_server_requests_seconds_count".to_string(),
                ),
            ],
            &["ip"],
        )
        .await;

        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();
        let expected = "Sort: prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST [greptime_timestamp:Timestamp(ms), quantile(Float64(0.3),sum(prometheus_tsdb_head_series.greptime_value)):Float64;N]\
        \n  Aggregate: groupBy=[[prometheus_tsdb_head_series.greptime_timestamp]], aggr=[[quantile(Float64(0.3), sum(prometheus_tsdb_head_series.greptime_value))]] [greptime_timestamp:Timestamp(ms), quantile(Float64(0.3),sum(prometheus_tsdb_head_series.greptime_value)):Float64;N]\
        \n    Sort: prometheus_tsdb_head_series.ip ASC NULLS LAST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST [ip:Utf8, greptime_timestamp:Timestamp(ms), sum(prometheus_tsdb_head_series.greptime_value):Float64;N]\
        \n      Aggregate: groupBy=[[prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp]], aggr=[[sum(prometheus_tsdb_head_series.greptime_value)]] [ip:Utf8, greptime_timestamp:Timestamp(ms), sum(prometheus_tsdb_head_series.greptime_value):Float64;N]\
        \n        PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp] [ip:Utf8, greptime_timestamp:Timestamp(ms), greptime_value:Float64;N]\
        \n          PromSeriesDivide: tags=[\"ip\"] [ip:Utf8, greptime_timestamp:Timestamp(ms), greptime_value:Float64;N]\
        \n            Sort: prometheus_tsdb_head_series.ip ASC NULLS FIRST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS FIRST [ip:Utf8, greptime_timestamp:Timestamp(ms), greptime_value:Float64;N]\
        \n              Filter: prometheus_tsdb_head_series.ip ~ Utf8(\"^(?:(10.0.160.237:8080|10.0.160.237:9090))$\") AND prometheus_tsdb_head_series.greptime_timestamp >= TimestampMillisecond(-999, None) AND prometheus_tsdb_head_series.greptime_timestamp <= TimestampMillisecond(100000000, None) [ip:Utf8, greptime_timestamp:Timestamp(ms), greptime_value:Float64;N]\
        \n                TableScan: prometheus_tsdb_head_series [ip:Utf8, greptime_timestamp:Timestamp(ms), greptime_value:Float64;N]";

        assert_eq!(plan.display_indent_schema().to_string(), expected);
    }
6459
    #[tokio::test]
    async fn test_or_not_exists_table_label() {
        // Verifies `or` between an existing table and a missing one: the planner
        // produces a `UnionDistinctOn` whose right side is an `EmptyMetric` plan
        // (with NULL-filled label columns) instead of failing on the unknown table.
        // Note the grouping labels `tag0`/`tag2` do not exist on `metric_exists`
        // either, so only `job` appears in the output.
        let mut eval_stmt = EvalStmt {
            // Placeholder expression; replaced with the parsed query below.
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };
        let case = r#"sum by (job, tag0, tag2) (metric_exists) or sum by (job, tag0, tag2) (metric_not_exists)"#;

        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;
        // Only `metric_exists` is registered; `metric_not_exists` is deliberately absent.
        let table_provider = build_test_table_provider_with_fields(
            &[(DEFAULT_SCHEMA_NAME.to_string(), "metric_exists".to_string())],
            &["job"],
        )
        .await;

        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();
        let expected = r#"UnionDistinctOn: on col=[["job"]], ts_col=[greptime_timestamp] [greptime_timestamp:Timestamp(ms), job:Utf8, sum(metric_exists.greptime_value):Float64;N]
  SubqueryAlias: metric_exists [greptime_timestamp:Timestamp(ms), job:Utf8, sum(metric_exists.greptime_value):Float64;N]
    Projection: metric_exists.greptime_timestamp, metric_exists.job, sum(metric_exists.greptime_value) [greptime_timestamp:Timestamp(ms), job:Utf8, sum(metric_exists.greptime_value):Float64;N]
      Sort: metric_exists.job ASC NULLS LAST, metric_exists.greptime_timestamp ASC NULLS LAST [job:Utf8, greptime_timestamp:Timestamp(ms), sum(metric_exists.greptime_value):Float64;N]
        Aggregate: groupBy=[[metric_exists.job, metric_exists.greptime_timestamp]], aggr=[[sum(metric_exists.greptime_value)]] [job:Utf8, greptime_timestamp:Timestamp(ms), sum(metric_exists.greptime_value):Float64;N]
          PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp] [job:Utf8, greptime_timestamp:Timestamp(ms), greptime_value:Float64;N]
            PromSeriesDivide: tags=["job"] [job:Utf8, greptime_timestamp:Timestamp(ms), greptime_value:Float64;N]
              Sort: metric_exists.job ASC NULLS FIRST, metric_exists.greptime_timestamp ASC NULLS FIRST [job:Utf8, greptime_timestamp:Timestamp(ms), greptime_value:Float64;N]
                Filter: metric_exists.greptime_timestamp >= TimestampMillisecond(-999, None) AND metric_exists.greptime_timestamp <= TimestampMillisecond(100000000, None) [job:Utf8, greptime_timestamp:Timestamp(ms), greptime_value:Float64;N]
                  TableScan: metric_exists [job:Utf8, greptime_timestamp:Timestamp(ms), greptime_value:Float64;N]
  SubqueryAlias: [greptime_timestamp:Timestamp(ms), job:Utf8;N, sum(.value):Float64;N]
    Projection: .time AS greptime_timestamp, Utf8(NULL) AS job, sum(.value) [greptime_timestamp:Timestamp(ms), job:Utf8;N, sum(.value):Float64;N]
      Sort: .time ASC NULLS LAST [time:Timestamp(ms), sum(.value):Float64;N]
        Aggregate: groupBy=[[.time]], aggr=[[sum(.value)]] [time:Timestamp(ms), sum(.value):Float64;N]
          EmptyMetric: range=[0..-1], interval=[5000] [time:Timestamp(ms), value:Float64;N]
            TableScan: dummy [time:Timestamp(ms), value:Float64;N]"#;

        assert_eq!(plan.display_indent_schema().to_string(), expected);
    }
6504
6505 #[tokio::test]
6506 async fn test_histogram_quantile_missing_le_column() {
6507 let mut eval_stmt = EvalStmt {
6508 expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
6509 start: UNIX_EPOCH,
6510 end: UNIX_EPOCH
6511 .checked_add(Duration::from_secs(100_000))
6512 .unwrap(),
6513 interval: Duration::from_secs(5),
6514 lookback_delta: Duration::from_secs(1),
6515 };
6516
6517 let case = r#"histogram_quantile(0.99, sum by(pod,instance,le) (rate(non_existent_histogram_bucket{instance=~"xxx"}[1m])))"#;
6519
6520 let prom_expr = parser::parse(case).unwrap();
6521 eval_stmt.expr = prom_expr;
6522
6523 let table_provider = build_test_table_provider_with_fields(
6525 &[(
6526 DEFAULT_SCHEMA_NAME.to_string(),
6527 "non_existent_histogram_bucket".to_string(),
6528 )],
6529 &["pod", "instance"], )
6531 .await;
6532
6533 let result =
6535 PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
6536 .await;
6537
6538 assert!(
6540 result.is_ok(),
6541 "Expected successful plan creation with empty result, but got error: {:?}",
6542 result.err()
6543 );
6544
6545 let plan = result.unwrap();
6547 match plan {
6548 LogicalPlan::EmptyRelation(_) => {
6549 }
6551 _ => panic!("Expected EmptyRelation, but got: {:?}", plan),
6552 }
6553 }
6554}