1use std::collections::{BTreeSet, HashSet, VecDeque};
16use std::sync::Arc;
17use std::time::UNIX_EPOCH;
18
19use arrow::datatypes::IntervalDayTime;
20use async_recursion::async_recursion;
21use catalog::table_source::DfTableSourceProvider;
22use common_error::ext::ErrorExt;
23use common_error::status_code::StatusCode;
24use common_function::function::FunctionContext;
25use common_query::prelude::greptime_value;
26use datafusion::common::DFSchemaRef;
27use datafusion::datasource::DefaultTableSource;
28use datafusion::functions_aggregate::average::avg_udaf;
29use datafusion::functions_aggregate::count::count_udaf;
30use datafusion::functions_aggregate::expr_fn::first_value;
31use datafusion::functions_aggregate::min_max::{max_udaf, min_udaf};
32use datafusion::functions_aggregate::stddev::stddev_pop_udaf;
33use datafusion::functions_aggregate::sum::sum_udaf;
34use datafusion::functions_aggregate::variance::var_pop_udaf;
35use datafusion::functions_window::row_number::RowNumber;
36use datafusion::logical_expr::expr::{Alias, ScalarFunction, WindowFunction};
37use datafusion::logical_expr::expr_rewriter::normalize_cols;
38use datafusion::logical_expr::{
39 BinaryExpr, Cast, Extension, LogicalPlan, LogicalPlanBuilder, Operator,
40 ScalarUDF as ScalarUdfDef, WindowFrame, WindowFunctionDefinition,
41};
42use datafusion::prelude as df_prelude;
43use datafusion::prelude::{Column, Expr as DfExpr, JoinType};
44use datafusion::scalar::ScalarValue;
45use datafusion::sql::TableReference;
46use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRewriter};
47use datafusion_common::{DFSchema, NullEquality};
48use datafusion_expr::expr::WindowFunctionParams;
49use datafusion_expr::utils::conjunction;
50use datafusion_expr::{
51 ExprSchemable, Literal, Projection, SortExpr, TableScan, TableSource, col, lit,
52};
53use datatypes::arrow::datatypes::{DataType as ArrowDataType, TimeUnit as ArrowTimeUnit};
54use datatypes::data_type::ConcreteDataType;
55use itertools::Itertools;
56use once_cell::sync::Lazy;
57use promql::extension_plan::{
58 Absent, EmptyMetric, HistogramFold, InstantManipulate, Millisecond, RangeManipulate,
59 ScalarCalculate, SeriesDivide, SeriesNormalize, UnionDistinctOn, build_special_time_expr,
60};
61use promql::functions::{
62 AbsentOverTime, AvgOverTime, Changes, CountOverTime, Delta, Deriv, HoltWinters, IDelta,
63 Increase, LastOverTime, MaxOverTime, MinOverTime, PredictLinear, PresentOverTime,
64 QuantileOverTime, Rate, Resets, Round, StddevOverTime, StdvarOverTime, SumOverTime,
65 quantile_udaf,
66};
67use promql_parser::label::{METRIC_NAME, MatchOp, Matcher, Matchers};
68use promql_parser::parser::token::TokenType;
69use promql_parser::parser::value::ValueType;
70use promql_parser::parser::{
71 AggregateExpr, BinModifier, BinaryExpr as PromBinaryExpr, Call, EvalStmt, Expr as PromExpr,
72 Function, FunctionArgs as PromFunctionArgs, LabelModifier, MatrixSelector, NumberLiteral,
73 Offset, ParenExpr, StringLiteral, SubqueryExpr, UnaryExpr, VectorMatchCardinality,
74 VectorSelector, token,
75};
76use regex::{self, Regex};
77use snafu::{OptionExt, ResultExt, ensure};
78use store_api::metric_engine_consts::{
79 DATA_SCHEMA_TABLE_ID_COLUMN_NAME, DATA_SCHEMA_TSID_COLUMN_NAME, LOGICAL_TABLE_METADATA_KEY,
80 METRIC_ENGINE_NAME, is_metric_engine_internal_column,
81};
82use table::table::adapter::DfTableProviderAdapter;
83
84use crate::parser::{
85 ALIAS_NODE_NAME, ANALYZE_NODE_NAME, ANALYZE_VERBOSE_NODE_NAME, AliasExpr, EXPLAIN_NODE_NAME,
86 EXPLAIN_VERBOSE_NODE_NAME,
87};
88use crate::promql::error::{
89 CatalogSnafu, ColumnNotFoundSnafu, CombineTableColumnMismatchSnafu, DataFusionPlanningSnafu,
90 ExpectRangeSelectorSnafu, FunctionInvalidArgumentSnafu, InvalidDestinationLabelNameSnafu,
91 InvalidRegularExpressionSnafu, InvalidTimeRangeSnafu, MultiFieldsNotSupportedSnafu,
92 MultipleMetricMatchersSnafu, MultipleVectorSnafu, NoMetricMatcherSnafu, PromqlPlanNodeSnafu,
93 Result, SameLabelSetSnafu, TableNameNotFoundSnafu, TimeIndexNotFoundSnafu,
94 UnexpectedPlanExprSnafu, UnexpectedTokenSnafu, UnknownTableSnafu, UnsupportedExprSnafu,
95 UnsupportedMatcherOpSnafu, UnsupportedVectorMatchSnafu, ValueNotFoundSnafu,
96 ZeroRangeSelectorSnafu,
97};
98use crate::query_engine::QueryEngineState;
99
100const SPECIAL_TIME_FUNCTION: &str = "time";
102const SCALAR_FUNCTION: &str = "scalar";
104const SPECIAL_ABSENT_FUNCTION: &str = "absent";
106const SPECIAL_HISTOGRAM_QUANTILE: &str = "histogram_quantile";
108const SPECIAL_VECTOR_FUNCTION: &str = "vector";
110const LE_COLUMN_NAME: &str = "le";
112
113static LABEL_NAME_REGEX: Lazy<Regex> =
116 Lazy::new(|| Regex::new(r"^[a-zA-Z_][a-zA-Z0-9_]*$").unwrap());
117
118const DEFAULT_TIME_INDEX_COLUMN: &str = "time";
119
120const DEFAULT_FIELD_COLUMN: &str = "value";
122
123const FIELD_COLUMN_MATCHER: &str = "__field__";
125
126const SCHEMA_COLUMN_MATCHER: &str = "__schema__";
128const DB_COLUMN_MATCHER: &str = "__database__";
129
130const MAX_SCATTER_POINTS: i64 = 400;
132
133const INTERVAL_1H: i64 = 60 * 60 * 1000;
135
136#[derive(Default, Debug, Clone)]
137struct PromPlannerContext {
138 start: Millisecond,
140 end: Millisecond,
141 interval: Millisecond,
142 lookback_delta: Millisecond,
143
144 table_name: Option<String>,
146 time_index_column: Option<String>,
147 field_columns: Vec<String>,
148 tag_columns: Vec<String>,
149 use_tsid: bool,
155 field_column_matcher: Option<Vec<Matcher>>,
157 selector_matcher: Vec<Matcher>,
159 schema_name: Option<String>,
160 range: Option<Millisecond>,
162}
163
164impl PromPlannerContext {
165 fn from_eval_stmt(stmt: &EvalStmt) -> Self {
166 Self {
167 start: stmt.start.duration_since(UNIX_EPOCH).unwrap().as_millis() as _,
168 end: stmt.end.duration_since(UNIX_EPOCH).unwrap().as_millis() as _,
169 interval: stmt.interval.as_millis() as _,
170 lookback_delta: stmt.lookback_delta.as_millis() as _,
171 ..Default::default()
172 }
173 }
174
175 fn reset(&mut self) {
177 self.table_name = None;
178 self.time_index_column = None;
179 self.field_columns = vec![];
180 self.tag_columns = vec![];
181 self.use_tsid = false;
182 self.field_column_matcher = None;
183 self.selector_matcher.clear();
184 self.schema_name = None;
185 self.range = None;
186 }
187
188 fn reset_table_name_and_schema(&mut self) {
190 self.table_name = Some(String::new());
191 self.schema_name = None;
192 self.use_tsid = false;
193 }
194
195 fn has_le_tag(&self) -> bool {
197 self.tag_columns.iter().any(|c| c.eq(&LE_COLUMN_NAME))
198 }
199}
200
201pub struct PromPlanner {
202 table_provider: DfTableSourceProvider,
203 ctx: PromPlannerContext,
204}
205
206impl PromPlanner {
207 pub async fn stmt_to_plan(
208 table_provider: DfTableSourceProvider,
209 stmt: &EvalStmt,
210 query_engine_state: &QueryEngineState,
211 ) -> Result<LogicalPlan> {
212 let mut planner = Self {
213 table_provider,
214 ctx: PromPlannerContext::from_eval_stmt(stmt),
215 };
216
217 let plan = planner
218 .prom_expr_to_plan(&stmt.expr, query_engine_state)
219 .await?;
220
221 planner.strip_tsid_column(plan)
223 }
224
225 pub async fn prom_expr_to_plan(
226 &mut self,
227 prom_expr: &PromExpr,
228 query_engine_state: &QueryEngineState,
229 ) -> Result<LogicalPlan> {
230 self.prom_expr_to_plan_inner(prom_expr, false, query_engine_state)
231 .await
232 }
233
234 #[async_recursion]
244 async fn prom_expr_to_plan_inner(
245 &mut self,
246 prom_expr: &PromExpr,
247 timestamp_fn: bool,
248 query_engine_state: &QueryEngineState,
249 ) -> Result<LogicalPlan> {
250 let res = match prom_expr {
251 PromExpr::Aggregate(expr) => {
252 self.prom_aggr_expr_to_plan(query_engine_state, expr)
253 .await?
254 }
255 PromExpr::Unary(expr) => {
256 self.prom_unary_expr_to_plan(query_engine_state, expr)
257 .await?
258 }
259 PromExpr::Binary(expr) => {
260 self.prom_binary_expr_to_plan(query_engine_state, expr)
261 .await?
262 }
263 PromExpr::Paren(ParenExpr { expr }) => {
264 self.prom_expr_to_plan_inner(expr, timestamp_fn, query_engine_state)
265 .await?
266 }
267 PromExpr::Subquery(expr) => {
268 self.prom_subquery_expr_to_plan(query_engine_state, expr)
269 .await?
270 }
271 PromExpr::NumberLiteral(lit) => self.prom_number_lit_to_plan(lit)?,
272 PromExpr::StringLiteral(lit) => self.prom_string_lit_to_plan(lit)?,
273 PromExpr::VectorSelector(selector) => {
274 self.prom_vector_selector_to_plan(selector, timestamp_fn)
275 .await?
276 }
277 PromExpr::MatrixSelector(selector) => {
278 self.prom_matrix_selector_to_plan(selector).await?
279 }
280 PromExpr::Call(expr) => {
281 self.prom_call_expr_to_plan(query_engine_state, expr)
282 .await?
283 }
284 PromExpr::Extension(expr) => {
285 self.prom_ext_expr_to_plan(query_engine_state, expr).await?
286 }
287 };
288
289 Ok(res)
290 }
291
292 async fn prom_subquery_expr_to_plan(
293 &mut self,
294 query_engine_state: &QueryEngineState,
295 subquery_expr: &SubqueryExpr,
296 ) -> Result<LogicalPlan> {
297 let SubqueryExpr {
298 expr, range, step, ..
299 } = subquery_expr;
300
301 let current_interval = self.ctx.interval;
302 if let Some(step) = step {
303 self.ctx.interval = step.as_millis() as _;
304 }
305 let current_start = self.ctx.start;
306 self.ctx.start -= range.as_millis() as i64 - self.ctx.interval;
307 let input = self.prom_expr_to_plan(expr, query_engine_state).await?;
308 self.ctx.interval = current_interval;
309 self.ctx.start = current_start;
310
311 ensure!(!range.is_zero(), ZeroRangeSelectorSnafu);
312 let range_ms = range.as_millis() as _;
313 self.ctx.range = Some(range_ms);
314
315 let manipulate = RangeManipulate::new(
316 self.ctx.start,
317 self.ctx.end,
318 self.ctx.interval,
319 range_ms,
320 self.ctx
321 .time_index_column
322 .clone()
323 .expect("time index should be set in `setup_context`"),
324 self.ctx.field_columns.clone(),
325 input,
326 )
327 .context(DataFusionPlanningSnafu)?;
328
329 Ok(LogicalPlan::Extension(Extension {
330 node: Arc::new(manipulate),
331 }))
332 }
333
334 async fn prom_aggr_expr_to_plan(
335 &mut self,
336 query_engine_state: &QueryEngineState,
337 aggr_expr: &AggregateExpr,
338 ) -> Result<LogicalPlan> {
339 let AggregateExpr {
340 op,
341 expr,
342 modifier,
343 param,
344 } = aggr_expr;
345
346 let mut input = self.prom_expr_to_plan(expr, query_engine_state).await?;
347 let input_has_tsid = input.schema().fields().iter().any(|field| {
348 field.name() == DATA_SCHEMA_TSID_COLUMN_NAME
349 && field.data_type() == &ArrowDataType::UInt64
350 });
351
352 let required_group_tags = match modifier {
355 None => BTreeSet::new(),
356 Some(LabelModifier::Include(labels)) => labels
357 .labels
358 .iter()
359 .filter(|label| !is_metric_engine_internal_column(label.as_str()))
360 .cloned()
361 .collect(),
362 Some(LabelModifier::Exclude(labels)) => {
363 let mut all_tags = self.collect_row_key_tag_columns_from_plan(&input)?;
364 for label in &labels.labels {
365 let _ = all_tags.remove(label);
366 }
367 all_tags
368 }
369 };
370
371 if !required_group_tags.is_empty()
372 && required_group_tags
373 .iter()
374 .any(|tag| Self::find_case_sensitive_column(input.schema(), tag.as_str()).is_none())
375 {
376 input = self.ensure_tag_columns_available(input, &required_group_tags)?;
377 self.refresh_tag_columns_from_schema(input.schema());
378 }
379
380 match (*op).id() {
381 token::T_TOPK | token::T_BOTTOMK => {
382 self.prom_topk_bottomk_to_plan(aggr_expr, input).await
383 }
384 _ => {
385 let input_tag_columns = if input_has_tsid {
389 self.collect_row_key_tag_columns_from_plan(&input)?
390 .into_iter()
391 .collect::<Vec<_>>()
392 } else {
393 self.ctx.tag_columns.clone()
394 };
395 let mut group_exprs = self.agg_modifier_to_col(input.schema(), modifier, true)?;
398 let (mut aggr_exprs, prev_field_exprs) =
400 self.create_aggregate_exprs(*op, param, &input)?;
401
402 let keep_tsid = op.id() != token::T_COUNT_VALUES
403 && input_has_tsid
404 && input_tag_columns.iter().collect::<HashSet<_>>()
405 == self.ctx.tag_columns.iter().collect::<HashSet<_>>();
406
407 if keep_tsid {
408 aggr_exprs.push(
409 first_value(
410 DfExpr::Column(Column::from_name(DATA_SCHEMA_TSID_COLUMN_NAME)),
411 vec![],
412 )
413 .alias(DATA_SCHEMA_TSID_COLUMN_NAME),
414 );
415 }
416 self.ctx.use_tsid = keep_tsid;
417
418 let builder = LogicalPlanBuilder::from(input);
420 let builder = if op.id() == token::T_COUNT_VALUES {
421 let label = Self::get_param_value_as_str(*op, param)?;
422 group_exprs.extend(prev_field_exprs.clone());
425 let project_fields = self
426 .create_field_column_exprs()?
427 .into_iter()
428 .chain(self.create_tag_column_exprs()?)
429 .chain(Some(self.create_time_index_column_expr()?))
430 .chain(prev_field_exprs.into_iter().map(|expr| expr.alias(label)));
431
432 builder
433 .aggregate(group_exprs.clone(), aggr_exprs)
434 .context(DataFusionPlanningSnafu)?
435 .project(project_fields)
436 .context(DataFusionPlanningSnafu)?
437 } else {
438 builder
439 .aggregate(group_exprs.clone(), aggr_exprs)
440 .context(DataFusionPlanningSnafu)?
441 };
442
443 let sort_expr = group_exprs.into_iter().map(|expr| expr.sort(true, false));
444
445 builder
446 .sort(sort_expr)
447 .context(DataFusionPlanningSnafu)?
448 .build()
449 .context(DataFusionPlanningSnafu)
450 }
451 }
452 }
453
454 async fn prom_topk_bottomk_to_plan(
456 &mut self,
457 aggr_expr: &AggregateExpr,
458 input: LogicalPlan,
459 ) -> Result<LogicalPlan> {
460 let AggregateExpr {
461 op,
462 param,
463 modifier,
464 ..
465 } = aggr_expr;
466
467 let input_has_tsid = input.schema().fields().iter().any(|field| {
468 field.name() == DATA_SCHEMA_TSID_COLUMN_NAME
469 && field.data_type() == &ArrowDataType::UInt64
470 });
471 self.ctx.use_tsid = input_has_tsid;
472
473 let group_exprs = self.agg_modifier_to_col(input.schema(), modifier, false)?;
474
475 let val = Self::get_param_as_literal_expr(param, Some(*op), Some(ArrowDataType::Float64))?;
476
477 let window_exprs = self.create_window_exprs(*op, group_exprs.clone(), &input)?;
479
480 let rank_columns: Vec<_> = window_exprs
481 .iter()
482 .map(|expr| expr.schema_name().to_string())
483 .collect();
484
485 let filter: DfExpr = rank_columns
488 .iter()
489 .fold(None, |expr, rank| {
490 let predicate = DfExpr::BinaryExpr(BinaryExpr {
491 left: Box::new(col(rank)),
492 op: Operator::LtEq,
493 right: Box::new(val.clone()),
494 });
495
496 match expr {
497 None => Some(predicate),
498 Some(expr) => Some(DfExpr::BinaryExpr(BinaryExpr {
499 left: Box::new(expr),
500 op: Operator::Or,
501 right: Box::new(predicate),
502 })),
503 }
504 })
505 .unwrap();
506
507 let rank_columns: Vec<_> = rank_columns.into_iter().map(col).collect();
508
509 let mut new_group_exprs = group_exprs.clone();
510 new_group_exprs.extend(rank_columns);
512
513 let group_sort_expr = new_group_exprs
514 .into_iter()
515 .map(|expr| expr.sort(true, false));
516
517 let project_fields = self
518 .create_field_column_exprs()?
519 .into_iter()
520 .chain(self.create_tag_column_exprs()?)
521 .chain(
522 self.ctx
523 .use_tsid
524 .then_some(DfExpr::Column(Column::from_name(
525 DATA_SCHEMA_TSID_COLUMN_NAME,
526 )))
527 .into_iter(),
528 )
529 .chain(Some(self.create_time_index_column_expr()?));
530
531 LogicalPlanBuilder::from(input)
532 .window(window_exprs)
533 .context(DataFusionPlanningSnafu)?
534 .filter(filter)
535 .context(DataFusionPlanningSnafu)?
536 .sort(group_sort_expr)
537 .context(DataFusionPlanningSnafu)?
538 .project(project_fields)
539 .context(DataFusionPlanningSnafu)?
540 .build()
541 .context(DataFusionPlanningSnafu)
542 }
543
544 async fn prom_unary_expr_to_plan(
545 &mut self,
546 query_engine_state: &QueryEngineState,
547 unary_expr: &UnaryExpr,
548 ) -> Result<LogicalPlan> {
549 let UnaryExpr { expr } = unary_expr;
550 let input = self.prom_expr_to_plan(expr, query_engine_state).await?;
552 self.projection_for_each_field_column(input, |col| {
553 Ok(DfExpr::Negative(Box::new(DfExpr::Column(col.into()))))
554 })
555 }
556
557 async fn prom_binary_expr_to_plan(
558 &mut self,
559 query_engine_state: &QueryEngineState,
560 binary_expr: &PromBinaryExpr,
561 ) -> Result<LogicalPlan> {
562 let PromBinaryExpr {
563 lhs,
564 rhs,
565 op,
566 modifier,
567 } = binary_expr;
568
569 let should_return_bool = if let Some(m) = modifier {
572 m.return_bool
573 } else {
574 false
575 };
576 let is_comparison_op = Self::is_token_a_comparison_op(*op);
577
578 match (
581 Self::try_build_literal_expr(lhs),
582 Self::try_build_literal_expr(rhs),
583 ) {
584 (Some(lhs), Some(rhs)) => {
585 self.ctx.time_index_column = Some(DEFAULT_TIME_INDEX_COLUMN.to_string());
586 self.ctx.field_columns = vec![DEFAULT_FIELD_COLUMN.to_string()];
587 self.ctx.reset_table_name_and_schema();
588 let field_expr_builder = Self::prom_token_to_binary_expr_builder(*op)?;
589 let mut field_expr = field_expr_builder(lhs, rhs)?;
590
591 if is_comparison_op && should_return_bool {
592 field_expr = DfExpr::Cast(Cast {
593 expr: Box::new(field_expr),
594 data_type: ArrowDataType::Float64,
595 });
596 }
597
598 Ok(LogicalPlan::Extension(Extension {
599 node: Arc::new(
600 EmptyMetric::new(
601 self.ctx.start,
602 self.ctx.end,
603 self.ctx.interval,
604 SPECIAL_TIME_FUNCTION.to_string(),
605 DEFAULT_FIELD_COLUMN.to_string(),
606 Some(field_expr),
607 )
608 .context(DataFusionPlanningSnafu)?,
609 ),
610 }))
611 }
612 (Some(mut expr), None) => {
614 let input = self.prom_expr_to_plan(rhs, query_engine_state).await?;
615 if let Some(time_expr) = self.try_build_special_time_expr_with_context(lhs) {
617 expr = time_expr
618 }
619 let bin_expr_builder = |col: &String| {
620 let binary_expr_builder = Self::prom_token_to_binary_expr_builder(*op)?;
621 let mut binary_expr =
622 binary_expr_builder(expr.clone(), DfExpr::Column(col.into()))?;
623
624 if is_comparison_op && should_return_bool {
625 binary_expr = DfExpr::Cast(Cast {
626 expr: Box::new(binary_expr),
627 data_type: ArrowDataType::Float64,
628 });
629 }
630 Ok(binary_expr)
631 };
632 if is_comparison_op && !should_return_bool {
633 self.filter_on_field_column(input, bin_expr_builder)
634 } else {
635 self.projection_for_each_field_column(input, bin_expr_builder)
636 }
637 }
638 (None, Some(mut expr)) => {
640 let input = self.prom_expr_to_plan(lhs, query_engine_state).await?;
641 if let Some(time_expr) = self.try_build_special_time_expr_with_context(rhs) {
643 expr = time_expr
644 }
645 let bin_expr_builder = |col: &String| {
646 let binary_expr_builder = Self::prom_token_to_binary_expr_builder(*op)?;
647 let mut binary_expr =
648 binary_expr_builder(DfExpr::Column(col.into()), expr.clone())?;
649
650 if is_comparison_op && should_return_bool {
651 binary_expr = DfExpr::Cast(Cast {
652 expr: Box::new(binary_expr),
653 data_type: ArrowDataType::Float64,
654 });
655 }
656 Ok(binary_expr)
657 };
658 if is_comparison_op && !should_return_bool {
659 self.filter_on_field_column(input, bin_expr_builder)
660 } else {
661 self.projection_for_each_field_column(input, bin_expr_builder)
662 }
663 }
664 (None, None) => {
666 let left_input = self.prom_expr_to_plan(lhs, query_engine_state).await?;
667 let left_field_columns = self.ctx.field_columns.clone();
668 let left_time_index_column = self.ctx.time_index_column.clone();
669 let mut left_table_ref = self
670 .table_ref()
671 .unwrap_or_else(|_| TableReference::bare(""));
672 let left_context = self.ctx.clone();
673
674 let right_input = self.prom_expr_to_plan(rhs, query_engine_state).await?;
675 let right_field_columns = self.ctx.field_columns.clone();
676 let right_time_index_column = self.ctx.time_index_column.clone();
677 let mut right_table_ref = self
678 .table_ref()
679 .unwrap_or_else(|_| TableReference::bare(""));
680 let right_context = self.ctx.clone();
681
682 if Self::is_token_a_set_op(*op) {
686 return self.set_op_on_non_field_columns(
687 left_input,
688 right_input,
689 left_context,
690 right_context,
691 *op,
692 modifier,
693 );
694 }
695
696 if left_table_ref == right_table_ref {
698 left_table_ref = TableReference::bare("lhs");
700 right_table_ref = TableReference::bare("rhs");
701 if self.ctx.tag_columns.is_empty() {
707 self.ctx = left_context.clone();
708 self.ctx.table_name = Some("lhs".to_string());
709 } else {
710 self.ctx.table_name = Some("rhs".to_string());
711 }
712 }
713 let mut field_columns = left_field_columns.iter().zip(right_field_columns.iter());
714
715 let join_plan = self.join_on_non_field_columns(
716 left_input,
717 right_input,
718 left_table_ref.clone(),
719 right_table_ref.clone(),
720 left_time_index_column,
721 right_time_index_column,
722 left_context.tag_columns.is_empty() || right_context.tag_columns.is_empty(),
725 modifier,
726 )?;
727 let join_plan_schema = join_plan.schema().clone();
728
729 let bin_expr_builder = |_: &String| {
730 let (left_col_name, right_col_name) = field_columns.next().unwrap();
731 let left_col = join_plan_schema
732 .qualified_field_with_name(Some(&left_table_ref), left_col_name)
733 .context(DataFusionPlanningSnafu)?
734 .into();
735 let right_col = join_plan_schema
736 .qualified_field_with_name(Some(&right_table_ref), right_col_name)
737 .context(DataFusionPlanningSnafu)?
738 .into();
739
740 let binary_expr_builder = Self::prom_token_to_binary_expr_builder(*op)?;
741 let mut binary_expr =
742 binary_expr_builder(DfExpr::Column(left_col), DfExpr::Column(right_col))?;
743 if is_comparison_op && should_return_bool {
744 binary_expr = DfExpr::Cast(Cast {
745 expr: Box::new(binary_expr),
746 data_type: ArrowDataType::Float64,
747 });
748 }
749 Ok(binary_expr)
750 };
751 if is_comparison_op && !should_return_bool {
752 let filtered = self.filter_on_field_column(join_plan, bin_expr_builder)?;
759 let (project_table_ref, project_context) =
760 match (lhs.value_type(), rhs.value_type()) {
761 (ValueType::Scalar, ValueType::Vector) => {
762 (&right_table_ref, &right_context)
763 }
764 _ => (&left_table_ref, &left_context),
765 };
766 self.project_binary_join_side(filtered, project_table_ref, project_context)
767 } else {
768 self.projection_for_each_field_column(join_plan, bin_expr_builder)
769 }
770 }
771 }
772 }
773
774 fn project_binary_join_side(
775 &mut self,
776 input: LogicalPlan,
777 table_ref: &TableReference,
778 context: &PromPlannerContext,
779 ) -> Result<LogicalPlan> {
780 let schema = input.schema();
781
782 let mut project_exprs =
783 Vec::with_capacity(context.tag_columns.len() + context.field_columns.len() + 2);
784
785 if let Some(time_index_column) = &context.time_index_column {
787 let time_index_col = schema
788 .qualified_field_with_name(Some(table_ref), time_index_column)
789 .context(DataFusionPlanningSnafu)?
790 .into();
791 project_exprs.push(DfExpr::Column(time_index_col));
792 }
793
794 for field_column in &context.field_columns {
796 let field_col = schema
797 .qualified_field_with_name(Some(table_ref), field_column)
798 .context(DataFusionPlanningSnafu)?
799 .into();
800 project_exprs.push(DfExpr::Column(field_col));
801 }
802
803 for tag_column in &context.tag_columns {
805 let tag_col = schema
806 .qualified_field_with_name(Some(table_ref), tag_column)
807 .context(DataFusionPlanningSnafu)?
808 .into();
809 project_exprs.push(DfExpr::Column(tag_col));
810 }
811
812 if context.use_tsid
815 && let Ok(tsid_col) =
816 schema.qualified_field_with_name(Some(table_ref), DATA_SCHEMA_TSID_COLUMN_NAME)
817 {
818 project_exprs.push(DfExpr::Column(tsid_col.into()));
819 }
820
821 let plan = LogicalPlanBuilder::from(input)
822 .project(project_exprs)
823 .context(DataFusionPlanningSnafu)?
824 .build()
825 .context(DataFusionPlanningSnafu)?;
826
827 self.ctx = context.clone();
830 self.ctx.table_name = None;
831 self.ctx.schema_name = None;
832
833 Ok(plan)
834 }
835
836 fn prom_number_lit_to_plan(&mut self, number_literal: &NumberLiteral) -> Result<LogicalPlan> {
837 let NumberLiteral { val } = number_literal;
838 self.ctx.time_index_column = Some(DEFAULT_TIME_INDEX_COLUMN.to_string());
839 self.ctx.field_columns = vec![DEFAULT_FIELD_COLUMN.to_string()];
840 self.ctx.reset_table_name_and_schema();
841 let literal_expr = df_prelude::lit(*val);
842
843 let plan = LogicalPlan::Extension(Extension {
844 node: Arc::new(
845 EmptyMetric::new(
846 self.ctx.start,
847 self.ctx.end,
848 self.ctx.interval,
849 SPECIAL_TIME_FUNCTION.to_string(),
850 DEFAULT_FIELD_COLUMN.to_string(),
851 Some(literal_expr),
852 )
853 .context(DataFusionPlanningSnafu)?,
854 ),
855 });
856 Ok(plan)
857 }
858
859 fn prom_string_lit_to_plan(&mut self, string_literal: &StringLiteral) -> Result<LogicalPlan> {
860 let StringLiteral { val } = string_literal;
861 self.ctx.time_index_column = Some(DEFAULT_TIME_INDEX_COLUMN.to_string());
862 self.ctx.field_columns = vec![DEFAULT_FIELD_COLUMN.to_string()];
863 self.ctx.reset_table_name_and_schema();
864 let literal_expr = df_prelude::lit(val.clone());
865
866 let plan = LogicalPlan::Extension(Extension {
867 node: Arc::new(
868 EmptyMetric::new(
869 self.ctx.start,
870 self.ctx.end,
871 self.ctx.interval,
872 SPECIAL_TIME_FUNCTION.to_string(),
873 DEFAULT_FIELD_COLUMN.to_string(),
874 Some(literal_expr),
875 )
876 .context(DataFusionPlanningSnafu)?,
877 ),
878 });
879 Ok(plan)
880 }
881
882 async fn prom_vector_selector_to_plan(
883 &mut self,
884 vector_selector: &VectorSelector,
885 timestamp_fn: bool,
886 ) -> Result<LogicalPlan> {
887 let VectorSelector {
888 name,
889 offset,
890 matchers,
891 at: _,
892 } = vector_selector;
893 let matchers = self.preprocess_label_matchers(matchers, name)?;
894 if let Some(empty_plan) = self.setup_context().await? {
895 return Ok(empty_plan);
896 }
897 let normalize = self
898 .selector_to_series_normalize_plan(offset, matchers, false)
899 .await?;
900
901 let normalize = if timestamp_fn {
902 self.create_timestamp_func_plan(normalize)?
905 } else {
906 normalize
907 };
908
909 let manipulate = InstantManipulate::new(
910 self.ctx.start,
911 self.ctx.end,
912 self.ctx.lookback_delta,
913 self.ctx.interval,
914 self.ctx
915 .time_index_column
916 .clone()
917 .expect("time index should be set in `setup_context`"),
918 self.ctx.field_columns.first().cloned(),
919 normalize,
920 );
921 Ok(LogicalPlan::Extension(Extension {
922 node: Arc::new(manipulate),
923 }))
924 }
925
926 fn create_timestamp_func_plan(&mut self, normalize: LogicalPlan) -> Result<LogicalPlan> {
948 let time_expr = build_special_time_expr(self.ctx.time_index_column.as_ref().unwrap())
949 .alias(DEFAULT_FIELD_COLUMN);
950 self.ctx.field_columns = vec![time_expr.schema_name().to_string()];
951 let mut project_exprs = Vec::with_capacity(self.ctx.tag_columns.len() + 2);
952 project_exprs.push(self.create_time_index_column_expr()?);
953 project_exprs.push(time_expr);
954 project_exprs.extend(self.create_tag_column_exprs()?);
955
956 LogicalPlanBuilder::from(normalize)
957 .project(project_exprs)
958 .context(DataFusionPlanningSnafu)?
959 .build()
960 .context(DataFusionPlanningSnafu)
961 }
962
963 async fn prom_matrix_selector_to_plan(
964 &mut self,
965 matrix_selector: &MatrixSelector,
966 ) -> Result<LogicalPlan> {
967 let MatrixSelector { vs, range } = matrix_selector;
968 let VectorSelector {
969 name,
970 offset,
971 matchers,
972 ..
973 } = vs;
974 let matchers = self.preprocess_label_matchers(matchers, name)?;
975 ensure!(!range.is_zero(), ZeroRangeSelectorSnafu);
976 let range_ms = range.as_millis() as _;
977 self.ctx.range = Some(range_ms);
978
979 let normalize = match self.setup_context().await? {
982 Some(empty_plan) => empty_plan,
983 None => {
984 self.selector_to_series_normalize_plan(offset, matchers, true)
985 .await?
986 }
987 };
988 let manipulate = RangeManipulate::new(
989 self.ctx.start,
990 self.ctx.end,
991 self.ctx.interval,
992 range_ms,
994 self.ctx
995 .time_index_column
996 .clone()
997 .expect("time index should be set in `setup_context`"),
998 self.ctx.field_columns.clone(),
999 normalize,
1000 )
1001 .context(DataFusionPlanningSnafu)?;
1002
1003 Ok(LogicalPlan::Extension(Extension {
1004 node: Arc::new(manipulate),
1005 }))
1006 }
1007
1008 async fn prom_call_expr_to_plan(
1009 &mut self,
1010 query_engine_state: &QueryEngineState,
1011 call_expr: &Call,
1012 ) -> Result<LogicalPlan> {
1013 let Call { func, args } = call_expr;
1014 match func.name {
1016 SPECIAL_HISTOGRAM_QUANTILE => {
1017 return self.create_histogram_plan(args, query_engine_state).await;
1018 }
1019 SPECIAL_VECTOR_FUNCTION => return self.create_vector_plan(args).await,
1020 SCALAR_FUNCTION => return self.create_scalar_plan(args, query_engine_state).await,
1021 SPECIAL_ABSENT_FUNCTION => {
1022 return self.create_absent_plan(args, query_engine_state).await;
1023 }
1024 _ => {}
1025 }
1026
1027 let args = self.create_function_args(&args.args)?;
1029 let input = if let Some(prom_expr) = &args.input {
1030 self.prom_expr_to_plan_inner(prom_expr, func.name == "timestamp", query_engine_state)
1031 .await?
1032 } else {
1033 self.ctx.time_index_column = Some(SPECIAL_TIME_FUNCTION.to_string());
1034 self.ctx.reset_table_name_and_schema();
1035 self.ctx.tag_columns = vec![];
1036 self.ctx.field_columns = vec![DEFAULT_FIELD_COLUMN.to_string()];
1037 LogicalPlan::Extension(Extension {
1038 node: Arc::new(
1039 EmptyMetric::new(
1040 self.ctx.start,
1041 self.ctx.end,
1042 self.ctx.interval,
1043 SPECIAL_TIME_FUNCTION.to_string(),
1044 DEFAULT_FIELD_COLUMN.to_string(),
1045 None,
1046 )
1047 .context(DataFusionPlanningSnafu)?,
1048 ),
1049 })
1050 };
1051 let (mut func_exprs, new_tags) =
1052 self.create_function_expr(func, args.literals.clone(), query_engine_state)?;
1053 func_exprs.insert(0, self.create_time_index_column_expr()?);
1054 func_exprs.extend_from_slice(&self.create_tag_column_exprs()?);
1055
1056 let builder = LogicalPlanBuilder::from(input)
1057 .project(func_exprs)
1058 .context(DataFusionPlanningSnafu)?
1059 .filter(self.create_empty_values_filter_expr()?)
1060 .context(DataFusionPlanningSnafu)?;
1061
1062 let builder = match func.name {
1063 "sort" => builder
1064 .sort(self.create_field_columns_sort_exprs(true))
1065 .context(DataFusionPlanningSnafu)?,
1066 "sort_desc" => builder
1067 .sort(self.create_field_columns_sort_exprs(false))
1068 .context(DataFusionPlanningSnafu)?,
1069 "sort_by_label" => builder
1070 .sort(Self::create_sort_exprs_by_tags(
1071 func.name,
1072 args.literals,
1073 true,
1074 )?)
1075 .context(DataFusionPlanningSnafu)?,
1076 "sort_by_label_desc" => builder
1077 .sort(Self::create_sort_exprs_by_tags(
1078 func.name,
1079 args.literals,
1080 false,
1081 )?)
1082 .context(DataFusionPlanningSnafu)?,
1083
1084 _ => builder,
1085 };
1086
1087 for tag in new_tags {
1090 self.ctx.tag_columns.push(tag);
1091 }
1092
1093 let plan = builder.build().context(DataFusionPlanningSnafu)?;
1094 common_telemetry::debug!("Created PromQL function plan: {plan:?} for {call_expr:?}");
1095
1096 Ok(plan)
1097 }
1098
1099 async fn prom_ext_expr_to_plan(
1100 &mut self,
1101 query_engine_state: &QueryEngineState,
1102 ext_expr: &promql_parser::parser::ast::Extension,
1103 ) -> Result<LogicalPlan> {
1104 let expr = &ext_expr.expr;
1106 let children = expr.children();
1107 let plan = self
1108 .prom_expr_to_plan(&children[0], query_engine_state)
1109 .await?;
1110 match expr.name() {
1116 ANALYZE_NODE_NAME => LogicalPlanBuilder::from(plan)
1117 .explain(false, true)
1118 .unwrap()
1119 .build()
1120 .context(DataFusionPlanningSnafu),
1121 ANALYZE_VERBOSE_NODE_NAME => LogicalPlanBuilder::from(plan)
1122 .explain(true, true)
1123 .unwrap()
1124 .build()
1125 .context(DataFusionPlanningSnafu),
1126 EXPLAIN_NODE_NAME => LogicalPlanBuilder::from(plan)
1127 .explain(false, false)
1128 .unwrap()
1129 .build()
1130 .context(DataFusionPlanningSnafu),
1131 EXPLAIN_VERBOSE_NODE_NAME => LogicalPlanBuilder::from(plan)
1132 .explain(true, false)
1133 .unwrap()
1134 .build()
1135 .context(DataFusionPlanningSnafu),
1136 ALIAS_NODE_NAME => {
1137 let alias = expr
1138 .as_any()
1139 .downcast_ref::<AliasExpr>()
1140 .context(UnexpectedPlanExprSnafu {
1141 desc: "Expected AliasExpr",
1142 })?
1143 .alias
1144 .clone();
1145 self.apply_alias(plan, alias)
1146 }
1147 _ => LogicalPlanBuilder::empty(true)
1148 .build()
1149 .context(DataFusionPlanningSnafu),
1150 }
1151 }
1152
1153 #[allow(clippy::mutable_key_type)]
1163 fn preprocess_label_matchers(
1164 &mut self,
1165 label_matchers: &Matchers,
1166 name: &Option<String>,
1167 ) -> Result<Matchers> {
1168 self.ctx.reset();
1169
1170 let metric_name;
1171 if let Some(name) = name.clone() {
1172 metric_name = Some(name);
1173 ensure!(
1174 label_matchers.find_matchers(METRIC_NAME).is_empty(),
1175 MultipleMetricMatchersSnafu
1176 );
1177 } else {
1178 let mut matches = label_matchers.find_matchers(METRIC_NAME);
1179 ensure!(!matches.is_empty(), NoMetricMatcherSnafu);
1180 ensure!(matches.len() == 1, MultipleMetricMatchersSnafu);
1181 ensure!(
1182 matches[0].op == MatchOp::Equal,
1183 UnsupportedMatcherOpSnafu {
1184 matcher_op: matches[0].op.to_string(),
1185 matcher: METRIC_NAME
1186 }
1187 );
1188 metric_name = matches.pop().map(|m| m.value);
1189 }
1190
1191 self.ctx.table_name = metric_name;
1192
1193 let mut matchers = HashSet::new();
1194 for matcher in &label_matchers.matchers {
1195 if matcher.name == FIELD_COLUMN_MATCHER {
1197 self.ctx
1198 .field_column_matcher
1199 .get_or_insert_default()
1200 .push(matcher.clone());
1201 } else if matcher.name == SCHEMA_COLUMN_MATCHER || matcher.name == DB_COLUMN_MATCHER {
1202 ensure!(
1203 matcher.op == MatchOp::Equal,
1204 UnsupportedMatcherOpSnafu {
1205 matcher: matcher.name.clone(),
1206 matcher_op: matcher.op.to_string(),
1207 }
1208 );
1209 self.ctx.schema_name = Some(matcher.value.clone());
1210 } else if matcher.name != METRIC_NAME {
1211 self.ctx.selector_matcher.push(matcher.clone());
1212 let _ = matchers.insert(matcher.clone());
1213 }
1214 }
1215
1216 Ok(Matchers::new(matchers.into_iter().collect()))
1217 }
1218
1219 async fn selector_to_series_normalize_plan(
1220 &mut self,
1221 offset: &Option<Offset>,
1222 label_matchers: Matchers,
1223 is_range_selector: bool,
1224 ) -> Result<LogicalPlan> {
1225 let table_ref = self.table_ref()?;
1227 let mut table_scan = self.create_table_scan_plan(table_ref.clone()).await?;
1228 let table_schema = table_scan.schema();
1229
1230 let offset_duration = match offset {
1232 Some(Offset::Pos(duration)) => duration.as_millis() as Millisecond,
1233 Some(Offset::Neg(duration)) => -(duration.as_millis() as Millisecond),
1234 None => 0,
1235 };
1236 let mut scan_filters = Self::matchers_to_expr(label_matchers.clone(), table_schema)?;
1237 if let Some(time_index_filter) = self.build_time_index_filter(offset_duration)? {
1238 scan_filters.push(time_index_filter);
1239 }
1240 table_scan = LogicalPlanBuilder::from(table_scan)
1241 .filter(conjunction(scan_filters).unwrap()) .context(DataFusionPlanningSnafu)?
1243 .build()
1244 .context(DataFusionPlanningSnafu)?;
1245
1246 if let Some(field_matchers) = &self.ctx.field_column_matcher {
1248 let col_set = self.ctx.field_columns.iter().collect::<HashSet<_>>();
1249 let mut result_set = HashSet::new();
1251 let mut reverse_set = HashSet::new();
1253 for matcher in field_matchers {
1254 match &matcher.op {
1255 MatchOp::Equal => {
1256 if col_set.contains(&matcher.value) {
1257 let _ = result_set.insert(matcher.value.clone());
1258 } else {
1259 return Err(ColumnNotFoundSnafu {
1260 col: matcher.value.clone(),
1261 }
1262 .build());
1263 }
1264 }
1265 MatchOp::NotEqual => {
1266 if col_set.contains(&matcher.value) {
1267 let _ = reverse_set.insert(matcher.value.clone());
1268 } else {
1269 return Err(ColumnNotFoundSnafu {
1270 col: matcher.value.clone(),
1271 }
1272 .build());
1273 }
1274 }
1275 MatchOp::Re(regex) => {
1276 for col in &self.ctx.field_columns {
1277 if regex.is_match(col) {
1278 let _ = result_set.insert(col.clone());
1279 }
1280 }
1281 }
1282 MatchOp::NotRe(regex) => {
1283 for col in &self.ctx.field_columns {
1284 if regex.is_match(col) {
1285 let _ = reverse_set.insert(col.clone());
1286 }
1287 }
1288 }
1289 }
1290 }
1291 if result_set.is_empty() {
1293 result_set = col_set.into_iter().cloned().collect();
1294 }
1295 for col in reverse_set {
1296 let _ = result_set.remove(&col);
1297 }
1298
1299 self.ctx.field_columns = self
1301 .ctx
1302 .field_columns
1303 .drain(..)
1304 .filter(|col| result_set.contains(col))
1305 .collect();
1306
1307 let exprs = result_set
1308 .into_iter()
1309 .map(|col| DfExpr::Column(Column::new_unqualified(col)))
1310 .chain(self.create_tag_column_exprs()?)
1311 .chain(
1312 self.ctx
1313 .use_tsid
1314 .then_some(DfExpr::Column(Column::new_unqualified(
1315 DATA_SCHEMA_TSID_COLUMN_NAME,
1316 )))
1317 .into_iter(),
1318 )
1319 .chain(Some(self.create_time_index_column_expr()?))
1320 .collect::<Vec<_>>();
1321
1322 table_scan = LogicalPlanBuilder::from(table_scan)
1324 .project(exprs)
1325 .context(DataFusionPlanningSnafu)?
1326 .build()
1327 .context(DataFusionPlanningSnafu)?;
1328 }
1329
1330 let series_key_columns = if self.ctx.use_tsid {
1332 vec![DATA_SCHEMA_TSID_COLUMN_NAME.to_string()]
1333 } else {
1334 self.ctx.tag_columns.clone()
1335 };
1336
1337 let sort_exprs = if self.ctx.use_tsid {
1338 vec![
1339 DfExpr::Column(Column::from_name(DATA_SCHEMA_TSID_COLUMN_NAME)).sort(true, true),
1340 self.create_time_index_column_expr()?.sort(true, true),
1341 ]
1342 } else {
1343 self.create_tag_and_time_index_column_sort_exprs()?
1344 };
1345
1346 let sort_plan = LogicalPlanBuilder::from(table_scan)
1347 .sort(sort_exprs)
1348 .context(DataFusionPlanningSnafu)?
1349 .build()
1350 .context(DataFusionPlanningSnafu)?;
1351
1352 let time_index_column =
1354 self.ctx
1355 .time_index_column
1356 .clone()
1357 .with_context(|| TimeIndexNotFoundSnafu {
1358 table: table_ref.to_string(),
1359 })?;
1360 let divide_plan = LogicalPlan::Extension(Extension {
1361 node: Arc::new(SeriesDivide::new(
1362 series_key_columns.clone(),
1363 time_index_column,
1364 sort_plan,
1365 )),
1366 });
1367
1368 if !is_range_selector && offset_duration == 0 {
1370 return Ok(divide_plan);
1371 }
1372 let series_normalize = SeriesNormalize::new(
1373 offset_duration,
1374 self.ctx
1375 .time_index_column
1376 .clone()
1377 .with_context(|| TimeIndexNotFoundSnafu {
1378 table: table_ref.to_quoted_string(),
1379 })?,
1380 is_range_selector,
1381 series_key_columns,
1382 divide_plan,
1383 );
1384 let logical_plan = LogicalPlan::Extension(Extension {
1385 node: Arc::new(series_normalize),
1386 });
1387
1388 Ok(logical_plan)
1389 }
1390
1391 fn agg_modifier_to_col(
1398 &mut self,
1399 input_schema: &DFSchemaRef,
1400 modifier: &Option<LabelModifier>,
1401 update_ctx: bool,
1402 ) -> Result<Vec<DfExpr>> {
1403 match modifier {
1404 None => {
1405 if update_ctx {
1406 self.ctx.tag_columns.clear();
1407 }
1408 Ok(vec![self.create_time_index_column_expr()?])
1409 }
1410 Some(LabelModifier::Include(labels)) => {
1411 if update_ctx {
1412 self.ctx.tag_columns.clear();
1413 }
1414 let mut exprs = Vec::with_capacity(labels.labels.len());
1415 for label in &labels.labels {
1416 if is_metric_engine_internal_column(label) {
1417 continue;
1418 }
1419 if let Some(column_name) = Self::find_case_sensitive_column(input_schema, label)
1421 {
1422 exprs.push(DfExpr::Column(Column::from_name(column_name.clone())));
1423
1424 if update_ctx {
1425 self.ctx.tag_columns.push(column_name);
1427 }
1428 }
1429 }
1430 exprs.push(self.create_time_index_column_expr()?);
1432
1433 Ok(exprs)
1434 }
1435 Some(LabelModifier::Exclude(labels)) => {
1436 let mut all_fields = input_schema
1437 .fields()
1438 .iter()
1439 .map(|f| f.name())
1440 .collect::<BTreeSet<_>>();
1441
1442 all_fields.retain(|col| !is_metric_engine_internal_column(col.as_str()));
1445
1446 for label in &labels.labels {
1449 let _ = all_fields.remove(label);
1450 }
1451
1452 if let Some(time_index) = &self.ctx.time_index_column {
1454 let _ = all_fields.remove(time_index);
1455 }
1456 for value in &self.ctx.field_columns {
1457 let _ = all_fields.remove(value);
1458 }
1459
1460 if update_ctx {
1461 self.ctx.tag_columns = all_fields.iter().map(|col| (*col).clone()).collect();
1463 }
1464
1465 let mut exprs = all_fields
1467 .into_iter()
1468 .map(|c| DfExpr::Column(Column::from(c)))
1469 .collect::<Vec<_>>();
1470
1471 exprs.push(self.create_time_index_column_expr()?);
1473
1474 Ok(exprs)
1475 }
1476 }
1477 }
1478
1479 pub fn matchers_to_expr(
1481 label_matchers: Matchers,
1482 table_schema: &DFSchemaRef,
1483 ) -> Result<Vec<DfExpr>> {
1484 let mut exprs = Vec::with_capacity(label_matchers.matchers.len());
1485 for matcher in label_matchers.matchers {
1486 if matcher.name == SCHEMA_COLUMN_MATCHER
1487 || matcher.name == DB_COLUMN_MATCHER
1488 || matcher.name == FIELD_COLUMN_MATCHER
1489 {
1490 continue;
1491 }
1492
1493 let column_name = Self::find_case_sensitive_column(table_schema, matcher.name.as_str());
1494 let col = if let Some(column_name) = column_name {
1495 DfExpr::Column(Column::from_name(column_name))
1496 } else {
1497 DfExpr::Literal(ScalarValue::Utf8(Some(String::new())), None)
1498 .alias(matcher.name.clone())
1499 };
1500 let lit = DfExpr::Literal(ScalarValue::Utf8(Some(matcher.value)), None);
1501 let expr = match matcher.op {
1502 MatchOp::Equal => col.eq(lit),
1503 MatchOp::NotEqual => col.not_eq(lit),
1504 MatchOp::Re(re) => {
1505 if re.as_str() == "^(?:.*)$" {
1511 continue;
1512 }
1513 if re.as_str() == "^(?:.+)$" {
1514 col.not_eq(DfExpr::Literal(
1515 ScalarValue::Utf8(Some(String::new())),
1516 None,
1517 ))
1518 } else {
1519 DfExpr::BinaryExpr(BinaryExpr {
1520 left: Box::new(col),
1521 op: Operator::RegexMatch,
1522 right: Box::new(DfExpr::Literal(
1523 ScalarValue::Utf8(Some(re.as_str().to_string())),
1524 None,
1525 )),
1526 })
1527 }
1528 }
1529 MatchOp::NotRe(re) => {
1530 if re.as_str() == "^(?:.*)$" {
1531 DfExpr::Literal(ScalarValue::Boolean(Some(false)), None)
1532 } else if re.as_str() == "^(?:.+)$" {
1533 col.eq(DfExpr::Literal(
1534 ScalarValue::Utf8(Some(String::new())),
1535 None,
1536 ))
1537 } else {
1538 DfExpr::BinaryExpr(BinaryExpr {
1539 left: Box::new(col),
1540 op: Operator::RegexNotMatch,
1541 right: Box::new(DfExpr::Literal(
1542 ScalarValue::Utf8(Some(re.as_str().to_string())),
1543 None,
1544 )),
1545 })
1546 }
1547 }
1548 };
1549 exprs.push(expr);
1550 }
1551
1552 Ok(exprs)
1553 }
1554
1555 fn find_case_sensitive_column(schema: &DFSchemaRef, column: &str) -> Option<String> {
1556 if is_metric_engine_internal_column(column) {
1557 return None;
1558 }
1559 schema
1560 .fields()
1561 .iter()
1562 .find(|field| field.name() == column)
1563 .map(|field| field.name().clone())
1564 }
1565
1566 fn table_from_source(&self, source: &Arc<dyn TableSource>) -> Result<table::TableRef> {
1567 Ok(source
1568 .as_any()
1569 .downcast_ref::<DefaultTableSource>()
1570 .context(UnknownTableSnafu)?
1571 .table_provider
1572 .as_any()
1573 .downcast_ref::<DfTableProviderAdapter>()
1574 .context(UnknownTableSnafu)?
1575 .table())
1576 }
1577
1578 fn table_ref(&self) -> Result<TableReference> {
1579 let table_name = self
1580 .ctx
1581 .table_name
1582 .clone()
1583 .context(TableNameNotFoundSnafu)?;
1584
1585 let table_ref = if let Some(schema_name) = &self.ctx.schema_name {
1587 TableReference::partial(schema_name.as_str(), table_name.as_str())
1588 } else {
1589 TableReference::bare(table_name.as_str())
1590 };
1591
1592 Ok(table_ref)
1593 }
1594
1595 fn build_time_index_filter(&self, offset_duration: i64) -> Result<Option<DfExpr>> {
1596 let start = self.ctx.start;
1597 let end = self.ctx.end;
1598 if end < start {
1599 return InvalidTimeRangeSnafu { start, end }.fail();
1600 }
1601 let lookback_delta = self.ctx.lookback_delta;
1602 let range = self.ctx.range.unwrap_or_default();
1603 let interval = self.ctx.interval;
1604 let time_index_expr = self.create_time_index_column_expr()?;
1605 let num_points = (end - start) / interval;
1606
1607 let selector_window = if range == 0 { lookback_delta } else { range };
1615 let lower_exclusive_adjustment = if selector_window > 0 { 1 } else { 0 };
1616
1617 if (end - start) / interval > MAX_SCATTER_POINTS || interval <= INTERVAL_1H {
1619 let single_time_range = time_index_expr
1620 .clone()
1621 .gt_eq(DfExpr::Literal(
1622 ScalarValue::TimestampMillisecond(
1623 Some(
1624 self.ctx.start - offset_duration - selector_window
1625 + lower_exclusive_adjustment,
1626 ),
1627 None,
1628 ),
1629 None,
1630 ))
1631 .and(time_index_expr.lt_eq(DfExpr::Literal(
1632 ScalarValue::TimestampMillisecond(Some(self.ctx.end - offset_duration), None),
1633 None,
1634 )));
1635 return Ok(Some(single_time_range));
1636 }
1637
1638 let mut filters = Vec::with_capacity(num_points as usize + 1);
1640 for timestamp in (start..=end).step_by(interval as usize) {
1641 filters.push(
1642 time_index_expr
1643 .clone()
1644 .gt_eq(DfExpr::Literal(
1645 ScalarValue::TimestampMillisecond(
1646 Some(
1647 timestamp - offset_duration - selector_window
1648 + lower_exclusive_adjustment,
1649 ),
1650 None,
1651 ),
1652 None,
1653 ))
1654 .and(time_index_expr.clone().lt_eq(DfExpr::Literal(
1655 ScalarValue::TimestampMillisecond(Some(timestamp - offset_duration), None),
1656 None,
1657 ))),
1658 )
1659 }
1660
1661 Ok(filters.into_iter().reduce(DfExpr::or))
1662 }
1663
1664 async fn create_table_scan_plan(&mut self, table_ref: TableReference) -> Result<LogicalPlan> {
1669 let provider = self
1670 .table_provider
1671 .resolve_table(table_ref.clone())
1672 .await
1673 .context(CatalogSnafu)?;
1674
1675 let logical_table = self.table_from_source(&provider)?;
1676
1677 let mut maybe_phy_table_ref = table_ref.clone();
1679 let mut scan_provider = provider;
1680 let mut table_id_filter: Option<u32> = None;
1681
1682 if logical_table.table_info().meta.engine == METRIC_ENGINE_NAME
1685 && let Some(physical_table_name) = logical_table
1686 .table_info()
1687 .meta
1688 .options
1689 .extra_options
1690 .get(LOGICAL_TABLE_METADATA_KEY)
1691 {
1692 let physical_table_ref = if let Some(schema_name) = &self.ctx.schema_name {
1693 TableReference::partial(schema_name.as_str(), physical_table_name.as_str())
1694 } else {
1695 TableReference::bare(physical_table_name.as_str())
1696 };
1697
1698 let physical_provider = match self
1699 .table_provider
1700 .resolve_table(physical_table_ref.clone())
1701 .await
1702 {
1703 Ok(provider) => provider,
1704 Err(e) if e.status_code() == StatusCode::TableNotFound => {
1705 scan_provider.clone()
1708 }
1709 Err(e) => return Err(e).context(CatalogSnafu),
1710 };
1711
1712 if !Arc::ptr_eq(&physical_provider, &scan_provider) {
1713 let physical_table = self.table_from_source(&physical_provider)?;
1715
1716 let has_table_id = physical_table
1717 .schema()
1718 .column_schema_by_name(DATA_SCHEMA_TABLE_ID_COLUMN_NAME)
1719 .is_some();
1720 let has_tsid = physical_table
1721 .schema()
1722 .column_schema_by_name(DATA_SCHEMA_TSID_COLUMN_NAME)
1723 .is_some_and(|col| matches!(col.data_type, ConcreteDataType::UInt64(_)));
1724
1725 if has_table_id && has_tsid {
1726 scan_provider = physical_provider;
1727 maybe_phy_table_ref = physical_table_ref;
1728 table_id_filter = Some(logical_table.table_info().ident.table_id);
1729 }
1730 }
1731 }
1732
1733 let scan_table = self.table_from_source(&scan_provider)?;
1734
1735 let use_tsid = table_id_filter.is_some()
1736 && scan_table
1737 .schema()
1738 .column_schema_by_name(DATA_SCHEMA_TSID_COLUMN_NAME)
1739 .is_some_and(|col| matches!(col.data_type, ConcreteDataType::UInt64(_)));
1740 self.ctx.use_tsid = use_tsid;
1741
1742 let all_table_tags = self.ctx.tag_columns.clone();
1743
1744 let scan_tag_columns = if use_tsid {
1745 let mut scan_tags = self.ctx.tag_columns.clone();
1746 for matcher in &self.ctx.selector_matcher {
1747 if is_metric_engine_internal_column(&matcher.name) {
1748 continue;
1749 }
1750 if all_table_tags.iter().any(|tag| tag == &matcher.name) {
1751 scan_tags.push(matcher.name.clone());
1752 }
1753 }
1754 scan_tags.sort_unstable();
1755 scan_tags.dedup();
1756 scan_tags
1757 } else {
1758 self.ctx.tag_columns.clone()
1759 };
1760
1761 let is_time_index_ms = scan_table
1762 .schema()
1763 .timestamp_column()
1764 .with_context(|| TimeIndexNotFoundSnafu {
1765 table: maybe_phy_table_ref.to_quoted_string(),
1766 })?
1767 .data_type
1768 == ConcreteDataType::timestamp_millisecond_datatype();
1769
1770 let scan_projection = if table_id_filter.is_some() {
1771 let mut required_columns = HashSet::new();
1772 required_columns.insert(DATA_SCHEMA_TABLE_ID_COLUMN_NAME.to_string());
1773 required_columns.insert(self.ctx.time_index_column.clone().with_context(|| {
1774 TimeIndexNotFoundSnafu {
1775 table: maybe_phy_table_ref.to_quoted_string(),
1776 }
1777 })?);
1778 for col in &scan_tag_columns {
1779 required_columns.insert(col.clone());
1780 }
1781 for col in &self.ctx.field_columns {
1782 required_columns.insert(col.clone());
1783 }
1784 if use_tsid {
1785 required_columns.insert(DATA_SCHEMA_TSID_COLUMN_NAME.to_string());
1786 }
1787
1788 let arrow_schema = scan_table.schema().arrow_schema().clone();
1789 Some(
1790 arrow_schema
1791 .fields()
1792 .iter()
1793 .enumerate()
1794 .filter(|(_, field)| required_columns.contains(field.name().as_str()))
1795 .map(|(idx, _)| idx)
1796 .collect::<Vec<_>>(),
1797 )
1798 } else {
1799 None
1800 };
1801
1802 let mut scan_plan =
1803 LogicalPlanBuilder::scan(maybe_phy_table_ref.clone(), scan_provider, scan_projection)
1804 .context(DataFusionPlanningSnafu)?
1805 .build()
1806 .context(DataFusionPlanningSnafu)?;
1807
1808 if let Some(table_id) = table_id_filter {
1809 scan_plan = LogicalPlanBuilder::from(scan_plan)
1810 .filter(
1811 DfExpr::Column(Column::from_name(DATA_SCHEMA_TABLE_ID_COLUMN_NAME))
1812 .eq(lit(table_id)),
1813 )
1814 .context(DataFusionPlanningSnafu)?
1815 .alias(table_ref.clone()) .context(DataFusionPlanningSnafu)?
1817 .build()
1818 .context(DataFusionPlanningSnafu)?;
1819 }
1820
1821 if !is_time_index_ms {
1822 let expr: Vec<_> = self
1824 .create_field_column_exprs()?
1825 .into_iter()
1826 .chain(
1827 scan_tag_columns
1828 .iter()
1829 .map(|tag| DfExpr::Column(Column::from_name(tag))),
1830 )
1831 .chain(
1832 self.ctx
1833 .use_tsid
1834 .then_some(DfExpr::Column(Column::new(
1835 Some(table_ref.clone()),
1836 DATA_SCHEMA_TSID_COLUMN_NAME.to_string(),
1837 )))
1838 .into_iter(),
1839 )
1840 .chain(Some(DfExpr::Alias(Alias {
1841 expr: Box::new(DfExpr::Cast(Cast {
1842 expr: Box::new(self.create_time_index_column_expr()?),
1843 data_type: ArrowDataType::Timestamp(ArrowTimeUnit::Millisecond, None),
1844 })),
1845 relation: Some(table_ref.clone()),
1846 name: self
1847 .ctx
1848 .time_index_column
1849 .as_ref()
1850 .with_context(|| TimeIndexNotFoundSnafu {
1851 table: table_ref.to_quoted_string(),
1852 })?
1853 .clone(),
1854 metadata: None,
1855 })))
1856 .collect::<Vec<_>>();
1857 scan_plan = LogicalPlanBuilder::from(scan_plan)
1858 .project(expr)
1859 .context(DataFusionPlanningSnafu)?
1860 .build()
1861 .context(DataFusionPlanningSnafu)?;
1862 } else if table_id_filter.is_some() {
1863 let project_exprs = self
1865 .create_field_column_exprs()?
1866 .into_iter()
1867 .chain(
1868 scan_tag_columns
1869 .iter()
1870 .map(|tag| DfExpr::Column(Column::from_name(tag))),
1871 )
1872 .chain(
1873 self.ctx
1874 .use_tsid
1875 .then_some(DfExpr::Column(Column::from_name(
1876 DATA_SCHEMA_TSID_COLUMN_NAME,
1877 )))
1878 .into_iter(),
1879 )
1880 .chain(Some(self.create_time_index_column_expr()?))
1881 .collect::<Vec<_>>();
1882
1883 scan_plan = LogicalPlanBuilder::from(scan_plan)
1884 .project(project_exprs)
1885 .context(DataFusionPlanningSnafu)?
1886 .build()
1887 .context(DataFusionPlanningSnafu)?;
1888 }
1889
1890 let result = LogicalPlanBuilder::from(scan_plan)
1891 .build()
1892 .context(DataFusionPlanningSnafu)?;
1893 Ok(result)
1894 }
1895
1896 fn collect_row_key_tag_columns_from_plan(
1897 &self,
1898 plan: &LogicalPlan,
1899 ) -> Result<BTreeSet<String>> {
1900 fn walk(
1901 planner: &PromPlanner,
1902 plan: &LogicalPlan,
1903 out: &mut BTreeSet<String>,
1904 ) -> Result<()> {
1905 if let LogicalPlan::TableScan(scan) = plan {
1906 let table = planner.table_from_source(&scan.source)?;
1907 for col in table.table_info().meta.row_key_column_names() {
1908 if col != DATA_SCHEMA_TABLE_ID_COLUMN_NAME
1909 && col != DATA_SCHEMA_TSID_COLUMN_NAME
1910 && !is_metric_engine_internal_column(col)
1911 {
1912 out.insert(col.clone());
1913 }
1914 }
1915 }
1916
1917 for input in plan.inputs() {
1918 walk(planner, input, out)?;
1919 }
1920 Ok(())
1921 }
1922
1923 let mut out = BTreeSet::new();
1924 walk(self, plan, &mut out)?;
1925 Ok(out)
1926 }
1927
1928 fn ensure_tag_columns_available(
1929 &self,
1930 plan: LogicalPlan,
1931 required_tags: &BTreeSet<String>,
1932 ) -> Result<LogicalPlan> {
1933 if required_tags.is_empty() {
1934 return Ok(plan);
1935 }
1936
1937 struct Rewriter {
1938 required_tags: BTreeSet<String>,
1939 }
1940
1941 impl TreeNodeRewriter for Rewriter {
1942 type Node = LogicalPlan;
1943
1944 fn f_up(
1945 &mut self,
1946 node: Self::Node,
1947 ) -> datafusion_common::Result<Transformed<Self::Node>> {
1948 match node {
1949 LogicalPlan::TableScan(scan) => {
1950 let schema = scan.source.schema();
1951 let mut projection = match scan.projection.clone() {
1952 Some(p) => p,
1953 None => {
1954 return Ok(Transformed::no(LogicalPlan::TableScan(scan)));
1956 }
1957 };
1958
1959 let mut changed = false;
1960 for tag in &self.required_tags {
1961 if let Some((idx, _)) = schema
1962 .fields()
1963 .iter()
1964 .enumerate()
1965 .find(|(_, field)| field.name() == tag)
1966 && !projection.contains(&idx)
1967 {
1968 projection.push(idx);
1969 changed = true;
1970 }
1971 }
1972
1973 if !changed {
1974 return Ok(Transformed::no(LogicalPlan::TableScan(scan)));
1975 }
1976
1977 projection.sort_unstable();
1978 projection.dedup();
1979
1980 let new_scan = TableScan::try_new(
1981 scan.table_name.clone(),
1982 scan.source.clone(),
1983 Some(projection),
1984 scan.filters,
1985 scan.fetch,
1986 )?;
1987 Ok(Transformed::yes(LogicalPlan::TableScan(new_scan)))
1988 }
1989 LogicalPlan::Projection(proj) => {
1990 let input_schema = proj.input.schema();
1991
1992 let existing = proj
1993 .schema
1994 .fields()
1995 .iter()
1996 .map(|f| f.name().as_str())
1997 .collect::<HashSet<_>>();
1998
1999 let mut expr = proj.expr.clone();
2000 let mut has_changed = false;
2001 for tag in &self.required_tags {
2002 if existing.contains(tag.as_str()) {
2003 continue;
2004 }
2005
2006 if let Some(idx) = input_schema.index_of_column_by_name(None, tag) {
2007 expr.push(DfExpr::Column(Column::from(
2008 input_schema.qualified_field(idx),
2009 )));
2010 has_changed = true;
2011 }
2012 }
2013
2014 if !has_changed {
2015 return Ok(Transformed::no(LogicalPlan::Projection(proj)));
2016 }
2017
2018 let new_proj = Projection::try_new(expr, proj.input)?;
2019 Ok(Transformed::yes(LogicalPlan::Projection(new_proj)))
2020 }
2021 other => Ok(Transformed::no(other)),
2022 }
2023 }
2024 }
2025
2026 let mut rewriter = Rewriter {
2027 required_tags: required_tags.clone(),
2028 };
2029 let rewritten = plan
2030 .rewrite(&mut rewriter)
2031 .context(DataFusionPlanningSnafu)?;
2032 Ok(rewritten.data)
2033 }
2034
2035 fn refresh_tag_columns_from_schema(&mut self, schema: &DFSchemaRef) {
2036 let time_index = self.ctx.time_index_column.as_deref();
2037 let field_columns = self.ctx.field_columns.iter().collect::<HashSet<_>>();
2038
2039 let mut tags = schema
2040 .fields()
2041 .iter()
2042 .map(|f| f.name())
2043 .filter(|name| Some(name.as_str()) != time_index)
2044 .filter(|name| !field_columns.contains(name))
2045 .filter(|name| !is_metric_engine_internal_column(name))
2046 .cloned()
2047 .collect::<Vec<_>>();
2048 tags.sort_unstable();
2049 tags.dedup();
2050 self.ctx.tag_columns = tags;
2051 }
2052
2053 async fn setup_context(&mut self) -> Result<Option<LogicalPlan>> {
2057 let table_ref = self.table_ref()?;
2058 let source = match self.table_provider.resolve_table(table_ref.clone()).await {
2059 Err(e) if e.status_code() == StatusCode::TableNotFound => {
2060 let plan = self.setup_context_for_empty_metric()?;
2061 return Ok(Some(plan));
2062 }
2063 res => res.context(CatalogSnafu)?,
2064 };
2065 let table = self.table_from_source(&source)?;
2066
2067 let time_index = table
2069 .schema()
2070 .timestamp_column()
2071 .with_context(|| TimeIndexNotFoundSnafu {
2072 table: table_ref.to_quoted_string(),
2073 })?
2074 .name
2075 .clone();
2076 self.ctx.time_index_column = Some(time_index);
2077
2078 let values = table
2080 .table_info()
2081 .meta
2082 .field_column_names()
2083 .cloned()
2084 .collect();
2085 self.ctx.field_columns = values;
2086
2087 let tags = table
2089 .table_info()
2090 .meta
2091 .row_key_column_names()
2092 .filter(|col| {
2093 col != &DATA_SCHEMA_TABLE_ID_COLUMN_NAME && col != &DATA_SCHEMA_TSID_COLUMN_NAME
2095 })
2096 .cloned()
2097 .collect();
2098 self.ctx.tag_columns = tags;
2099
2100 self.ctx.use_tsid = false;
2101
2102 Ok(None)
2103 }
2104
2105 fn setup_context_for_empty_metric(&mut self) -> Result<LogicalPlan> {
2108 self.ctx.time_index_column = Some(SPECIAL_TIME_FUNCTION.to_string());
2109 self.ctx.reset_table_name_and_schema();
2110 self.ctx.tag_columns = vec![];
2111 self.ctx.field_columns = vec![DEFAULT_FIELD_COLUMN.to_string()];
2112 self.ctx.use_tsid = false;
2113
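// The inverted range (start 0, end -1) makes the EmptyMetric plan produce
// no rows.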
2114 let plan = LogicalPlan::Extension(Extension {
2116 node: Arc::new(
2117 EmptyMetric::new(
2118 0,
2119 -1,
2120 self.ctx.interval,
2121 SPECIAL_TIME_FUNCTION.to_string(),
2122 DEFAULT_FIELD_COLUMN.to_string(),
2123 Some(lit(0.0f64)),
2124 )
2125 .context(DataFusionPlanningSnafu)?,
2126 ),
2127 });
2128 Ok(plan)
2129 }
2130
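/// Split a call's arguments into the single vector input and the remaining
/// literal arguments. A second vector argument is an error.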
2131 fn create_function_args(&self, args: &[Box<PromExpr>]) -> Result<FunctionArgs> {
2133 let mut result = FunctionArgs::default();
2134
2135 for arg in args {
2136 if let Some(expr) = Self::try_build_literal_expr(arg) {
2138 result.literals.push(expr);
2139 } else {
2140 match arg.as_ref() {
2142 PromExpr::Subquery(_)
2143 | PromExpr::VectorSelector(_)
2144 | PromExpr::MatrixSelector(_)
2145 | PromExpr::Extension(_)
2146 | PromExpr::Aggregate(_)
2147 | PromExpr::Paren(_)
2148 | PromExpr::Call(_)
2149 | PromExpr::Binary(_)
2150 | PromExpr::Unary(_) => {
2151 if result.input.replace(*arg.clone()).is_some() {
2152 MultipleVectorSnafu { expr: *arg.clone() }.fail()?;
2153 }
2154 }
2155
2156 _ => {
2157 let expr = Self::get_param_as_literal_expr(&Some(arg.clone()), None, None)?;
2158 result.literals.push(expr);
2159 }
2160 }
2161 }
2162 }
2163
2164 Ok(result)
2165 }
2166
2167 fn create_function_expr(
2173 &mut self,
2174 func: &Function,
2175 other_input_exprs: Vec<DfExpr>,
2176 query_engine_state: &QueryEngineState,
2177 ) -> Result<(Vec<DfExpr>, Vec<String>)> {
2178 let mut other_input_exprs: VecDeque<DfExpr> = other_input_exprs.into();
2180
2181 let field_column_pos = 0;
2183 let mut exprs = Vec::with_capacity(self.ctx.field_columns.len());
2184 let mut new_tags = vec![];
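// Decide, per function name, how the expression for each field column is built.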
2186 let scalar_func = match func.name {
2187 "increase" => ScalarFunc::ExtrapolateUdf(
2188 Arc::new(Increase::scalar_udf()),
2189 self.ctx.range.context(ExpectRangeSelectorSnafu)?,
2190 ),
2191 "rate" => ScalarFunc::ExtrapolateUdf(
2192 Arc::new(Rate::scalar_udf()),
2193 self.ctx.range.context(ExpectRangeSelectorSnafu)?,
2194 ),
2195 "delta" => ScalarFunc::ExtrapolateUdf(
2196 Arc::new(Delta::scalar_udf()),
2197 self.ctx.range.context(ExpectRangeSelectorSnafu)?,
2198 ),
2199 "idelta" => ScalarFunc::Udf(Arc::new(IDelta::<false>::scalar_udf())),
2200 "irate" => ScalarFunc::Udf(Arc::new(IDelta::<true>::scalar_udf())),
2201 "resets" => ScalarFunc::Udf(Arc::new(Resets::scalar_udf())),
2202 "changes" => ScalarFunc::Udf(Arc::new(Changes::scalar_udf())),
2203 "deriv" => ScalarFunc::Udf(Arc::new(Deriv::scalar_udf())),
2204 "avg_over_time" => ScalarFunc::Udf(Arc::new(AvgOverTime::scalar_udf())),
2205 "min_over_time" => ScalarFunc::Udf(Arc::new(MinOverTime::scalar_udf())),
2206 "max_over_time" => ScalarFunc::Udf(Arc::new(MaxOverTime::scalar_udf())),
2207 "sum_over_time" => ScalarFunc::Udf(Arc::new(SumOverTime::scalar_udf())),
2208 "count_over_time" => ScalarFunc::Udf(Arc::new(CountOverTime::scalar_udf())),
2209 "last_over_time" => ScalarFunc::Udf(Arc::new(LastOverTime::scalar_udf())),
2210 "absent_over_time" => ScalarFunc::Udf(Arc::new(AbsentOverTime::scalar_udf())),
2211 "present_over_time" => ScalarFunc::Udf(Arc::new(PresentOverTime::scalar_udf())),
2212 "stddev_over_time" => ScalarFunc::Udf(Arc::new(StddevOverTime::scalar_udf())),
2213 "stdvar_over_time" => ScalarFunc::Udf(Arc::new(StdvarOverTime::scalar_udf())),
2214 "quantile_over_time" => ScalarFunc::Udf(Arc::new(QuantileOverTime::scalar_udf())),
2215 "predict_linear" => {
2216 other_input_exprs[0] = DfExpr::Cast(Cast {
2217 expr: Box::new(other_input_exprs[0].clone()),
2218 data_type: ArrowDataType::Int64,
2219 });
2220 ScalarFunc::Udf(Arc::new(PredictLinear::scalar_udf()))
2221 }
2222 "holt_winters" => ScalarFunc::Udf(Arc::new(HoltWinters::scalar_udf())),
2223 "time" => {
2224 exprs.push(build_special_time_expr(
2225 self.ctx.time_index_column.as_ref().unwrap(),
2226 ));
2227 ScalarFunc::GeneratedExpr
2228 }
2229 "minute" => {
2230 let expr = self.date_part_on_time_index("minute")?;
2232 exprs.push(expr);
2233 ScalarFunc::GeneratedExpr
2234 }
2235 "hour" => {
2236 let expr = self.date_part_on_time_index("hour")?;
2238 exprs.push(expr);
2239 ScalarFunc::GeneratedExpr
2240 }
2241 "month" => {
2242 let expr = self.date_part_on_time_index("month")?;
2244 exprs.push(expr);
2245 ScalarFunc::GeneratedExpr
2246 }
2247 "year" => {
2248 let expr = self.date_part_on_time_index("year")?;
2250 exprs.push(expr);
2251 ScalarFunc::GeneratedExpr
2252 }
2253 "day_of_month" => {
2254 let expr = self.date_part_on_time_index("day")?;
2256 exprs.push(expr);
2257 ScalarFunc::GeneratedExpr
2258 }
2259 "day_of_week" => {
2260 let expr = self.date_part_on_time_index("dow")?;
2262 exprs.push(expr);
2263 ScalarFunc::GeneratedExpr
2264 }
2265 "day_of_year" => {
2266 let expr = self.date_part_on_time_index("doy")?;
2268 exprs.push(expr);
2269 ScalarFunc::GeneratedExpr
2270 }
2271 "days_in_month" => {
2272 let day_lit_expr = "day".lit();
2277 let month_lit_expr = "month".lit();
2278 let interval_1month_lit_expr =
2279 DfExpr::Literal(ScalarValue::IntervalYearMonth(Some(1)), None);
2280 let interval_1day_lit_expr = DfExpr::Literal(
2281 ScalarValue::IntervalDayTime(Some(IntervalDayTime::new(1, 0))),
2282 None,
2283 );
2284 let the_1month_minus_1day_expr = DfExpr::BinaryExpr(BinaryExpr {
2285 left: Box::new(interval_1month_lit_expr),
2286 op: Operator::Minus,
2287 right: Box::new(interval_1day_lit_expr),
2288 });
2289 let date_trunc_expr = DfExpr::ScalarFunction(ScalarFunction {
2290 func: datafusion_functions::datetime::date_trunc(),
2291 args: vec![month_lit_expr, self.create_time_index_column_expr()?],
2292 });
2293 let date_trunc_plus_interval_expr = DfExpr::BinaryExpr(BinaryExpr {
2294 left: Box::new(date_trunc_expr),
2295 op: Operator::Plus,
2296 right: Box::new(the_1month_minus_1day_expr),
2297 });
2298 let date_part_expr = DfExpr::ScalarFunction(ScalarFunction {
2299 func: datafusion_functions::datetime::date_part(),
2300 args: vec![day_lit_expr, date_trunc_plus_interval_expr],
2301 });
2302
2303 exprs.push(date_part_expr);
2304 ScalarFunc::GeneratedExpr
2305 }
2306
2307 "label_join" => {
2308 let (concat_expr, dst_label) = Self::build_concat_labels_expr(
2309 &mut other_input_exprs,
2310 &self.ctx,
2311 query_engine_state,
2312 )?;
2313
2314 for value in &self.ctx.field_columns {
2316 if *value != dst_label {
2317 let expr = DfExpr::Column(Column::from_name(value));
2318 exprs.push(expr);
2319 }
2320 }
2321
2322 self.ctx.tag_columns.retain(|tag| *tag != dst_label);
2324 new_tags.push(dst_label);
2325 exprs.push(concat_expr);
2327
2328 ScalarFunc::GeneratedExpr
2329 }
2330 "label_replace" => {
2331 if let Some((replace_expr, dst_label)) = self
2332 .build_regexp_replace_label_expr(&mut other_input_exprs, query_engine_state)?
2333 {
2334 for value in &self.ctx.field_columns {
2336 if *value != dst_label {
2337 let expr = DfExpr::Column(Column::from_name(value));
2338 exprs.push(expr);
2339 }
2340 }
2341
2342 ensure!(
2343 !self.ctx.tag_columns.contains(&dst_label),
2344 SameLabelSetSnafu
2345 );
2346 new_tags.push(dst_label);
2347 exprs.push(replace_expr);
2349 } else {
2350 for value in &self.ctx.field_columns {
2352 let expr = DfExpr::Column(Column::from_name(value));
2353 exprs.push(expr);
2354 }
2355 }
2356
2357 ScalarFunc::GeneratedExpr
2358 }
2359 "sort" | "sort_desc" | "sort_by_label" | "sort_by_label_desc" | "timestamp" => {
2360 for value in &self.ctx.field_columns {
2363 let expr = DfExpr::Column(Column::from_name(value));
2364 exprs.push(expr);
2365 }
2366
2367 ScalarFunc::GeneratedExpr
2368 }
2369 "round" => {
2370 if other_input_exprs.is_empty() {
2371 other_input_exprs.push_front(0.0f64.lit());
2372 }
2373 ScalarFunc::DataFusionUdf(Arc::new(Round::scalar_udf()))
2374 }
2375 "rad" => ScalarFunc::DataFusionBuiltin(datafusion::functions::math::radians()),
2376 "deg" => ScalarFunc::DataFusionBuiltin(datafusion::functions::math::degrees()),
2377 "sgn" => ScalarFunc::DataFusionBuiltin(datafusion::functions::math::signum()),
2378 "pi" => {
2379 let fn_expr = DfExpr::ScalarFunction(ScalarFunction {
2381 func: datafusion::functions::math::pi(),
2382 args: vec![],
2383 });
2384 exprs.push(fn_expr);
2385
2386 ScalarFunc::GeneratedExpr
2387 }
2388 _ => {
2389 if let Some(f) = query_engine_state
2390 .session_state()
2391 .scalar_functions()
2392 .get(func.name)
2393 {
2394 ScalarFunc::DataFusionBuiltin(f.clone())
2395 } else if let Some(factory) = query_engine_state.scalar_function(func.name) {
2396 let func_state = query_engine_state.function_state();
2397 let query_ctx = self.table_provider.query_ctx();
2398
2399 ScalarFunc::DataFusionUdf(Arc::new(factory.provide(FunctionContext {
2400 state: func_state,
2401 query_ctx: query_ctx.clone(),
2402 })))
2403 } else if let Some(f) = datafusion_functions::math::functions()
2404 .iter()
2405 .find(|f| f.name() == func.name)
2406 {
2407 ScalarFunc::DataFusionUdf(f.clone())
2408 } else {
2409 return UnsupportedExprSnafu {
2410 name: func.name.to_string(),
2411 }
2412 .fail();
2413 }
2414 }
2415 };
2416
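// Instantiate the chosen function once for every field column.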
2417 for value in &self.ctx.field_columns {
2418 let col_expr = DfExpr::Column(Column::from_name(value));
2419
2420 match scalar_func.clone() {
2421 ScalarFunc::DataFusionBuiltin(func) => {
2422 other_input_exprs.insert(field_column_pos, col_expr);
2423 let fn_expr = DfExpr::ScalarFunction(ScalarFunction {
2424 func,
2425 args: other_input_exprs.clone().into(),
2426 });
2427 exprs.push(fn_expr);
2428 let _ = other_input_exprs.remove(field_column_pos);
2429 }
2430 ScalarFunc::DataFusionUdf(func) => {
2431 let args = itertools::chain!(
2432 other_input_exprs.iter().take(field_column_pos).cloned(),
2433 std::iter::once(col_expr),
2434 other_input_exprs.iter().skip(field_column_pos).cloned()
2435 )
2436 .collect_vec();
2437 exprs.push(DfExpr::ScalarFunction(ScalarFunction { func, args }))
2438 }
2439 ScalarFunc::Udf(func) => {
2440 let ts_range_expr = DfExpr::Column(Column::from_name(
2441 RangeManipulate::build_timestamp_range_name(
2442 self.ctx.time_index_column.as_ref().unwrap(),
2443 ),
2444 ));
2445 other_input_exprs.insert(field_column_pos, ts_range_expr);
2446 other_input_exprs.insert(field_column_pos + 1, col_expr);
2447 let fn_expr = DfExpr::ScalarFunction(ScalarFunction {
2448 func,
2449 args: other_input_exprs.clone().into(),
2450 });
2451 exprs.push(fn_expr);
2452 let _ = other_input_exprs.remove(field_column_pos + 1);
2453 let _ = other_input_exprs.remove(field_column_pos);
2454 }
2455 ScalarFunc::ExtrapolateUdf(func, range_length) => {
2456 let ts_range_expr = DfExpr::Column(Column::from_name(
2457 RangeManipulate::build_timestamp_range_name(
2458 self.ctx.time_index_column.as_ref().unwrap(),
2459 ),
2460 ));
2461 other_input_exprs.insert(field_column_pos, ts_range_expr);
2462 other_input_exprs.insert(field_column_pos + 1, col_expr);
2463 other_input_exprs
2464 .insert(field_column_pos + 2, self.create_time_index_column_expr()?);
2465 other_input_exprs.push_back(lit(range_length));
2466 let fn_expr = DfExpr::ScalarFunction(ScalarFunction {
2467 func,
2468 args: other_input_exprs.clone().into(),
2469 });
2470 exprs.push(fn_expr);
2471 let _ = other_input_exprs.pop_back();
2472 let _ = other_input_exprs.remove(field_column_pos + 2);
2473 let _ = other_input_exprs.remove(field_column_pos + 1);
2474 let _ = other_input_exprs.remove(field_column_pos);
2475 }
2476 ScalarFunc::GeneratedExpr => {}
2477 }
2478 }
2479
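// Alias each generated expression with its display name and adopt those
// names as the context's new field columns. The label functions manage
// their own column names above.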
2480 if !matches!(func.name, "label_join" | "label_replace") {
2484 let mut new_field_columns = Vec::with_capacity(exprs.len());
2485
2486 exprs = exprs
2487 .into_iter()
2488 .map(|expr| {
2489 let display_name = expr.schema_name().to_string();
2490 new_field_columns.push(display_name.clone());
2491 Ok(expr.alias(display_name))
2492 })
2493 .collect::<std::result::Result<Vec<_>, _>>()
2494 .context(DataFusionPlanningSnafu)?;
2495
2496 self.ctx.field_columns = new_field_columns;
2497 }
2498
2499 Ok((exprs, new_tags))
2500 }
2501
2502 fn validate_label_name(label_name: &str) -> Result<()> {
2506 if label_name.starts_with("__") {
2508 return InvalidDestinationLabelNameSnafu { label_name }.fail();
2509 }
2510 if !LABEL_NAME_REGEX.is_match(label_name) {
2512 return InvalidDestinationLabelNameSnafu { label_name }.fail();
2513 }
2514
2515 Ok(())
2516 }
2517
2518 fn build_regexp_replace_label_expr(
2520 &self,
2521 other_input_exprs: &mut VecDeque<DfExpr>,
2522 query_engine_state: &QueryEngineState,
2523 ) -> Result<Option<(DfExpr, String)>> {
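// label_replace's literal arguments arrive in call order:
// dst_label, replacement, src_label, regex.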
2524 let dst_label = match other_input_exprs.pop_front() {
2526 Some(DfExpr::Literal(ScalarValue::Utf8(Some(d)), _)) => d,
2527 other => UnexpectedPlanExprSnafu {
2528 desc: format!("expected dst_label string literal, but found {:?}", other),
2529 }
2530 .fail()?,
2531 };
2532
2533 Self::validate_label_name(&dst_label)?;
2535 let replacement = match other_input_exprs.pop_front() {
2536 Some(DfExpr::Literal(ScalarValue::Utf8(Some(r)), _)) => r,
2537 other => UnexpectedPlanExprSnafu {
2538 desc: format!("expected replacement string literal, but found {:?}", other),
2539 }
2540 .fail()?,
2541 };
2542 let src_label = match other_input_exprs.pop_front() {
2543 Some(DfExpr::Literal(ScalarValue::Utf8(Some(s)), None)) => s,
2544 other => UnexpectedPlanExprSnafu {
2545 desc: format!("expected src_label string literal, but found {:?}", other),
2546 }
2547 .fail()?,
2548 };
2549
2550 let regex = match other_input_exprs.pop_front() {
2551 Some(DfExpr::Literal(ScalarValue::Utf8(Some(r)), None)) => r,
2552 other => UnexpectedPlanExprSnafu {
2553 desc: format!("expected regex string literal, but found {:?}", other),
2554 }
2555 .fail()?,
2556 };
2557
2558 regex::Regex::new(&regex).map_err(|_| {
2561 InvalidRegularExpressionSnafu {
2562 regex: regex.clone(),
2563 }
2564 .build()
2565 })?;
2566
2567 if self.ctx.tag_columns.contains(&src_label) && regex.is_empty() {
2569 return Ok(None);
2570 }
2571
2572 if !self.ctx.tag_columns.contains(&src_label) {
2574 if replacement.is_empty() {
2575 return Ok(None);
2577 } else {
2578 return Ok(Some((
2580 lit(replacement).alias(&dst_label),
2582 dst_label,
2583 )));
2584 }
2585 }
2586
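// PromQL regex matching is fully anchored; (?s:) additionally lets `.`
// match newlines.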
2587 let regex = format!("^(?s:{regex})$");
2590
2591 let session_state = query_engine_state.session_state();
2592 let func = session_state
2593 .scalar_functions()
2594 .get("regexp_replace")
2595 .context(UnsupportedExprSnafu {
2596 name: "regexp_replace",
2597 })?;
2598
2599 let args = vec![
2601 if src_label.is_empty() {
2602 DfExpr::Literal(ScalarValue::Utf8(Some(String::new())), None)
2603 } else {
2604 DfExpr::Column(Column::from_name(src_label))
2605 },
2606 DfExpr::Literal(ScalarValue::Utf8(Some(regex)), None),
2607 DfExpr::Literal(ScalarValue::Utf8(Some(replacement)), None),
2608 ];
2609
2610 Ok(Some((
2611 DfExpr::ScalarFunction(ScalarFunction {
2612 func: func.clone(),
2613 args,
2614 })
2615 .alias(&dst_label),
2616 dst_label,
2617 )))
2618 }
2619
2620 fn build_concat_labels_expr(
2622 other_input_exprs: &mut VecDeque<DfExpr>,
2623 ctx: &PromPlannerContext,
2624 query_engine_state: &QueryEngineState,
2625 ) -> Result<(DfExpr, String)> {
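// label_join's literal arguments arrive in call order: dst_label,
// separator, then one or more source labels.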
2626 let dst_label = match other_input_exprs.pop_front() {
2629 Some(DfExpr::Literal(ScalarValue::Utf8(Some(d)), _)) => d,
2630 other => UnexpectedPlanExprSnafu {
2631 desc: format!("expected dst_label string literal, but found {:?}", other),
2632 }
2633 .fail()?,
2634 };
2635 let separator = match other_input_exprs.pop_front() {
2636 Some(DfExpr::Literal(ScalarValue::Utf8(Some(d)), _)) => d,
2637 other => UnexpectedPlanExprSnafu {
2638 desc: format!("expected separator string literal, but found {:?}", other),
2639 }
2640 .fail()?,
2641 };
2642
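// Only labels that exist as columns can be joined; anything else
// contributes an empty (NULL) value.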
2643 let available_columns: HashSet<&str> = ctx
2645 .tag_columns
2646 .iter()
2647 .chain(ctx.field_columns.iter())
2648 .chain(ctx.time_index_column.as_ref())
2649 .map(|s| s.as_str())
2650 .collect();
2651
2652 let src_labels = other_input_exprs
2653 .iter()
2654 .map(|expr| {
2655 match expr {
2657 DfExpr::Literal(ScalarValue::Utf8(Some(label)), None) => {
2658 if label.is_empty() {
2659 Ok(DfExpr::Literal(ScalarValue::Null, None))
2660 } else if available_columns.contains(label.as_str()) {
2661 Ok(DfExpr::Column(Column::from_name(label)))
2663 } else {
2664 Ok(DfExpr::Literal(ScalarValue::Null, None))
2666 }
2667 }
2668 other => UnexpectedPlanExprSnafu {
2669 desc: format!(
2670 "expected source label string literal, but found {:?}",
2671 other
2672 ),
2673 }
2674 .fail(),
2675 }
2676 })
2677 .collect::<Result<Vec<_>>>()?;
2678 ensure!(
2679 !src_labels.is_empty(),
2680 FunctionInvalidArgumentSnafu {
2681 fn_name: "label_join"
2682 }
2683 );
2684
2685 let session_state = query_engine_state.session_state();
2686 let func = session_state
2687 .scalar_functions()
2688 .get("concat_ws")
2689 .context(UnsupportedExprSnafu { name: "concat_ws" })?;
2690
2691 let mut args = Vec::with_capacity(1 + src_labels.len());
2693 args.push(DfExpr::Literal(ScalarValue::Utf8(Some(separator)), None));
2694 args.extend(src_labels);
2695
2696 Ok((
2697 DfExpr::ScalarFunction(ScalarFunction {
2698 func: func.clone(),
2699 args,
2700 })
2701 .alias(&dst_label),
2702 dst_label,
2703 ))
2704 }
2705
2706 fn create_time_index_column_expr(&self) -> Result<DfExpr> {
2707 Ok(DfExpr::Column(Column::from_name(
2708 self.ctx
2709 .time_index_column
2710 .clone()
2711 .with_context(|| TimeIndexNotFoundSnafu { table: "unknown" })?,
2712 )))
2713 }
2714
2715 fn create_tag_column_exprs(&self) -> Result<Vec<DfExpr>> {
2716 let mut result = Vec::with_capacity(self.ctx.tag_columns.len());
2717 for tag in &self.ctx.tag_columns {
2718 let expr = DfExpr::Column(Column::from_name(tag));
2719 result.push(expr);
2720 }
2721 Ok(result)
2722 }
2723
2724 fn create_field_column_exprs(&self) -> Result<Vec<DfExpr>> {
2725 let mut result = Vec::with_capacity(self.ctx.field_columns.len());
2726 for field in &self.ctx.field_columns {
2727 let expr = DfExpr::Column(Column::from_name(field));
2728 result.push(expr);
2729 }
2730 Ok(result)
2731 }
2732
2733 fn create_tag_and_time_index_column_sort_exprs(&self) -> Result<Vec<SortExpr>> {
2734 let mut result = self
2735 .ctx
2736 .tag_columns
2737 .iter()
2738 .map(|col| DfExpr::Column(Column::from_name(col)).sort(true, true))
2739 .collect::<Vec<_>>();
2740 result.push(self.create_time_index_column_expr()?.sort(true, true));
2741 Ok(result)
2742 }
2743
2744 fn create_field_columns_sort_exprs(&self, asc: bool) -> Vec<SortExpr> {
2745 self.ctx
2746 .field_columns
2747 .iter()
2748 .map(|col| DfExpr::Column(Column::from_name(col)).sort(asc, true))
2749 .collect::<Vec<_>>()
2750 }
2751
2752 fn create_sort_exprs_by_tags(
2753 func: &str,
2754 tags: Vec<DfExpr>,
2755 asc: bool,
2756 ) -> Result<Vec<SortExpr>> {
2757 ensure!(
2758 !tags.is_empty(),
2759 FunctionInvalidArgumentSnafu { fn_name: func }
2760 );
2761
2762 tags.iter()
2763 .map(|col| match col {
2764 DfExpr::Literal(ScalarValue::Utf8(Some(label)), _) => {
2765 Ok(DfExpr::Column(Column::from_name(label)).sort(asc, false))
2766 }
2767 other => UnexpectedPlanExprSnafu {
2768 desc: format!("expected label string literal, but found {:?}", other),
2769 }
2770 .fail(),
2771 })
2772 .collect::<Result<Vec<_>>>()
2773 }
2774
2775 fn create_empty_values_filter_expr(&self) -> Result<DfExpr> {
2776 let mut exprs = Vec::with_capacity(self.ctx.field_columns.len());
2777 for value in &self.ctx.field_columns {
2778 let expr = DfExpr::Column(Column::from_name(value)).is_not_null();
2779 exprs.push(expr);
2780 }
2781
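// `conjunction` returns None only when there are no field columns at all.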
2782 conjunction(exprs).with_context(|| ValueNotFoundSnafu {
2787 table: self
2788 .table_ref()
2789 .map(|t| t.to_quoted_string())
2790 .unwrap_or_else(|_| "unknown".to_string()),
2791 })
2792 }
2793
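/// Build one aggregate expression per field column for a PromQL aggregation
/// token. For `count_values`, also return the original field expressions so
/// the values themselves can be grouped on.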
2794 fn create_aggregate_exprs(
2810 &mut self,
2811 op: TokenType,
2812 param: &Option<Box<PromExpr>>,
2813 input_plan: &LogicalPlan,
2814 ) -> Result<(Vec<DfExpr>, Vec<DfExpr>)> {
2815 let mut non_col_args = Vec::new();
2816 let is_group_agg = op.id() == token::T_GROUP;
2817 if is_group_agg {
2818 ensure!(
2819 self.ctx.field_columns.len() == 1,
2820 MultiFieldsNotSupportedSnafu {
2821 operator: "group()"
2822 }
2823 );
2824 }
2825 let aggr = match op.id() {
2826 token::T_SUM => sum_udaf(),
2827 token::T_QUANTILE => {
2828 let q =
2829 Self::get_param_as_literal_expr(param, Some(op), Some(ArrowDataType::Float64))?;
2830 non_col_args.push(q);
2831 quantile_udaf()
2832 }
2833 token::T_AVG => avg_udaf(),
2834 token::T_COUNT_VALUES | token::T_COUNT => count_udaf(),
2835 token::T_MIN => min_udaf(),
2836 token::T_MAX => max_udaf(),
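// `group()` aggregates the constant 1.0, so every group yields exactly 1.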
2837 token::T_GROUP => max_udaf(),
2840 token::T_STDDEV => stddev_pop_udaf(),
2841 token::T_STDVAR => var_pop_udaf(),
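// topk/bottomk are planned as window functions elsewhere, never as
// plain aggregates.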
2842 token::T_TOPK | token::T_BOTTOMK => UnsupportedExprSnafu {
2843 name: format!("{op:?}"),
2844 }
2845 .fail()?,
2846 _ => UnexpectedTokenSnafu { token: op }.fail()?,
2847 };
2848
2849 let exprs: Vec<DfExpr> = self
2851 .ctx
2852 .field_columns
2853 .iter()
2854 .map(|col| {
2855 if is_group_agg {
2856 aggr.call(vec![lit(1_f64)])
2857 } else {
2858 non_col_args.push(DfExpr::Column(Column::from_name(col)));
2859 let expr = aggr.call(non_col_args.clone());
2860 non_col_args.pop();
2861 expr
2862 }
2863 })
2864 .collect::<Vec<_>>();
2865
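// For count_values, keep the raw field expressions; they become extra
// group-by columns.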
2866 let prev_field_exprs = if op.id() == token::T_COUNT_VALUES {
2868 let prev_field_exprs: Vec<_> = self
2869 .ctx
2870 .field_columns
2871 .iter()
2872 .map(|col| DfExpr::Column(Column::from_name(col)))
2873 .collect();
2874
2875 ensure!(
2876 self.ctx.field_columns.len() == 1,
2877 UnsupportedExprSnafu {
2878 name: "count_values on multi-value input"
2879 }
2880 );
2881
2882 prev_field_exprs
2883 } else {
2884 vec![]
2885 };
2886
2887 let mut new_field_columns = Vec::with_capacity(self.ctx.field_columns.len());
2889
2890 let normalized_exprs =
2891 normalize_cols(exprs.iter().cloned(), input_plan).context(DataFusionPlanningSnafu)?;
2892 for expr in normalized_exprs {
2893 new_field_columns.push(expr.schema_name().to_string());
2894 }
2895 self.ctx.field_columns = new_field_columns;
2896
2897 Ok((exprs, prev_field_exprs))
2898 }
2899
2900 fn get_param_value_as_str(op: TokenType, param: &Option<Box<PromExpr>>) -> Result<&str> {
2901 let param = param
2902 .as_deref()
2903 .with_context(|| FunctionInvalidArgumentSnafu {
2904 fn_name: op.to_string(),
2905 })?;
2906 let PromExpr::StringLiteral(StringLiteral { val }) = param else {
2907 return FunctionInvalidArgumentSnafu {
2908 fn_name: op.to_string(),
2909 }
2910 .fail();
2911 };
2912
2913 Ok(val)
2914 }
2915
2916 fn get_param_as_literal_expr(
2917 param: &Option<Box<PromExpr>>,
2918 op: Option<TokenType>,
2919 expected_type: Option<ArrowDataType>,
2920 ) -> Result<DfExpr> {
2921 let prom_param = param.as_deref().with_context(|| {
2922 if let Some(op) = op {
2923 FunctionInvalidArgumentSnafu {
2924 fn_name: op.to_string(),
2925 }
2926 } else {
2927 FunctionInvalidArgumentSnafu {
2928 fn_name: "unknown".to_string(),
2929 }
2930 }
2931 })?;
2932
2933 let expr = Self::try_build_literal_expr(prom_param).with_context(|| {
2934 if let Some(op) = op {
2935 FunctionInvalidArgumentSnafu {
2936 fn_name: op.to_string(),
2937 }
2938 } else {
2939 FunctionInvalidArgumentSnafu {
2940 fn_name: "unknown".to_string(),
2941 }
2942 }
2943 })?;
2944
2945 if let Some(expected_type) = expected_type {
2947 let expr_type = expr
2949 .get_type(&DFSchema::empty())
2950 .context(DataFusionPlanningSnafu)?;
2951 if expected_type != expr_type {
2952 return FunctionInvalidArgumentSnafu {
2953 fn_name: format!("expected {expected_type:?}, but found {expr_type:?}"),
2954 }
2955 .fail();
2956 }
2957 }
2958
2959 Ok(expr)
2960 }
2961
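/// Build a `row_number()` window expression per field column, ordered by
/// value (ascending for `bottomk`, descending for `topk`) and partitioned
/// by the grouping expressions.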
2962 fn create_window_exprs(
2965 &mut self,
2966 op: TokenType,
2967 group_exprs: Vec<DfExpr>,
2968 input_plan: &LogicalPlan,
2969 ) -> Result<Vec<DfExpr>> {
2970 ensure!(
2971 self.ctx.field_columns.len() == 1,
2972 UnsupportedExprSnafu {
2973 name: "topk or bottomk on multi-value input"
2974 }
2975 );
2976
2977 assert!(matches!(op.id(), token::T_TOPK | token::T_BOTTOMK));
2978
2979 let asc = matches!(op.id(), token::T_BOTTOMK);
2980
2981 let tag_sort_exprs = self
2982 .create_tag_column_exprs()?
2983 .into_iter()
2984 .map(|expr| expr.sort(asc, true));
2985
2986 let exprs: Vec<DfExpr> = self
2988 .ctx
2989 .field_columns
2990 .iter()
2991 .map(|col| {
2992 let mut sort_exprs = Vec::with_capacity(self.ctx.tag_columns.len() + 1);
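// Order by the value first, then by tags, so ties break deterministically.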
2993 sort_exprs.push(DfExpr::Column(Column::from(col)).sort(asc, true));
2995 sort_exprs.extend(tag_sort_exprs.clone());
2998
2999 DfExpr::WindowFunction(Box::new(WindowFunction {
3000 fun: WindowFunctionDefinition::WindowUDF(Arc::new(RowNumber::new().into())),
3001 params: WindowFunctionParams {
3002 args: vec![],
3003 partition_by: group_exprs.clone(),
3004 order_by: sort_exprs,
3005 window_frame: WindowFrame::new(Some(true)),
3006 null_treatment: None,
3007 distinct: false,
3008 filter: None,
3009 },
3010 }))
3011 })
3012 .collect();
3013
3014 let normalized_exprs =
3015 normalize_cols(exprs.iter().cloned(), input_plan).context(DataFusionPlanningSnafu)?;
3016 Ok(normalized_exprs)
3017 }
3018
3019 #[deprecated(
3021 note = "use `Self::get_param_as_literal_expr` instead. This is only for `create_histogram_plan`"
3022 )]
3023 fn try_build_float_literal(expr: &PromExpr) -> Option<f64> {
3024 match expr {
3025 PromExpr::NumberLiteral(NumberLiteral { val }) => Some(*val),
3026 PromExpr::Paren(ParenExpr { expr }) => Self::try_build_float_literal(expr),
3027 PromExpr::Unary(UnaryExpr { expr, .. }) => {
3028 Self::try_build_float_literal(expr).map(|f| -f)
3029 }
3030 PromExpr::StringLiteral(_)
3031 | PromExpr::Binary(_)
3032 | PromExpr::VectorSelector(_)
3033 | PromExpr::MatrixSelector(_)
3034 | PromExpr::Call(_)
3035 | PromExpr::Extension(_)
3036 | PromExpr::Aggregate(_)
3037 | PromExpr::Subquery(_) => None,
3038 }
3039 }
3040
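/// Plan `histogram_quantile(phi, buckets)` with the HistogramFold extension
/// node, which folds the `le` buckets of a classic histogram into a quantile.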
3041 async fn create_histogram_plan(
3043 &mut self,
3044 args: &PromFunctionArgs,
3045 query_engine_state: &QueryEngineState,
3046 ) -> Result<LogicalPlan> {
3047 if args.args.len() != 2 {
3048 return FunctionInvalidArgumentSnafu {
3049 fn_name: SPECIAL_HISTOGRAM_QUANTILE.to_string(),
3050 }
3051 .fail();
3052 }
3053 #[allow(deprecated)]
3054 let phi = Self::try_build_float_literal(&args.args[0]).with_context(|| {
3055 FunctionInvalidArgumentSnafu {
3056 fn_name: SPECIAL_HISTOGRAM_QUANTILE.to_string(),
3057 }
3058 })?;
3059
3060 let input = args.args[1].as_ref().clone();
3061 let input_plan = self.prom_expr_to_plan(&input, query_engine_state).await?;
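// HistogramFold groups by the remaining columns, so the metric engine's
// internal tsid column must be projected away first.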
3062 let input_plan = self.strip_tsid_column(input_plan)?;
3066 self.ctx.use_tsid = false;
3067
3068 if !self.ctx.has_le_tag() {
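// No `le` tag means there are no buckets to fold; return an empty relation.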
3069 return Ok(LogicalPlan::EmptyRelation(
3072 datafusion::logical_expr::EmptyRelation {
3073 produce_one_row: false,
3074 schema: Arc::new(DFSchema::empty()),
3075 },
3076 ));
3077 }
3078 let time_index_column =
3079 self.ctx
3080 .time_index_column
3081 .clone()
3082 .with_context(|| TimeIndexNotFoundSnafu {
3083 table: self.ctx.table_name.clone().unwrap_or_default(),
3084 })?;
3085 let field_column = self
3087 .ctx
3088 .field_columns
3089 .first()
3090 .with_context(|| FunctionInvalidArgumentSnafu {
3091 fn_name: SPECIAL_HISTOGRAM_QUANTILE.to_string(),
3092 })?
3093 .clone();
3094 self.ctx.tag_columns.retain(|col| col != LE_COLUMN_NAME);
3096
3097 Ok(LogicalPlan::Extension(Extension {
3098 node: Arc::new(
3099 HistogramFold::new(
3100 LE_COLUMN_NAME.to_string(),
3101 field_column,
3102 time_index_column,
3103 phi,
3104 input_plan,
3105 )
3106 .context(DataFusionPlanningSnafu)?,
3107 ),
3108 }))
3109 }
3110
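/// Plan `vector(scalar)`: materialize the literal as a value column
/// evaluated at every step of the query range.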
3111 async fn create_vector_plan(&mut self, args: &PromFunctionArgs) -> Result<LogicalPlan> {
3113 if args.args.len() != 1 {
3114 return FunctionInvalidArgumentSnafu {
3115 fn_name: SPECIAL_VECTOR_FUNCTION.to_string(),
3116 }
3117 .fail();
3118 }
3119 let lit = Self::get_param_as_literal_expr(&Some(args.args[0].clone()), None, None)?;
3120
3121 self.ctx.time_index_column = Some(SPECIAL_TIME_FUNCTION.to_string());
3123 self.ctx.reset_table_name_and_schema();
3124 self.ctx.tag_columns = vec![];
3125 self.ctx.field_columns = vec![greptime_value().to_string()];
3126 Ok(LogicalPlan::Extension(Extension {
3127 node: Arc::new(
3128 EmptyMetric::new(
3129 self.ctx.start,
3130 self.ctx.end,
3131 self.ctx.interval,
3132 SPECIAL_TIME_FUNCTION.to_string(),
3133 greptime_value().to_string(),
3134 Some(lit),
3135 )
3136 .context(DataFusionPlanningSnafu)?,
3137 ),
3138 }))
3139 }
3140
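/// Plan `scalar(v)` with the ScalarCalculate extension node, collapsing the
/// single input series into one value column.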
3141 async fn create_scalar_plan(
3143 &mut self,
3144 args: &PromFunctionArgs,
3145 query_engine_state: &QueryEngineState,
3146 ) -> Result<LogicalPlan> {
3147 ensure!(
3148 args.len() == 1,
3149 FunctionInvalidArgumentSnafu {
3150 fn_name: SCALAR_FUNCTION
3151 }
3152 );
3153 let input = self
3154 .prom_expr_to_plan(&args.args[0], query_engine_state)
3155 .await?;
3156 ensure!(
3157 self.ctx.field_columns.len() == 1,
3158 MultiFieldsNotSupportedSnafu {
3159 operator: SCALAR_FUNCTION
3160 },
3161 );
3162 let scalar_plan = LogicalPlan::Extension(Extension {
3163 node: Arc::new(
3164 ScalarCalculate::new(
3165 self.ctx.start,
3166 self.ctx.end,
3167 self.ctx.interval,
3168 input,
3169 self.ctx.time_index_column.as_ref().unwrap(),
3170 &self.ctx.tag_columns,
3171 &self.ctx.field_columns[0],
3172 self.ctx.table_name.as_deref(),
3173 )
3174 .context(PromqlPlanNodeSnafu)?,
3175 ),
3176 });
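// The scalar plan exposes only the time index and one value column; reset
// the context and read the value column's name from the new schema.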
3177 self.ctx.tag_columns.clear();
3179 self.ctx.field_columns.clear();
3180 self.ctx
3181 .field_columns
3182 .push(scalar_plan.schema().field(1).name().clone());
3183 Ok(scalar_plan)
3184 }
3185
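/// Plan `absent(v)`: reduce the input to one row per timestamp, then let the
/// Absent extension node emit 1 for the steps that have no data.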
3186 async fn create_absent_plan(
3188 &mut self,
3189 args: &PromFunctionArgs,
3190 query_engine_state: &QueryEngineState,
3191 ) -> Result<LogicalPlan> {
3192 if args.args.len() != 1 {
3193 return FunctionInvalidArgumentSnafu {
3194 fn_name: SPECIAL_ABSENT_FUNCTION.to_string(),
3195 }
3196 .fail();
3197 }
3198 let input = self
3199 .prom_expr_to_plan(&args.args[0], query_engine_state)
3200 .await?;
3201
3202 let time_index_expr = self.create_time_index_column_expr()?;
3203 let first_field_expr =
3204 self.create_field_column_exprs()?
3205 .pop()
3206 .with_context(|| ValueNotFoundSnafu {
3207 table: self.ctx.table_name.clone().unwrap_or_default(),
3208 })?;
3209 let first_value_expr = first_value(first_field_expr, vec![]);
3210
3211 let ordered_aggregated_input = LogicalPlanBuilder::from(input)
3212 .aggregate(
3213 vec![time_index_expr.clone()],
3214 vec![first_value_expr.clone()],
3215 )
3216 .context(DataFusionPlanningSnafu)?
3217 .sort(vec![time_index_expr.sort(true, false)])
3218 .context(DataFusionPlanningSnafu)?
3219 .build()
3220 .context(DataFusionPlanningSnafu)?;
3221
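// absent() carries over labels that are fixed by plain equality matchers
// in the original selector.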
3222 let fake_labels = self
3223 .ctx
3224 .selector_matcher
3225 .iter()
3226 .filter_map(|matcher| match matcher.op {
3227 MatchOp::Equal => Some((matcher.name.clone(), matcher.value.clone())),
3228 _ => None,
3229 })
3230 .collect::<Vec<_>>();
3231
3232 let absent_plan = LogicalPlan::Extension(Extension {
3234 node: Arc::new(
3235 Absent::try_new(
3236 self.ctx.start,
3237 self.ctx.end,
3238 self.ctx.interval,
3239 self.ctx.time_index_column.as_ref().unwrap().clone(),
3240 self.ctx.field_columns[0].clone(),
3241 fake_labels,
3242 ordered_aggregated_input,
3243 )
3244 .context(DataFusionPlanningSnafu)?,
3245 ),
3246 });
3247
3248 Ok(absent_plan)
3249 }
3250
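/// Try to fold a PromQL expression into a DataFusion literal expression.
/// Returns None when evaluation requires planner context, such as selectors
/// or function calls.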
3251 fn try_build_literal_expr(expr: &PromExpr) -> Option<DfExpr> {
3254 match expr {
3255 PromExpr::NumberLiteral(NumberLiteral { val }) => Some(val.lit()),
3256 PromExpr::StringLiteral(StringLiteral { val }) => Some(val.lit()),
3257 PromExpr::VectorSelector(_)
3258 | PromExpr::MatrixSelector(_)
3259 | PromExpr::Extension(_)
3260 | PromExpr::Aggregate(_)
3261 | PromExpr::Subquery(_) => None,
// Function calls (including `time()`) need planner context, such as the
// time index column, to evaluate, so they never fold to a literal here.
3262 PromExpr::Call(_) => None,
3271 PromExpr::Paren(ParenExpr { expr }) => Self::try_build_literal_expr(expr),
3272 PromExpr::Unary(UnaryExpr { expr, .. }) => Self::try_build_literal_expr(expr),
3274 PromExpr::Binary(PromBinaryExpr {
3275 lhs,
3276 rhs,
3277 op,
3278 modifier,
3279 }) => {
3280 let lhs = Self::try_build_literal_expr(lhs)?;
3281 let rhs = Self::try_build_literal_expr(rhs)?;
3282 let is_comparison_op = Self::is_token_a_comparison_op(*op);
3283 let expr_builder = Self::prom_token_to_binary_expr_builder(*op).ok()?;
3284 let expr = expr_builder(lhs, rhs).ok()?;
3285
3286 let should_return_bool = if let Some(m) = modifier {
3287 m.return_bool
3288 } else {
3289 false
3290 };
3291 if is_comparison_op && should_return_bool {
3292 Some(DfExpr::Cast(Cast {
3293 expr: Box::new(expr),
3294 data_type: ArrowDataType::Float64,
3295 }))
3296 } else {
3297 Some(expr)
3298 }
3299 }
3300 }
3301 }
3302
3303 fn try_build_special_time_expr_with_context(&self, expr: &PromExpr) -> Option<DfExpr> {
3304 match expr {
3305 PromExpr::Call(Call { func, .. }) => {
3306 if func.name == SPECIAL_TIME_FUNCTION
3307 && let Some(time_index_col) = self.ctx.time_index_column.as_ref()
3308 {
3309 Some(build_special_time_expr(time_index_col))
3310 } else {
3311 None
3312 }
3313 }
3314 _ => None,
3315 }
3316 }
3317
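/// Map a PromQL binary operator token to a builder over two DataFusion
/// expressions.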
3318 #[allow(clippy::type_complexity)]
3321 fn prom_token_to_binary_expr_builder(
3322 token: TokenType,
3323 ) -> Result<Box<dyn Fn(DfExpr, DfExpr) -> Result<DfExpr>>> {
3324 match token.id() {
3325 token::T_ADD => Ok(Box::new(|lhs, rhs| Ok(lhs + rhs))),
3326 token::T_SUB => Ok(Box::new(|lhs, rhs| Ok(lhs - rhs))),
3327 token::T_MUL => Ok(Box::new(|lhs, rhs| Ok(lhs * rhs))),
3328 token::T_DIV => Ok(Box::new(|lhs, rhs| Ok(lhs / rhs))),
3329 token::T_MOD => Ok(Box::new(|lhs: DfExpr, rhs| Ok(lhs % rhs))),
3330 token::T_EQLC => Ok(Box::new(|lhs, rhs| Ok(lhs.eq(rhs)))),
3331 token::T_NEQ => Ok(Box::new(|lhs, rhs| Ok(lhs.not_eq(rhs)))),
3332 token::T_GTR => Ok(Box::new(|lhs, rhs| Ok(lhs.gt(rhs)))),
3333 token::T_LSS => Ok(Box::new(|lhs, rhs| Ok(lhs.lt(rhs)))),
3334 token::T_GTE => Ok(Box::new(|lhs, rhs| Ok(lhs.gt_eq(rhs)))),
3335 token::T_LTE => Ok(Box::new(|lhs, rhs| Ok(lhs.lt_eq(rhs)))),
3336 token::T_POW => Ok(Box::new(|lhs, rhs| {
3337 Ok(DfExpr::ScalarFunction(ScalarFunction {
3338 func: datafusion_functions::math::power(),
3339 args: vec![lhs, rhs],
3340 }))
3341 })),
3342 token::T_ATAN2 => Ok(Box::new(|lhs, rhs| {
3343 Ok(DfExpr::ScalarFunction(ScalarFunction {
3344 func: datafusion_functions::math::atan2(),
3345 args: vec![lhs, rhs],
3346 }))
3347 })),
3348 _ => UnexpectedTokenSnafu { token }.fail(),
3349 }
3350 }
3351
3352 fn is_token_a_comparison_op(token: TokenType) -> bool {
3354 matches!(
3355 token.id(),
3356 token::T_EQLC
3357 | token::T_NEQ
3358 | token::T_GTR
3359 | token::T_LSS
3360 | token::T_GTE
3361 | token::T_LTE
3362 )
3363 }
3364
3365 fn is_token_a_set_op(token: TokenType) -> bool {
3367 matches!(
3368 token.id(),
3369 token::T_LAND | token::T_LOR | token::T_LUNLESS)
3373 }
3374
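/// Plan a vector-to-vector arithmetic or comparison as an inner join on the
/// shared tag columns plus the time index, honoring on/ignoring modifiers.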
3375 #[allow(clippy::too_many_arguments)]
3378 fn join_on_non_field_columns(
3379 &self,
3380 left: LogicalPlan,
3381 right: LogicalPlan,
3382 left_table_ref: TableReference,
3383 right_table_ref: TableReference,
3384 left_time_index_column: Option<String>,
3385 right_time_index_column: Option<String>,
3386 only_join_time_index: bool,
3387 modifier: &Option<BinModifier>,
3388 ) -> Result<LogicalPlan> {
3389 let mut left_tag_columns = if only_join_time_index {
3390 BTreeSet::new()
3391 } else {
3392 self.ctx
3393 .tag_columns
3394 .iter()
3395 .cloned()
3396 .collect::<BTreeSet<_>>()
3397 };
3398 let mut right_tag_columns = left_tag_columns.clone();
3399
3400 if let Some(modifier) = modifier {
3402 if let Some(matching) = &modifier.matching {
3404 match matching {
3405 LabelModifier::Include(on) => {
3407 let mask = on.labels.iter().cloned().collect::<BTreeSet<_>>();
3408 left_tag_columns = left_tag_columns.intersection(&mask).cloned().collect();
3409 right_tag_columns =
3410 right_tag_columns.intersection(&mask).cloned().collect();
3411 }
3412 LabelModifier::Exclude(ignoring) => {
3414 for label in &ignoring.labels {
3416 let _ = left_tag_columns.remove(label);
3417 let _ = right_tag_columns.remove(label);
3418 }
3419 }
3420 }
3421 }
3422 }
3423
3424 if let (Some(left_time_index_column), Some(right_time_index_column)) =
3426 (left_time_index_column, right_time_index_column)
3427 {
3428 left_tag_columns.insert(left_time_index_column);
3429 right_tag_columns.insert(right_time_index_column);
3430 }
3431
3432 let right = LogicalPlanBuilder::from(right)
3433 .alias(right_table_ref)
3434 .context(DataFusionPlanningSnafu)?
3435 .build()
3436 .context(DataFusionPlanningSnafu)?;
3437
3438 LogicalPlanBuilder::from(left)
3440 .alias(left_table_ref)
3441 .context(DataFusionPlanningSnafu)?
3442 .join_detailed(
3443 right,
3444 JoinType::Inner,
3445 (
3446 left_tag_columns
3447 .into_iter()
3448 .map(Column::from_name)
3449 .collect::<Vec<_>>(),
3450 right_tag_columns
3451 .into_iter()
3452 .map(Column::from_name)
3453 .collect::<Vec<_>>(),
3454 ),
3455 None,
3456 NullEquality::NullEqualsNull,
3457 )
3458 .context(DataFusionPlanningSnafu)?
3459 .build()
3460 .context(DataFusionPlanningSnafu)
3461 }
3462
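/// Plan the set operators: `and` becomes a left-semi join, `unless` a
/// left-anti join, and `or` is delegated to [`Self::or_operator`].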
3463 fn set_op_on_non_field_columns(
3465 &mut self,
3466 left: LogicalPlan,
3467 mut right: LogicalPlan,
3468 left_context: PromPlannerContext,
3469 right_context: PromPlannerContext,
3470 op: TokenType,
3471 modifier: &Option<BinModifier>,
3472 ) -> Result<LogicalPlan> {
3473 let mut left_tag_col_set = left_context
3474 .tag_columns
3475 .iter()
3476 .cloned()
3477 .collect::<HashSet<_>>();
3478 let mut right_tag_col_set = right_context
3479 .tag_columns
3480 .iter()
3481 .cloned()
3482 .collect::<HashSet<_>>();
3483
3484 if matches!(op.id(), token::T_LOR) {
3485 return self.or_operator(
3486 left,
3487 right,
3488 left_tag_col_set,
3489 right_tag_col_set,
3490 left_context,
3491 right_context,
3492 modifier,
3493 );
3494 }
3495
3496 if let Some(modifier) = modifier {
3498 ensure!(
3500 matches!(
3501 modifier.card,
3502 VectorMatchCardinality::OneToOne | VectorMatchCardinality::ManyToMany
3503 ),
3504 UnsupportedVectorMatchSnafu {
3505 name: modifier.card.clone(),
3506 },
3507 );
3508 if let Some(matching) = &modifier.matching {
3510 match matching {
3511 LabelModifier::Include(on) => {
3513 let mask = on.labels.iter().cloned().collect::<HashSet<_>>();
3514 left_tag_col_set = left_tag_col_set.intersection(&mask).cloned().collect();
3515 right_tag_col_set =
3516 right_tag_col_set.intersection(&mask).cloned().collect();
3517 }
3518 LabelModifier::Exclude(ignoring) => {
3520 for label in &ignoring.labels {
3522 let _ = left_tag_col_set.remove(label);
3523 let _ = right_tag_col_set.remove(label);
3524 }
3525 }
3526 }
3527 }
3528 }
3529 if !matches!(op.id(), token::T_LOR) {
3531 ensure!(
3532 left_tag_col_set == right_tag_col_set,
3533 CombineTableColumnMismatchSnafu {
3534 left: left_tag_col_set.into_iter().collect::<Vec<_>>(),
3535 right: right_tag_col_set.into_iter().collect::<Vec<_>>(),
3536 }
3537 )
3538 };
3539 let left_time_index = left_context.time_index_column.clone().unwrap();
3540 let right_time_index = right_context.time_index_column.clone().unwrap();
3541 let join_keys = left_tag_col_set
3542 .iter()
3543 .cloned()
3544 .chain([left_time_index.clone()])
3545 .collect::<Vec<_>>();
3546 self.ctx.time_index_column = Some(left_time_index.clone());
3547 self.ctx.use_tsid = left_context.use_tsid;
3548
3549 if left_context.time_index_column != right_context.time_index_column {
3551 let right_project_exprs = right
3552 .schema()
3553 .fields()
3554 .iter()
3555 .map(|field| {
3556 if field.name() == &right_time_index {
3557 DfExpr::Column(Column::from_name(&right_time_index)).alias(&left_time_index)
3558 } else {
3559 DfExpr::Column(Column::from_name(field.name()))
3560 }
3561 })
3562 .collect::<Vec<_>>();
3563
3564 right = LogicalPlanBuilder::from(right)
3565 .project(right_project_exprs)
3566 .context(DataFusionPlanningSnafu)?
3567 .build()
3568 .context(DataFusionPlanningSnafu)?;
3569 }
3570
3571 ensure!(
3572 left_context.field_columns.len() == 1,
3573 MultiFieldsNotSupportedSnafu {
3574 operator: "AND operator"
3575 }
3576 );
3577 let left_field_col = left_context.field_columns.first().unwrap();
3580 self.ctx.field_columns = vec![left_field_col.clone()];
3581
3582 match op.id() {
3585 token::T_LAND => LogicalPlanBuilder::from(left)
3586 .distinct()
3587 .context(DataFusionPlanningSnafu)?
3588 .join_detailed(
3589 right,
3590 JoinType::LeftSemi,
3591 (join_keys.clone(), join_keys),
3592 None,
3593 NullEquality::NullEqualsNull,
3594 )
3595 .context(DataFusionPlanningSnafu)?
3596 .build()
3597 .context(DataFusionPlanningSnafu),
3598 token::T_LUNLESS => LogicalPlanBuilder::from(left)
3599 .distinct()
3600 .context(DataFusionPlanningSnafu)?
3601 .join_detailed(
3602 right,
3603 JoinType::LeftAnti,
3604 (join_keys.clone(), join_keys),
3605 None,
3606 NullEquality::NullEqualsNull,
3607 )
3608 .context(DataFusionPlanningSnafu)?
3609 .build()
3610 .context(DataFusionPlanningSnafu),
3611 token::T_LOR => {
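// `or` was dispatched to `or_operator` at the top of this function.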
3612 unreachable!()
3615 }
3616 _ => UnexpectedTokenSnafu { token: op }.fail(),
3617 }
3618 }
3619
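/// Plan `or`: project both sides to a common schema, then union them with
/// the UnionDistinctOn extension node so matching series are not duplicated.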
3620 #[allow(clippy::too_many_arguments)]
3622 fn or_operator(
3623 &mut self,
3624 left: LogicalPlan,
3625 right: LogicalPlan,
3626 left_tag_cols_set: HashSet<String>,
3627 right_tag_cols_set: HashSet<String>,
3628 left_context: PromPlannerContext,
3629 right_context: PromPlannerContext,
3630 modifier: &Option<BinModifier>,
3631 ) -> Result<LogicalPlan> {
3632 ensure!(
3634 left_context.field_columns.len() == right_context.field_columns.len(),
3635 CombineTableColumnMismatchSnafu {
3636 left: left_context.field_columns.clone(),
3637 right: right_context.field_columns.clone()
3638 }
3639 );
3640 ensure!(
3641 left_context.field_columns.len() == 1,
3642 MultiFieldsNotSupportedSnafu {
3643 operator: "OR operator"
3644 }
3645 );
3646
3647 let all_tags = left_tag_cols_set
3649 .union(&right_tag_cols_set)
3650 .cloned()
3651 .collect::<HashSet<_>>();
3652 let tags_not_in_left = all_tags
3653 .difference(&left_tag_cols_set)
3654 .cloned()
3655 .collect::<Vec<_>>();
3656 let tags_not_in_right = all_tags
3657 .difference(&right_tag_cols_set)
3658 .cloned()
3659 .collect::<Vec<_>>();
3660 let left_qualifier = left.schema().qualified_field(0).0.cloned();
3661 let right_qualifier = right.schema().qualified_field(0).0.cloned();
3662 let left_qualifier_string = left_qualifier
3663 .as_ref()
3664 .map(|l| l.to_string())
3665 .unwrap_or_default();
3666 let right_qualifier_string = right_qualifier
3667 .as_ref()
3668 .map(|r| r.to_string())
3669 .unwrap_or_default();
3670 let left_time_index_column =
3671 left_context
3672 .time_index_column
3673 .clone()
3674 .with_context(|| TimeIndexNotFoundSnafu {
3675 table: left_qualifier_string.clone(),
3676 })?;
3677 let right_time_index_column =
3678 right_context
3679 .time_index_column
3680 .clone()
3681 .with_context(|| TimeIndexNotFoundSnafu {
3682 table: right_qualifier_string.clone(),
3683 })?;
3684 let left_field_col = left_context.field_columns.first().unwrap();
3686 let right_field_col = right_context.field_columns.first().unwrap();
3687 let left_has_tsid = left
3688 .schema()
3689 .fields()
3690 .iter()
3691 .any(|field| field.name() == DATA_SCHEMA_TSID_COLUMN_NAME);
3692 let right_has_tsid = right
3693 .schema()
3694 .fields()
3695 .iter()
3696 .any(|field| field.name() == DATA_SCHEMA_TSID_COLUMN_NAME);
3697
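// Assemble the union's output columns. The internal tsid column survives
// only when both sides carry it, and only one time index and one field
// column are kept.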
3698 let mut all_columns_set = left
3700 .schema()
3701 .fields()
3702 .iter()
3703 .chain(right.schema().fields().iter())
3704 .map(|field| field.name().clone())
3705 .collect::<HashSet<_>>();
3706 if !(left_has_tsid && right_has_tsid) {
3709 all_columns_set.remove(DATA_SCHEMA_TSID_COLUMN_NAME);
3710 }
3711 all_columns_set.remove(&left_time_index_column);
3713 all_columns_set.remove(&right_time_index_column);
3714 if left_field_col != right_field_col {
3716 all_columns_set.remove(right_field_col);
3717 }
3718 let mut all_columns = all_columns_set.into_iter().collect::<Vec<_>>();
3719 all_columns.sort_unstable();
3721 all_columns.insert(0, left_time_index_column.clone());
3723
3724 let left_proj_exprs = all_columns.iter().map(|col| {
3726 if tags_not_in_left.contains(col) {
3727 DfExpr::Literal(ScalarValue::Utf8(None), None).alias(col.clone())
3728 } else {
3729 DfExpr::Column(Column::new(None::<String>, col))
3730 }
3731 });
3732 let right_time_index_expr = DfExpr::Column(Column::new(
3733 right_qualifier.clone(),
3734 right_time_index_column,
3735 ))
3736 .alias(left_time_index_column.clone());
3737 let right_qualifier_for_field = right
3740 .schema()
3741 .iter()
3742 .find(|(_, f)| f.name() == right_field_col)
3743 .map(|(q, _)| q)
3744 .with_context(|| ColumnNotFoundSnafu {
3745 col: right_field_col.clone(),
3746 })?
3747 .cloned();
3748
3749 let right_proj_exprs_without_time_index = all_columns.iter().skip(1).map(|col| {
3751 if col == left_field_col && left_field_col != right_field_col {
3753 DfExpr::Column(Column::new(
3755 right_qualifier_for_field.clone(),
3756 right_field_col,
3757 ))
3758 } else if tags_not_in_right.contains(col) {
3759 DfExpr::Literal(ScalarValue::Utf8(None), None).alias(col.clone())
3760 } else {
3761 DfExpr::Column(Column::new(None::<String>, col))
3762 }
3763 });
3764 let right_proj_exprs = [right_time_index_expr]
3765 .into_iter()
3766 .chain(right_proj_exprs_without_time_index);
3767
3768 let left_projected = LogicalPlanBuilder::from(left)
3769 .project(left_proj_exprs)
3770 .context(DataFusionPlanningSnafu)?
3771 .alias(left_qualifier_string.clone())
3772 .context(DataFusionPlanningSnafu)?
3773 .build()
3774 .context(DataFusionPlanningSnafu)?;
3775 let right_projected = LogicalPlanBuilder::from(right)
3776 .project(right_proj_exprs)
3777 .context(DataFusionPlanningSnafu)?
3778 .alias(right_qualifier_string.clone())
3779 .context(DataFusionPlanningSnafu)?
3780 .build()
3781 .context(DataFusionPlanningSnafu)?;
3782
3783 let mut match_columns = if let Some(modifier) = modifier
3785 && let Some(matching) = &modifier.matching
3786 {
3787 match matching {
3788 LabelModifier::Include(on) => on.labels.clone(),
3790 LabelModifier::Exclude(ignoring) => {
3792 let ignoring = ignoring.labels.iter().cloned().collect::<HashSet<_>>();
3793 all_tags.difference(&ignoring).cloned().collect()
3794 }
3795 }
3796 } else {
3797 all_tags.iter().cloned().collect()
3798 };
3799 match_columns.sort_unstable();
3801 let schema = left_projected.schema().clone();
3803 let union_distinct_on = UnionDistinctOn::new(
3804 left_projected,
3805 right_projected,
3806 match_columns,
3807 left_time_index_column.clone(),
3808 schema,
3809 );
3810 let result = LogicalPlan::Extension(Extension {
3811 node: Arc::new(union_distinct_on),
3812 });
3813
3814 self.ctx.time_index_column = Some(left_time_index_column);
3816 self.ctx.tag_columns = all_tags.into_iter().collect();
3817 self.ctx.field_columns = vec![left_field_col.clone()];
3818 self.ctx.use_tsid = left_has_tsid && right_has_tsid;
3819
3820 Ok(result)
3821 }
3822
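/// Project every field column through `name_to_expr`, keep tags and the time
/// index unchanged, and update the context's field column names.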
3823 fn projection_for_each_field_column<F>(
3831 &mut self,
3832 input: LogicalPlan,
3833 name_to_expr: F,
3834 ) -> Result<LogicalPlan>
3835 where
3836 F: FnMut(&String) -> Result<DfExpr>,
3837 {
3838 let non_field_columns_iter = self
3839 .ctx
3840 .tag_columns
3841 .iter()
3842 .chain(self.ctx.time_index_column.iter())
3843 .map(|col| {
3844 Ok(DfExpr::Column(Column::new(
3845 self.ctx.table_name.clone().map(TableReference::bare),
3846 col,
3847 )))
3848 });
3849
3850 let result_field_columns = self
3852 .ctx
3853 .field_columns
3854 .iter()
3855 .map(name_to_expr)
3856 .collect::<Result<Vec<_>>>()?;
3857
3858 self.ctx.field_columns = result_field_columns
3860 .iter()
3861 .map(|expr| expr.schema_name().to_string())
3862 .collect();
3863 let field_columns_iter = result_field_columns
3864 .into_iter()
3865 .zip(self.ctx.field_columns.iter())
3866 .map(|(expr, name)| Ok(DfExpr::Alias(Alias::new(expr, None::<String>, name))));
3867
3868 let project_fields = non_field_columns_iter
3870 .chain(field_columns_iter)
3871 .collect::<Result<Vec<_>>>()?;
3872
3873 LogicalPlanBuilder::from(input)
3874 .project(project_fields)
3875 .context(DataFusionPlanningSnafu)?
3876 .build()
3877 .context(DataFusionPlanningSnafu)
3878 }
3879
3880 fn filter_on_field_column<F>(
3883 &self,
3884 input: LogicalPlan,
3885 mut name_to_expr: F,
3886 ) -> Result<LogicalPlan>
3887 where
3888 F: FnMut(&String) -> Result<DfExpr>,
3889 {
3890 ensure!(
3891 self.ctx.field_columns.len() == 1,
3892 UnsupportedExprSnafu {
3893 name: "filter on multi-value input"
3894 }
3895 );
3896
3897 let field_column_filter = name_to_expr(&self.ctx.field_columns[0])?;
3898
3899 LogicalPlanBuilder::from(input)
3900 .filter(field_column_filter)
3901 .context(DataFusionPlanningSnafu)?
3902 .build()
3903 .context(DataFusionPlanningSnafu)
3904 }
3905
3906 fn date_part_on_time_index(&self, date_part: &str) -> Result<DfExpr> {
3909 let input_expr = datafusion::logical_expr::col(
3910 self.ctx
3911 .time_index_column
3912 .as_ref()
3913 .with_context(|| TimeIndexNotFoundSnafu {
3915 table: "<doesn't matter>",
3916 })?,
3917 );
3918 let fn_expr = DfExpr::ScalarFunction(ScalarFunction {
3919 func: datafusion_functions::datetime::date_part(),
3920 args: vec![date_part.lit(), input_expr],
3921 });
3922 Ok(fn_expr)
3923 }
3924
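/// Project away the metric engine's internal tsid column, if present.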
3925 fn strip_tsid_column(&self, plan: LogicalPlan) -> Result<LogicalPlan> {
3926 let schema = plan.schema();
3927 if !schema
3928 .fields()
3929 .iter()
3930 .any(|field| field.name() == DATA_SCHEMA_TSID_COLUMN_NAME)
3931 {
3932 return Ok(plan);
3933 }
3934
3935 let project_exprs = schema
3936 .fields()
3937 .iter()
3938 .filter(|field| field.name() != DATA_SCHEMA_TSID_COLUMN_NAME)
3939 .map(|field| Ok(DfExpr::Column(Column::from_name(field.name().clone()))))
3940 .collect::<Result<Vec<_>>>()?;
3941
3942 LogicalPlanBuilder::from(plan)
3943 .project(project_exprs)
3944 .context(DataFusionPlanningSnafu)?
3945 .build()
3946 .context(DataFusionPlanningSnafu)
3947 }
3948
3949 fn apply_alias(&mut self, plan: LogicalPlan, alias_name: String) -> Result<LogicalPlan> {
3951 let fields_expr = self.create_field_column_exprs()?;
3952
3953 ensure!(
3955 fields_expr.len() == 1,
3956 UnsupportedExprSnafu {
3957 name: "alias on multi-value result"
3958 }
3959 );
3960
3961 let project_fields = fields_expr
3962 .into_iter()
3963 .map(|expr| expr.alias(&alias_name))
3964 .chain(self.create_tag_column_exprs()?)
3965 .chain(Some(self.create_time_index_column_expr()?));
3966
3967 LogicalPlanBuilder::from(plan)
3968 .project(project_fields)
3969 .context(DataFusionPlanningSnafu)?
3970 .build()
3971 .context(DataFusionPlanningSnafu)
3972 }
3973}
3974
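/// Arguments of a PromQL function call: at most one vector input plus the
/// literal arguments.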
3975#[derive(Default, Debug)]
3976struct FunctionArgs {
3977 input: Option<PromExpr>,
3978 literals: Vec<DfExpr>,
3979}
3980
3981#[derive(Debug, Clone)]
3984enum ScalarFunc {
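/// A scalar function provided by DataFusion itself (e.g. `radians`); called
/// with the field column spliced into the argument list.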
3985 DataFusionBuiltin(Arc<ScalarUdfDef>),
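/// A UDF registered with the query engine (e.g. PromQL `round` or
/// user-defined functions); invoked like a DataFusion builtin.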
3989 DataFusionUdf(Arc<ScalarUdfDef>),
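/// A PromQL range function that takes the timestamp-range column followed
/// by the field column (e.g. `avg_over_time`).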
3993 Udf(Arc<ScalarUdfDef>),
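/// Like [`ScalarFunc::Udf`], but also receives the time index column and the
/// range length, for functions that extrapolate (`rate`, `increase`, `delta`).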
3998 ExtrapolateUdf(Arc<ScalarUdfDef>, i64),
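/// The expression was already generated in place (e.g. `time()`, `pi()`,
/// the label functions); no wrapping call is needed.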
4005 GeneratedExpr,
4009}
4010
4011#[cfg(test)]
4012mod test {
4013 use std::time::{Duration, UNIX_EPOCH};
4014
4015 use catalog::RegisterTableRequest;
4016 use catalog::memory::{MemoryCatalogManager, new_memory_catalog_manager};
4017 use common_base::Plugins;
4018 use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
4019 use common_query::prelude::greptime_timestamp;
4020 use common_query::test_util::DummyDecoder;
4021 use datatypes::prelude::ConcreteDataType;
4022 use datatypes::schema::{ColumnSchema, Schema};
4023 use promql_parser::label::Labels;
4024 use promql_parser::parser;
4025 use session::context::QueryContext;
4026 use table::metadata::{TableInfoBuilder, TableMetaBuilder};
4027 use table::test_util::EmptyTable;
4028
4029 use super::*;
4030 use crate::options::QueryOptions;
4031 use crate::parser::QueryLanguageParser;
4032
4033 fn build_query_engine_state() -> QueryEngineState {
4034 QueryEngineState::new(
4035 new_memory_catalog_manager().unwrap(),
4036 None,
4037 None,
4038 None,
4039 None,
4040 None,
4041 false,
4042 Plugins::default(),
4043 QueryOptions::default(),
4044 )
4045 }
4046
4047 async fn build_test_table_provider(
4048 table_name_tuples: &[(String, String)],
4049 num_tag: usize,
4050 num_field: usize,
4051 ) -> DfTableSourceProvider {
4052 let catalog_list = MemoryCatalogManager::with_default_setup();
4053 for (schema_name, table_name) in table_name_tuples {
4054 let mut columns = vec![];
4055 for i in 0..num_tag {
4056 columns.push(ColumnSchema::new(
4057 format!("tag_{i}"),
4058 ConcreteDataType::string_datatype(),
4059 false,
4060 ));
4061 }
4062 columns.push(
4063 ColumnSchema::new(
4064 "timestamp".to_string(),
4065 ConcreteDataType::timestamp_millisecond_datatype(),
4066 false,
4067 )
4068 .with_time_index(true),
4069 );
4070 for i in 0..num_field {
4071 columns.push(ColumnSchema::new(
4072 format!("field_{i}"),
4073 ConcreteDataType::float64_datatype(),
4074 true,
4075 ));
4076 }
4077 let schema = Arc::new(Schema::new(columns));
4078 let table_meta = TableMetaBuilder::empty()
4079 .schema(schema)
4080 .primary_key_indices((0..num_tag).collect())
4081 .value_indices((num_tag + 1..num_tag + 1 + num_field).collect())
4082 .next_column_id(1024)
4083 .build()
4084 .unwrap();
4085 let table_info = TableInfoBuilder::default()
4086 .name(table_name.clone())
4087 .meta(table_meta)
4088 .build()
4089 .unwrap();
4090 let table = EmptyTable::from_table_info(&table_info);
4091
4092 assert!(
4093 catalog_list
4094 .register_table_sync(RegisterTableRequest {
4095 catalog: DEFAULT_CATALOG_NAME.to_string(),
4096 schema: schema_name.clone(),
4097 table_name: table_name.clone(),
4098 table_id: 1024,
4099 table,
4100 })
4101 .is_ok()
4102 );
4103 }
4104
4105 DfTableSourceProvider::new(
4106 catalog_list,
4107 false,
4108 QueryContext::arc(),
4109 DummyDecoder::arc(),
4110 false,
4111 )
4112 }
4113
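/// Build a catalog that mimics the metric engine layout: one physical table
/// carrying the internal table-id/tsid columns plus logical tables that
/// reference it.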
    async fn build_test_table_provider_with_tsid(
        table_name_tuples: &[(String, String)],
        num_tag: usize,
        num_field: usize,
    ) -> DfTableSourceProvider {
        let catalog_list = MemoryCatalogManager::with_default_setup();

        let physical_table_name = "phy";
        let physical_table_id = 999u32;

        // Register the physical table with the metric engine's internal columns.
        {
            let mut columns = vec![
                ColumnSchema::new(
                    DATA_SCHEMA_TABLE_ID_COLUMN_NAME.to_string(),
                    ConcreteDataType::uint32_datatype(),
                    false,
                ),
                ColumnSchema::new(
                    DATA_SCHEMA_TSID_COLUMN_NAME.to_string(),
                    ConcreteDataType::uint64_datatype(),
                    false,
                ),
            ];
            for i in 0..num_tag {
                columns.push(ColumnSchema::new(
                    format!("tag_{i}"),
                    ConcreteDataType::string_datatype(),
                    false,
                ));
            }
            columns.push(
                ColumnSchema::new(
                    "timestamp".to_string(),
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                )
                .with_time_index(true),
            );
            for i in 0..num_field {
                columns.push(ColumnSchema::new(
                    format!("field_{i}"),
                    ConcreteDataType::float64_datatype(),
                    true,
                ));
            }

            let schema = Arc::new(Schema::new(columns));
            let primary_key_indices = (0..(2 + num_tag)).collect::<Vec<_>>();
            let table_meta = TableMetaBuilder::empty()
                .schema(schema)
                .primary_key_indices(primary_key_indices)
                .value_indices((2 + num_tag..2 + num_tag + 1 + num_field).collect())
                .engine(METRIC_ENGINE_NAME.to_string())
                .next_column_id(1024)
                .build()
                .unwrap();
            let table_info = TableInfoBuilder::default()
                .table_id(physical_table_id)
                .name(physical_table_name)
                .meta(table_meta)
                .build()
                .unwrap();
            let table = EmptyTable::from_table_info(&table_info);

            assert!(
                catalog_list
                    .register_table_sync(RegisterTableRequest {
                        catalog: DEFAULT_CATALOG_NAME.to_string(),
                        schema: DEFAULT_SCHEMA_NAME.to_string(),
                        table_name: physical_table_name.to_string(),
                        table_id: physical_table_id,
                        table,
                    })
                    .is_ok()
            );
        }

        // Register one logical table per requested (schema, table) pair.
        for (idx, (schema_name, table_name)) in table_name_tuples.iter().enumerate() {
            let mut columns = vec![];
            for i in 0..num_tag {
                columns.push(ColumnSchema::new(
                    format!("tag_{i}"),
                    ConcreteDataType::string_datatype(),
                    false,
                ));
            }
            columns.push(
                ColumnSchema::new(
                    "timestamp".to_string(),
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                )
                .with_time_index(true),
            );
            for i in 0..num_field {
                columns.push(ColumnSchema::new(
                    format!("field_{i}"),
                    ConcreteDataType::float64_datatype(),
                    true,
                ));
            }

            let schema = Arc::new(Schema::new(columns));
            let mut options = table::requests::TableOptions::default();
            options.extra_options.insert(
                LOGICAL_TABLE_METADATA_KEY.to_string(),
                physical_table_name.to_string(),
            );
            let table_id = 1024u32 + idx as u32;
            let table_meta = TableMetaBuilder::empty()
                .schema(schema)
                .primary_key_indices((0..num_tag).collect())
                .value_indices((num_tag + 1..num_tag + 1 + num_field).collect())
                .engine(METRIC_ENGINE_NAME.to_string())
                .options(options)
                .next_column_id(1024)
                .build()
                .unwrap();
            let table_info = TableInfoBuilder::default()
                .table_id(table_id)
                .name(table_name.clone())
                .meta(table_meta)
                .build()
                .unwrap();
            let table = EmptyTable::from_table_info(&table_info);

            assert!(
                catalog_list
                    .register_table_sync(RegisterTableRequest {
                        catalog: DEFAULT_CATALOG_NAME.to_string(),
                        schema: schema_name.clone(),
                        table_name: table_name.clone(),
                        table_id,
                        table,
                    })
                    .is_ok()
            );
        }

        DfTableSourceProvider::new(
            catalog_list,
            false,
            QueryContext::arc(),
            DummyDecoder::arc(),
            false,
        )
    }

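    /// Build a mocked table provider whose tables use the given tag names plus
    /// the conventional `greptime_timestamp` / `greptime_value` columns.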
    async fn build_test_table_provider_with_fields(
        table_name_tuples: &[(String, String)],
        tags: &[&str],
    ) -> DfTableSourceProvider {
        let catalog_list = MemoryCatalogManager::with_default_setup();
        for (schema_name, table_name) in table_name_tuples {
            let mut columns = vec![];
            let num_tag = tags.len();
            for tag in tags {
                columns.push(ColumnSchema::new(
                    tag.to_string(),
                    ConcreteDataType::string_datatype(),
                    false,
                ));
            }
            columns.push(
                ColumnSchema::new(
                    greptime_timestamp().to_string(),
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                )
                .with_time_index(true),
            );
            columns.push(ColumnSchema::new(
                greptime_value().to_string(),
                ConcreteDataType::float64_datatype(),
                true,
            ));
            let schema = Arc::new(Schema::new(columns));
            let table_meta = TableMetaBuilder::empty()
                .schema(schema)
                .primary_key_indices((0..num_tag).collect())
                .next_column_id(1024)
                .build()
                .unwrap();
            let table_info = TableInfoBuilder::default()
                .name(table_name.clone())
                .meta(table_meta)
                .build()
                .unwrap();
            let table = EmptyTable::from_table_info(&table_info);

            assert!(
                catalog_list
                    .register_table_sync(RegisterTableRequest {
                        catalog: DEFAULT_CATALOG_NAME.to_string(),
                        schema: schema_name.clone(),
                        table_name: table_name.clone(),
                        table_id: 1024,
                        table,
                    })
                    .is_ok()
            );
        }

        DfTableSourceProvider::new(
            catalog_list,
            false,
            QueryContext::arc(),
            DummyDecoder::arc(),
            false,
        )
    }

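    /// Plan `{fn_name}(some_metric{tag_0!="bar"})` and assert the rendered
    /// logical plan, with `plan_name` substituted for `TEMPLATE` in the
    /// expected output.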
    async fn do_single_instant_function_call(fn_name: &'static str, plan_name: &str) {
        let prom_expr =
            parser::parse(&format!("{fn_name}(some_metric{{tag_0!=\"bar\"}})")).unwrap();
        let eval_stmt = EvalStmt {
            expr: prom_expr,
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let table_provider = build_test_table_provider(
            &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
            1,
            1,
        )
        .await;
        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();

        let expected = String::from(
            "Filter: TEMPLATE(field_0) IS NOT NULL [timestamp:Timestamp(Millisecond, None), TEMPLATE(field_0):Float64;N, tag_0:Utf8]\
            \n  Projection: some_metric.timestamp, TEMPLATE(some_metric.field_0) AS TEMPLATE(field_0), some_metric.tag_0 [timestamp:Timestamp(Millisecond, None), TEMPLATE(field_0):Float64;N, tag_0:Utf8]\
            \n    PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n      PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n        Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n          Filter: some_metric.tag_0 != Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(-999, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n            TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
        ).replace("TEMPLATE", plan_name);

        assert_eq!(plan.display_indent_schema().to_string(), expected);
    }

    #[tokio::test]
    async fn single_abs() {
        do_single_instant_function_call("abs", "abs").await;
    }

    #[tokio::test]
    #[should_panic]
    async fn single_absent() {
        do_single_instant_function_call("absent", "").await;
    }

    #[tokio::test]
    async fn single_ceil() {
        do_single_instant_function_call("ceil", "ceil").await;
    }

    #[tokio::test]
    async fn single_exp() {
        do_single_instant_function_call("exp", "exp").await;
    }

    #[tokio::test]
    async fn single_ln() {
        do_single_instant_function_call("ln", "ln").await;
    }

    #[tokio::test]
    async fn single_log2() {
        do_single_instant_function_call("log2", "log2").await;
    }

    #[tokio::test]
    async fn single_log10() {
        do_single_instant_function_call("log10", "log10").await;
    }

    #[tokio::test]
    #[should_panic]
    async fn single_scalar() {
        do_single_instant_function_call("scalar", "").await;
    }

    #[tokio::test]
    #[should_panic]
    async fn single_sgn() {
        do_single_instant_function_call("sgn", "").await;
    }

    #[tokio::test]
    #[should_panic]
    async fn single_sort() {
        do_single_instant_function_call("sort", "").await;
    }

    #[tokio::test]
    #[should_panic]
    async fn single_sort_desc() {
        do_single_instant_function_call("sort_desc", "").await;
    }

    #[tokio::test]
    async fn single_sqrt() {
        do_single_instant_function_call("sqrt", "sqrt").await;
    }

    #[tokio::test]
    #[should_panic]
    async fn single_timestamp() {
        do_single_instant_function_call("timestamp", "").await;
    }

    #[tokio::test]
    async fn single_acos() {
        do_single_instant_function_call("acos", "acos").await;
    }

    #[tokio::test]
    #[should_panic]
    async fn single_acosh() {
        do_single_instant_function_call("acosh", "").await;
    }

    #[tokio::test]
    async fn single_asin() {
        do_single_instant_function_call("asin", "asin").await;
    }

    #[tokio::test]
    #[should_panic]
    async fn single_asinh() {
        do_single_instant_function_call("asinh", "").await;
    }

    #[tokio::test]
    async fn single_atan() {
        do_single_instant_function_call("atan", "atan").await;
    }

    #[tokio::test]
    #[should_panic]
    async fn single_atanh() {
        do_single_instant_function_call("atanh", "").await;
    }

    #[tokio::test]
    async fn single_cos() {
        do_single_instant_function_call("cos", "cos").await;
    }

    #[tokio::test]
    #[should_panic]
    async fn single_cosh() {
        do_single_instant_function_call("cosh", "").await;
    }

    #[tokio::test]
    async fn single_sin() {
        do_single_instant_function_call("sin", "sin").await;
    }

    #[tokio::test]
    #[should_panic]
    async fn single_sinh() {
        do_single_instant_function_call("sinh", "").await;
    }

    #[tokio::test]
    async fn single_tan() {
        do_single_instant_function_call("tan", "tan").await;
    }

    #[tokio::test]
    #[should_panic]
    async fn single_tanh() {
        do_single_instant_function_call("tanh", "").await;
    }

    #[tokio::test]
    #[should_panic]
    async fn single_deg() {
        do_single_instant_function_call("deg", "").await;
    }

    #[tokio::test]
    #[should_panic]
    async fn single_rad() {
        do_single_instant_function_call("rad", "").await;
    }

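    /// Plan `{fn_name} by (tag_1)(...)` and its `without (tag_1)` variant and
    /// assert both logical plans, substituting `plan_name` for `TEMPLATE`.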
    async fn do_aggregate_expr_plan(fn_name: &str, plan_name: &str) {
        let prom_expr = parser::parse(&format!(
            "{fn_name} by (tag_1)(some_metric{{tag_0!=\"bar\"}})",
        ))
        .unwrap();
        let mut eval_stmt = EvalStmt {
            expr: prom_expr,
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        // case 1: aggregate with `by (tag_1)`
        let table_provider = build_test_table_provider(
            &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
            2,
            2,
        )
        .await;
        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();
        let expected_no_without = String::from(
            "Sort: some_metric.tag_1 ASC NULLS LAST, some_metric.timestamp ASC NULLS LAST [tag_1:Utf8, timestamp:Timestamp(Millisecond, None), TEMPLATE(some_metric.field_0):Float64;N, TEMPLATE(some_metric.field_1):Float64;N]\
            \n  Aggregate: groupBy=[[some_metric.tag_1, some_metric.timestamp]], aggr=[[TEMPLATE(some_metric.field_0), TEMPLATE(some_metric.field_1)]] [tag_1:Utf8, timestamp:Timestamp(Millisecond, None), TEMPLATE(some_metric.field_0):Float64;N, TEMPLATE(some_metric.field_1):Float64;N]\
            \n    PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
            \n      PromSeriesDivide: tags=[\"tag_0\", \"tag_1\"] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
            \n        Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.tag_1 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
            \n          Filter: some_metric.tag_0 != Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(-999, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
            \n            TableScan: some_metric [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]"
        ).replace("TEMPLATE", plan_name);
        assert_eq!(
            plan.display_indent_schema().to_string(),
            expected_no_without
        );

        // case 2: aggregate with `without (tag_1)`
        if let PromExpr::Aggregate(AggregateExpr { modifier, .. }) = &mut eval_stmt.expr {
            *modifier = Some(LabelModifier::Exclude(Labels {
                labels: vec![String::from("tag_1")].into_iter().collect(),
            }));
        }
        let table_provider = build_test_table_provider(
            &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
            2,
            2,
        )
        .await;
        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();
        let expected_without = String::from(
            "Sort: some_metric.tag_0 ASC NULLS LAST, some_metric.timestamp ASC NULLS LAST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), TEMPLATE(some_metric.field_0):Float64;N, TEMPLATE(some_metric.field_1):Float64;N]\
            \n  Aggregate: groupBy=[[some_metric.tag_0, some_metric.timestamp]], aggr=[[TEMPLATE(some_metric.field_0), TEMPLATE(some_metric.field_1)]] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), TEMPLATE(some_metric.field_0):Float64;N, TEMPLATE(some_metric.field_1):Float64;N]\
            \n    PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
            \n      PromSeriesDivide: tags=[\"tag_0\", \"tag_1\"] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
            \n        Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.tag_1 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
            \n          Filter: some_metric.tag_0 != Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(-999, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
            \n            TableScan: some_metric [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]"
        ).replace("TEMPLATE", plan_name);
        assert_eq!(plan.display_indent_schema().to_string(), expected_without);
    }

    #[tokio::test]
    async fn aggregate_sum() {
        do_aggregate_expr_plan("sum", "sum").await;
    }

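    // The tests below cover the metric engine's internal `__tsid` column: it may
    // drive `PromSeriesDivide`, but it must never leak into output schemas,
    // aggregations, window partitions, or user-facing filters.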
    #[tokio::test]
    async fn tsid_is_used_for_series_divide_when_available() {
        let prom_expr = parser::parse("some_metric").unwrap();
        let eval_stmt = EvalStmt {
            expr: prom_expr,
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let table_provider = build_test_table_provider_with_tsid(
            &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
            1,
            1,
        )
        .await;
        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();

        let plan_str = plan.display_indent_schema().to_string();
        assert!(plan_str.contains("PromSeriesDivide: tags=[\"__tsid\"]"));
        assert!(plan_str.contains("__tsid ASC NULLS FIRST"));
        assert!(
            !plan
                .schema()
                .fields()
                .iter()
                .any(|field| field.name() == DATA_SCHEMA_TSID_COLUMN_NAME)
        );
    }

    #[tokio::test]
    async fn physical_table_name_is_not_leaked_in_plan() {
        let prom_expr = parser::parse("some_metric").unwrap();
        let eval_stmt = EvalStmt {
            expr: prom_expr,
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let table_provider = build_test_table_provider_with_tsid(
            &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
            1,
            1,
        )
        .await;
        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();

        let plan_str = plan.display_indent_schema().to_string();
        assert!(plan_str.contains("TableScan: phy"), "{plan}");
        assert!(plan_str.contains("SubqueryAlias: some_metric"));
        assert!(plan_str.contains("Filter: phy.__table_id = UInt32(1024)"));
        assert!(!plan_str.contains("TableScan: some_metric"));
    }

    #[tokio::test]
    async fn sum_without_does_not_group_by_tsid() {
        let prom_expr = parser::parse("sum without (tag_0) (some_metric)").unwrap();
        let eval_stmt = EvalStmt {
            expr: prom_expr,
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let table_provider = build_test_table_provider_with_tsid(
            &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
            1,
            1,
        )
        .await;
        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();

        let plan_str = plan.display_indent_schema().to_string();
        assert!(plan_str.contains("PromSeriesDivide: tags=[\"__tsid\"]"));

        let aggr_line = plan_str
            .lines()
            .find(|line| line.contains("Aggregate: groupBy="))
            .unwrap();
        assert!(!aggr_line.contains(DATA_SCHEMA_TSID_COLUMN_NAME));
    }

    #[tokio::test]
    async fn topk_without_does_not_partition_by_tsid() {
        let prom_expr = parser::parse("topk without (tag_0) (1, some_metric)").unwrap();
        let eval_stmt = EvalStmt {
            expr: prom_expr,
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let table_provider = build_test_table_provider_with_tsid(
            &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
            1,
            1,
        )
        .await;
        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();

        let plan_str = plan.display_indent_schema().to_string();
        assert!(plan_str.contains("PromSeriesDivide: tags=[\"__tsid\"]"));

        let window_line = plan_str
            .lines()
            .find(|line| line.contains("WindowAggr: windowExpr=[[row_number()"))
            .unwrap();
        let partition_by = window_line
            .split("PARTITION BY [")
            .nth(1)
            .and_then(|s| s.split("] ORDER BY").next())
            .unwrap();
        assert!(!partition_by.contains(DATA_SCHEMA_TSID_COLUMN_NAME));
    }

    #[tokio::test]
    async fn sum_by_does_not_group_by_tsid() {
        let prom_expr = parser::parse("sum by (__tsid) (some_metric)").unwrap();
        let eval_stmt = EvalStmt {
            expr: prom_expr,
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let table_provider = build_test_table_provider_with_tsid(
            &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
            1,
            1,
        )
        .await;
        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();

        let plan_str = plan.display_indent_schema().to_string();
        assert!(plan_str.contains("PromSeriesDivide: tags=[\"__tsid\"]"));

        let aggr_line = plan_str
            .lines()
            .find(|line| line.contains("Aggregate: groupBy="))
            .unwrap();
        assert!(!aggr_line.contains(DATA_SCHEMA_TSID_COLUMN_NAME));
    }

    #[tokio::test]
    async fn topk_by_does_not_partition_by_tsid() {
        let prom_expr = parser::parse("topk by (__tsid) (1, some_metric)").unwrap();
        let eval_stmt = EvalStmt {
            expr: prom_expr,
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let table_provider = build_test_table_provider_with_tsid(
            &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
            1,
            1,
        )
        .await;
        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();

        let plan_str = plan.display_indent_schema().to_string();
        assert!(plan_str.contains("PromSeriesDivide: tags=[\"__tsid\"]"));

        let window_line = plan_str
            .lines()
            .find(|line| line.contains("WindowAggr: windowExpr=[[row_number()"))
            .unwrap();
        let partition_by = window_line
            .split("PARTITION BY [")
            .nth(1)
            .and_then(|s| s.split("] ORDER BY").next())
            .unwrap();
        assert!(!partition_by.contains(DATA_SCHEMA_TSID_COLUMN_NAME));
    }

    #[tokio::test]
    async fn selector_matcher_on_tsid_does_not_use_internal_column() {
        let prom_expr = parser::parse(r#"some_metric{__tsid="123"}"#).unwrap();
        let eval_stmt = EvalStmt {
            expr: prom_expr,
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let table_provider = build_test_table_provider_with_tsid(
            &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
            1,
            1,
        )
        .await;
        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();

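        // Walk the plan tree and collect every column referenced by a `Filter`
        // predicate.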
        fn collect_filter_cols(plan: &LogicalPlan, out: &mut HashSet<Column>) {
            if let LogicalPlan::Filter(filter) = plan {
                datafusion_expr::utils::expr_to_columns(&filter.predicate, out).unwrap();
            }
            for input in plan.inputs() {
                collect_filter_cols(input, out);
            }
        }

        let mut filter_cols = HashSet::new();
        collect_filter_cols(&plan, &mut filter_cols);
        assert!(
            !filter_cols
                .iter()
                .any(|c| c.name == DATA_SCHEMA_TSID_COLUMN_NAME)
        );
    }

    #[tokio::test]
    async fn tsid_is_not_used_when_physical_table_is_missing() {
        let prom_expr = parser::parse("some_metric").unwrap();
        let eval_stmt = EvalStmt {
            expr: prom_expr,
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let catalog_list = MemoryCatalogManager::with_default_setup();

        // Register a logical table whose physical table `phy` is absent from
        // the catalog.
        let mut columns = vec![ColumnSchema::new(
            "tag_0".to_string(),
            ConcreteDataType::string_datatype(),
            false,
        )];
        columns.push(
            ColumnSchema::new(
                "timestamp".to_string(),
                ConcreteDataType::timestamp_millisecond_datatype(),
                false,
            )
            .with_time_index(true),
        );
        columns.push(ColumnSchema::new(
            "field_0".to_string(),
            ConcreteDataType::float64_datatype(),
            true,
        ));
        let schema = Arc::new(Schema::new(columns));
        let mut options = table::requests::TableOptions::default();
        options
            .extra_options
            .insert(LOGICAL_TABLE_METADATA_KEY.to_string(), "phy".to_string());
        let table_meta = TableMetaBuilder::empty()
            .schema(schema)
            .primary_key_indices(vec![0])
            .value_indices(vec![2])
            .engine(METRIC_ENGINE_NAME.to_string())
            .options(options)
            .next_column_id(1024)
            .build()
            .unwrap();
        let table_info = TableInfoBuilder::default()
            .table_id(1024)
            .name("some_metric")
            .meta(table_meta)
            .build()
            .unwrap();
        let table = EmptyTable::from_table_info(&table_info);
        catalog_list
            .register_table_sync(RegisterTableRequest {
                catalog: DEFAULT_CATALOG_NAME.to_string(),
                schema: DEFAULT_SCHEMA_NAME.to_string(),
                table_name: "some_metric".to_string(),
                table_id: 1024,
                table,
            })
            .unwrap();

        let table_provider = DfTableSourceProvider::new(
            catalog_list,
            false,
            QueryContext::arc(),
            DummyDecoder::arc(),
            false,
        );

        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();

        let plan_str = plan.display_indent_schema().to_string();
        assert!(plan_str.contains("PromSeriesDivide: tags=[\"tag_0\"]"));
        assert!(!plan_str.contains("PromSeriesDivide: tags=[\"__tsid\"]"));
    }

    #[tokio::test]
    async fn tsid_is_carried_only_when_aggregate_preserves_label_set() {
        let prom_expr = parser::parse("sum by (tag_0) (some_metric)").unwrap();
        let eval_stmt = EvalStmt {
            expr: prom_expr,
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let table_provider = build_test_table_provider_with_tsid(
            &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
            1,
            1,
        )
        .await;
        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();

        let plan_str = plan.display_indent_schema().to_string();
        assert!(plan_str.contains("first_value") && plan_str.contains("__tsid"));
        assert!(
            !plan
                .schema()
                .fields()
                .iter()
                .any(|field| field.name() == DATA_SCHEMA_TSID_COLUMN_NAME)
        );

        // A full aggregation collapses the label set, so `__tsid` is dropped.
        let prom_expr = parser::parse("sum(some_metric)").unwrap();
        let eval_stmt = EvalStmt {
            expr: prom_expr,
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };
        let table_provider = build_test_table_provider_with_tsid(
            &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
            1,
            1,
        )
        .await;
        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();
        let plan_str = plan.display_indent_schema().to_string();
        assert!(!plan_str.contains("first_value"));
    }

    #[tokio::test]
    async fn or_operator_with_unknown_metric_does_not_require_tsid() {
        let prom_expr = parser::parse("unknown_metric or some_metric").unwrap();
        let eval_stmt = EvalStmt {
            expr: prom_expr,
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let table_provider = build_test_table_provider_with_tsid(
            &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
            1,
            1,
        )
        .await;

        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();

        assert!(
            !plan
                .schema()
                .fields()
                .iter()
                .any(|field| field.name() == DATA_SCHEMA_TSID_COLUMN_NAME)
        );
    }

    #[tokio::test]
    async fn aggregate_avg() {
        do_aggregate_expr_plan("avg", "avg").await;
    }

    #[tokio::test]
    #[should_panic]
    async fn aggregate_count() {
        do_aggregate_expr_plan("count", "count").await;
    }

    #[tokio::test]
    async fn aggregate_min() {
        do_aggregate_expr_plan("min", "min").await;
    }

    #[tokio::test]
    async fn aggregate_max() {
        do_aggregate_expr_plan("max", "max").await;
    }

    #[tokio::test]
    async fn aggregate_group() {
        let prom_expr = parser::parse(
            "sum(group by (cluster)(kubernetes_build_info{service=\"kubernetes\",job=\"apiserver\"}))",
        )
        .unwrap();
        let eval_stmt = EvalStmt {
            expr: prom_expr,
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let table_provider = build_test_table_provider_with_fields(
            &[(
                DEFAULT_SCHEMA_NAME.to_string(),
                "kubernetes_build_info".to_string(),
            )],
            &["cluster", "service", "job"],
        )
        .await;
        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();

        let plan_str = plan.display_indent_schema().to_string();
        assert!(plan_str.contains("max(Float64(1"));
    }

    #[tokio::test]
    async fn aggregate_stddev() {
        do_aggregate_expr_plan("stddev", "stddev_pop").await;
    }

    #[tokio::test]
    async fn aggregate_stdvar() {
        do_aggregate_expr_plan("stdvar", "var_pop").await;
    }

    #[tokio::test]
    async fn binary_op_column_column() {
        let prom_expr =
            parser::parse(r#"some_metric{tag_0="foo"} + some_metric{tag_0="bar"}"#).unwrap();
        let eval_stmt = EvalStmt {
            expr: prom_expr,
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let table_provider = build_test_table_provider(
            &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
            1,
            1,
        )
        .await;
        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();

        let expected = String::from(
            "Projection: rhs.tag_0, rhs.timestamp, lhs.field_0 + rhs.field_0 AS lhs.field_0 + rhs.field_0 [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), lhs.field_0 + rhs.field_0:Float64;N]\
            \n  Inner Join: lhs.tag_0 = rhs.tag_0, lhs.timestamp = rhs.timestamp [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n    SubqueryAlias: lhs [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n      PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n        PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n          Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n            Filter: some_metric.tag_0 = Utf8(\"foo\") AND some_metric.timestamp >= TimestampMillisecond(-999, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n              TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n    SubqueryAlias: rhs [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n      PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n        PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n          Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n            Filter: some_metric.tag_0 = Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(-999, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n              TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]",
        );

        assert_eq!(plan.display_indent_schema().to_string(), expected);
    }

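    /// Plan a standalone `query` against the default test tables and compare
    /// the rendered logical plan with `expected`.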
    async fn indie_query_plan_compare<T: AsRef<str>>(query: &str, expected: T) {
        let prom_expr = parser::parse(query).unwrap();
        let eval_stmt = EvalStmt {
            expr: prom_expr,
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let table_provider = build_test_table_provider(
            &[
                (DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string()),
                (
                    "greptime_private".to_string(),
                    "some_alt_metric".to_string(),
                ),
            ],
            1,
            1,
        )
        .await;
        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();

        assert_eq!(plan.display_indent_schema().to_string(), expected.as_ref());
    }

    #[tokio::test]
    async fn binary_op_literal_column() {
        let query = r#"1 + some_metric{tag_0="bar"}"#;
        let expected = String::from(
            "Projection: some_metric.tag_0, some_metric.timestamp, Float64(1) + some_metric.field_0 AS Float64(1) + field_0 [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), Float64(1) + field_0:Float64;N]\
            \n  PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n    PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n      Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n        Filter: some_metric.tag_0 = Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(-999, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n          TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]",
        );

        indie_query_plan_compare(query, expected).await;
    }

    #[tokio::test]
    async fn binary_op_literal_literal() {
        let query = r#"1 + 1"#;
        let expected = r#"EmptyMetric: range=[0..100000000], interval=[5000] [time:Timestamp(Millisecond, None), value:Float64;N]
  TableScan: dummy [time:Timestamp(Millisecond, None), value:Float64;N]"#;
        indie_query_plan_compare(query, expected).await;
    }

    #[tokio::test]
    async fn simple_bool_grammar() {
        let query = "some_metric != bool 1.2345";
        let expected = String::from(
            "Projection: some_metric.tag_0, some_metric.timestamp, CAST(some_metric.field_0 != Float64(1.2345) AS Float64) AS field_0 != Float64(1.2345) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0 != Float64(1.2345):Float64;N]\
            \n  PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n    PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n      Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n        Filter: some_metric.timestamp >= TimestampMillisecond(-999, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n          TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]",
        );

        indie_query_plan_compare(query, expected).await;
    }

    #[tokio::test]
    async fn bool_with_additional_arithmetic() {
        let query = "some_metric + (1 == bool 2)";
        let expected = String::from(
            "Projection: some_metric.tag_0, some_metric.timestamp, some_metric.field_0 + CAST(Float64(1) = Float64(2) AS Float64) AS field_0 + Float64(1) = Float64(2) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0 + Float64(1) = Float64(2):Float64;N]\
            \n  PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n    PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n      Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n        Filter: some_metric.timestamp >= TimestampMillisecond(-999, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n          TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]",
        );

        indie_query_plan_compare(query, expected).await;
    }

    #[tokio::test]
    async fn simple_unary() {
        let query = "-some_metric";
        let expected = String::from(
            "Projection: some_metric.tag_0, some_metric.timestamp, (- some_metric.field_0) AS (- field_0) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), (- field_0):Float64;N]\
            \n  PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n    PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n      Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n        Filter: some_metric.timestamp >= TimestampMillisecond(-999, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n          TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]",
        );

        indie_query_plan_compare(query, expected).await;
    }

    #[tokio::test]
    async fn increase_aggr() {
        let query = "increase(some_metric[5m])";
        let expected = String::from(
            "Filter: prom_increase(timestamp_range,field_0,timestamp,Int64(300000)) IS NOT NULL [timestamp:Timestamp(Millisecond, None), prom_increase(timestamp_range,field_0,timestamp,Int64(300000)):Float64;N, tag_0:Utf8]\
            \n  Projection: some_metric.timestamp, prom_increase(timestamp_range, field_0, some_metric.timestamp, Int64(300000)) AS prom_increase(timestamp_range,field_0,timestamp,Int64(300000)), some_metric.tag_0 [timestamp:Timestamp(Millisecond, None), prom_increase(timestamp_range,field_0,timestamp,Int64(300000)):Float64;N, tag_0:Utf8]\
            \n    PromRangeManipulate: req range=[0..100000000], interval=[5000], eval range=[300000], time index=[timestamp], values=[\"field_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Dictionary(Int64, Float64);N, timestamp_range:Dictionary(Int64, Timestamp(Millisecond, None))]\
            \n      PromSeriesNormalize: offset=[0], time index=[timestamp], filter NaN: [true] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n        PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n          Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n            Filter: some_metric.timestamp >= TimestampMillisecond(-299999, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n              TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]",
        );

        indie_query_plan_compare(query, expected).await;
    }

    #[tokio::test]
    async fn less_filter_on_value() {
        let query = "some_metric < 1.2345";
        let expected = String::from(
            "Filter: some_metric.field_0 < Float64(1.2345) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n  PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n    PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n      Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n        Filter: some_metric.timestamp >= TimestampMillisecond(-999, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n          TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]",
        );

        indie_query_plan_compare(query, expected).await;
    }

    #[tokio::test]
    async fn count_over_time() {
        let query = "count_over_time(some_metric[5m])";
        let expected = String::from(
            "Filter: prom_count_over_time(timestamp_range,field_0) IS NOT NULL [timestamp:Timestamp(Millisecond, None), prom_count_over_time(timestamp_range,field_0):Float64;N, tag_0:Utf8]\
            \n  Projection: some_metric.timestamp, prom_count_over_time(timestamp_range, field_0) AS prom_count_over_time(timestamp_range,field_0), some_metric.tag_0 [timestamp:Timestamp(Millisecond, None), prom_count_over_time(timestamp_range,field_0):Float64;N, tag_0:Utf8]\
            \n    PromRangeManipulate: req range=[0..100000000], interval=[5000], eval range=[300000], time index=[timestamp], values=[\"field_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Dictionary(Int64, Float64);N, timestamp_range:Dictionary(Int64, Timestamp(Millisecond, None))]\
            \n      PromSeriesNormalize: offset=[0], time index=[timestamp], filter NaN: [true] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n        PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n          Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n            Filter: some_metric.timestamp >= TimestampMillisecond(-299999, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n              TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]",
        );

        indie_query_plan_compare(query, expected).await;
    }

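    // `/ ignoring(kubernetes_pod_name,kubernetes_namespace)` drops those labels
    // from vector matching, so the two metrics should join on the remaining
    // `uri` label plus the time index.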
    #[tokio::test]
    async fn test_hash_join() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let case = r#"http_server_requests_seconds_sum{uri="/accounts/login"} / ignoring(kubernetes_pod_name,kubernetes_namespace) http_server_requests_seconds_count{uri="/accounts/login"}"#;

        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;
        let table_provider = build_test_table_provider_with_fields(
            &[
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "http_server_requests_seconds_sum".to_string(),
                ),
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "http_server_requests_seconds_count".to_string(),
                ),
            ],
            &["uri", "kubernetes_namespace", "kubernetes_pod_name"],
        )
        .await;
        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();
        let expected = "Projection: http_server_requests_seconds_count.uri, http_server_requests_seconds_count.kubernetes_namespace, http_server_requests_seconds_count.kubernetes_pod_name, http_server_requests_seconds_count.greptime_timestamp, http_server_requests_seconds_sum.greptime_value / http_server_requests_seconds_count.greptime_value AS http_server_requests_seconds_sum.greptime_value / http_server_requests_seconds_count.greptime_value\
        \n  Inner Join: http_server_requests_seconds_sum.greptime_timestamp = http_server_requests_seconds_count.greptime_timestamp, http_server_requests_seconds_sum.uri = http_server_requests_seconds_count.uri\
        \n    SubqueryAlias: http_server_requests_seconds_sum\
        \n      PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp]\
        \n        PromSeriesDivide: tags=[\"uri\", \"kubernetes_namespace\", \"kubernetes_pod_name\"]\
        \n          Sort: http_server_requests_seconds_sum.uri ASC NULLS FIRST, http_server_requests_seconds_sum.kubernetes_namespace ASC NULLS FIRST, http_server_requests_seconds_sum.kubernetes_pod_name ASC NULLS FIRST, http_server_requests_seconds_sum.greptime_timestamp ASC NULLS FIRST\
        \n            Filter: http_server_requests_seconds_sum.uri = Utf8(\"/accounts/login\") AND http_server_requests_seconds_sum.greptime_timestamp >= TimestampMillisecond(-999, None) AND http_server_requests_seconds_sum.greptime_timestamp <= TimestampMillisecond(100000000, None)\
        \n              TableScan: http_server_requests_seconds_sum\
        \n    SubqueryAlias: http_server_requests_seconds_count\
        \n      PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp]\
        \n        PromSeriesDivide: tags=[\"uri\", \"kubernetes_namespace\", \"kubernetes_pod_name\"]\
        \n          Sort: http_server_requests_seconds_count.uri ASC NULLS FIRST, http_server_requests_seconds_count.kubernetes_namespace ASC NULLS FIRST, http_server_requests_seconds_count.kubernetes_pod_name ASC NULLS FIRST, http_server_requests_seconds_count.greptime_timestamp ASC NULLS FIRST\
        \n            Filter: http_server_requests_seconds_count.uri = Utf8(\"/accounts/login\") AND http_server_requests_seconds_count.greptime_timestamp >= TimestampMillisecond(-999, None) AND http_server_requests_seconds_count.greptime_timestamp <= TimestampMillisecond(100000000, None)\
        \n              TableScan: http_server_requests_seconds_count";
        assert_eq!(plan.to_string(), expected);
    }

    #[tokio::test]
    async fn test_nested_histogram_quantile() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let case = r#"label_replace(histogram_quantile(0.99, sum by(pod, le, path, code) (rate(greptime_servers_grpc_requests_elapsed_bucket{container="frontend"}[1m0s]))), "pod_new", "$1", "pod", "greptimedb-frontend-[0-9a-z]*-(.*)")"#;

        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;
        let table_provider = build_test_table_provider_with_fields(
            &[(
                DEFAULT_SCHEMA_NAME.to_string(),
                "greptime_servers_grpc_requests_elapsed_bucket".to_string(),
            )],
            &["pod", "le", "path", "code", "container"],
        )
        .await;
        let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
            .await
            .unwrap();
    }

    #[tokio::test]
    async fn test_parse_and_operator() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let cases = [
            r#"count (max by (persistentvolumeclaim,namespace) (kubelet_volume_stats_used_bytes{namespace=~".+"} ) and (max by (persistentvolumeclaim,namespace) (kubelet_volume_stats_used_bytes{namespace=~".+"} )) / (max by (persistentvolumeclaim,namespace) (kubelet_volume_stats_capacity_bytes{namespace=~".+"} )) >= (80 / 100)) or vector (0)"#,
            r#"count (max by (persistentvolumeclaim,namespace) (kubelet_volume_stats_used_bytes{namespace=~".+"} ) unless (max by (persistentvolumeclaim,namespace) (kubelet_volume_stats_used_bytes{namespace=~".+"} )) / (max by (persistentvolumeclaim,namespace) (kubelet_volume_stats_capacity_bytes{namespace=~".+"} )) >= (80 / 100)) or vector (0)"#,
        ];

        for case in cases {
            let prom_expr = parser::parse(case).unwrap();
            eval_stmt.expr = prom_expr;
            let table_provider = build_test_table_provider_with_fields(
                &[
                    (
                        DEFAULT_SCHEMA_NAME.to_string(),
                        "kubelet_volume_stats_used_bytes".to_string(),
                    ),
                    (
                        DEFAULT_SCHEMA_NAME.to_string(),
                        "kubelet_volume_stats_capacity_bytes".to_string(),
                    ),
                ],
                &["namespace", "persistentvolumeclaim"],
            )
            .await;
            let _ =
                PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                    .await
                    .unwrap();
        }
    }

    #[tokio::test]
    async fn test_nested_binary_op() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let case = r#"sum(rate(nginx_ingress_controller_requests{job=~".*"}[2m])) -
        (
            sum(rate(nginx_ingress_controller_requests{namespace=~".*"}[2m]))
            or
            vector(0)
        )"#;

        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;
        let table_provider = build_test_table_provider_with_fields(
            &[(
                DEFAULT_SCHEMA_NAME.to_string(),
                "nginx_ingress_controller_requests".to_string(),
            )],
            &["namespace", "job"],
        )
        .await;
        let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
            .await
            .unwrap();
    }

    #[tokio::test]
    async fn test_parse_or_operator() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let case = r#"
        sum(rate(sysstat{tenant_name=~"tenant1",cluster_name=~"cluster1"}[120s])) by (cluster_name,tenant_name) /
        (sum(sysstat{tenant_name=~"tenant1",cluster_name=~"cluster1"}) by (cluster_name,tenant_name) * 100)
        or
        200 * sum(sysstat{tenant_name=~"tenant1",cluster_name=~"cluster1"}) by (cluster_name,tenant_name) /
        sum(sysstat{tenant_name=~"tenant1",cluster_name=~"cluster1"}) by (cluster_name,tenant_name)"#;

        let table_provider = build_test_table_provider_with_fields(
            &[(DEFAULT_SCHEMA_NAME.to_string(), "sysstat".to_string())],
            &["tenant_name", "cluster_name"],
        )
        .await;
        eval_stmt.expr = parser::parse(case).unwrap();
        let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
            .await
            .unwrap();

        let case = r#"sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) /
        (sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) *1000) +
        sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) /
        (sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) *1000) >= 0
        or
        sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) /
        (sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) *1000) >= 0
        or
        sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) /
        (sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) *1000) >= 0"#;
        let table_provider = build_test_table_provider_with_fields(
            &[(DEFAULT_SCHEMA_NAME.to_string(), "sysstat".to_string())],
            &["tenant_name", "cluster_name"],
        )
        .await;
        eval_stmt.expr = parser::parse(case).unwrap();
        let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
            .await
            .unwrap();

        let case = r#"(sum(background_waitevent_cnt{tenant_name=~"sys",cluster_name=~"cluster1"}) by (cluster_name,tenant_name) +
        sum(foreground_waitevent_cnt{tenant_name=~"sys",cluster_name=~"cluster1"}) by (cluster_name,tenant_name)) or
        (sum(background_waitevent_cnt{tenant_name=~"sys",cluster_name=~"cluster1"}) by (cluster_name,tenant_name)) or
        (sum(foreground_waitevent_cnt{tenant_name=~"sys",cluster_name=~"cluster1"}) by (cluster_name,tenant_name))"#;
        let table_provider = build_test_table_provider_with_fields(
            &[
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "background_waitevent_cnt".to_string(),
                ),
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "foreground_waitevent_cnt".to_string(),
                ),
            ],
            &["tenant_name", "cluster_name"],
        )
        .await;
        eval_stmt.expr = parser::parse(case).unwrap();
        let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
            .await
            .unwrap();

        let case = r#"avg(node_load1{cluster_name=~"cluster1"}) by (cluster_name,host_name) or max(container_cpu_load_average_10s{cluster_name=~"cluster1"}) by (cluster_name,host_name) * 100 / max(container_spec_cpu_quota{cluster_name=~"cluster1"}) by (cluster_name,host_name)"#;
        let table_provider = build_test_table_provider_with_fields(
            &[
                (DEFAULT_SCHEMA_NAME.to_string(), "node_load1".to_string()),
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "container_cpu_load_average_10s".to_string(),
                ),
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "container_spec_cpu_quota".to_string(),
                ),
            ],
            &["cluster_name", "host_name"],
        )
        .await;
        eval_stmt.expr = parser::parse(case).unwrap();
        let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
            .await
            .unwrap();
    }

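    // `__field__` matchers choose which value columns are planned; each case
    // lists the expected output columns, and matching no existing field is a
    // planning error.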
    #[tokio::test]
    async fn value_matcher() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let cases = [
            // single equality matcher
            (
                r#"some_metric{__field__="field_1"}"#,
                vec![
                    "some_metric.field_1",
                    "some_metric.tag_0",
                    "some_metric.tag_1",
                    "some_metric.tag_2",
                    "some_metric.timestamp",
                ],
            ),
            // two equality matchers
            (
                r#"some_metric{__field__="field_1", __field__="field_0"}"#,
                vec![
                    "some_metric.field_0",
                    "some_metric.field_1",
                    "some_metric.tag_0",
                    "some_metric.tag_1",
                    "some_metric.tag_2",
                    "some_metric.timestamp",
                ],
            ),
            // negative matcher
            (
                r#"some_metric{__field__!="field_1"}"#,
                vec![
                    "some_metric.field_0",
                    "some_metric.field_2",
                    "some_metric.tag_0",
                    "some_metric.tag_1",
                    "some_metric.tag_2",
                    "some_metric.timestamp",
                ],
            ),
            // two negative matchers
            (
                r#"some_metric{__field__!="field_1", __field__!="field_2"}"#,
                vec![
                    "some_metric.field_0",
                    "some_metric.tag_0",
                    "some_metric.tag_1",
                    "some_metric.tag_2",
                    "some_metric.timestamp",
                ],
            ),
            // equality and negative matchers combined
            (
                r#"some_metric{__field__="field_1", __field__!="field_0"}"#,
                vec![
                    "some_metric.field_1",
                    "some_metric.tag_0",
                    "some_metric.tag_1",
                    "some_metric.tag_2",
                    "some_metric.timestamp",
                ],
            ),
            // contradictory matchers select no field column
            (
                r#"some_metric{__field__="field_2", __field__!="field_2"}"#,
                vec![
                    "some_metric.tag_0",
                    "some_metric.tag_1",
                    "some_metric.tag_2",
                    "some_metric.timestamp",
                ],
            ),
            // regex matcher
            (
                r#"some_metric{__field__=~"field_1|field_2"}"#,
                vec![
                    "some_metric.field_1",
                    "some_metric.field_2",
                    "some_metric.tag_0",
                    "some_metric.tag_1",
                    "some_metric.tag_2",
                    "some_metric.timestamp",
                ],
            ),
            // negative regex matcher
            (
                r#"some_metric{__field__!~"field_1|field_2"}"#,
                vec![
                    "some_metric.field_0",
                    "some_metric.tag_0",
                    "some_metric.tag_1",
                    "some_metric.tag_2",
                    "some_metric.timestamp",
                ],
            ),
        ];

        for case in cases {
            let prom_expr = parser::parse(case.0).unwrap();
            eval_stmt.expr = prom_expr;
            let table_provider = build_test_table_provider(
                &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
                3,
                3,
            )
            .await;
            let plan =
                PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                    .await
                    .unwrap();
            let mut fields = plan.schema().field_names();
            let mut expected = case.1.into_iter().map(String::from).collect::<Vec<_>>();
            fields.sort();
            expected.sort();
            assert_eq!(fields, expected, "case: {:?}", case.0);
        }

        let bad_cases = [
            r#"some_metric{__field__="nonexistent"}"#,
            r#"some_metric{__field__!="nonexistent"}"#,
        ];

        for case in bad_cases {
            let prom_expr = parser::parse(case).unwrap();
            eval_stmt.expr = prom_expr;
            let table_provider = build_test_table_provider(
                &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
                3,
                3,
            )
            .await;
            let plan =
                PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                    .await;
            assert!(plan.is_err(), "case: {:?}", case);
        }
    }

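    // `__schema__` (alias `__database__`) routes a selector to another schema.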
    #[tokio::test]
    async fn custom_schema() {
        let query = "some_alt_metric{__schema__=\"greptime_private\"}";
        let expected = String::from(
            "PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n  PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n    Sort: greptime_private.some_alt_metric.tag_0 ASC NULLS FIRST, greptime_private.some_alt_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n      Filter: greptime_private.some_alt_metric.timestamp >= TimestampMillisecond(-999, None) AND greptime_private.some_alt_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n        TableScan: greptime_private.some_alt_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]",
        );

        indie_query_plan_compare(query, expected).await;

        let query = "some_alt_metric{__database__=\"greptime_private\"}";
        let expected = String::from(
            "PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n  PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n    Sort: greptime_private.some_alt_metric.tag_0 ASC NULLS FIRST, greptime_private.some_alt_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n      Filter: greptime_private.some_alt_metric.timestamp >= TimestampMillisecond(-999, None) AND greptime_private.some_alt_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n        TableScan: greptime_private.some_alt_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]",
        );

        indie_query_plan_compare(query, expected).await;

        let query = "some_alt_metric{__schema__=\"greptime_private\"} / some_metric";
        let expected = String::from(
            "Projection: some_metric.tag_0, some_metric.timestamp, greptime_private.some_alt_metric.field_0 / some_metric.field_0 AS greptime_private.some_alt_metric.field_0 / some_metric.field_0 [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), greptime_private.some_alt_metric.field_0 / some_metric.field_0:Float64;N]\
            \n  Inner Join: greptime_private.some_alt_metric.tag_0 = some_metric.tag_0, greptime_private.some_alt_metric.timestamp = some_metric.timestamp [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n    SubqueryAlias: greptime_private.some_alt_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n      PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n        PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n          Sort: greptime_private.some_alt_metric.tag_0 ASC NULLS FIRST, greptime_private.some_alt_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n            Filter: greptime_private.some_alt_metric.timestamp >= TimestampMillisecond(-999, None) AND greptime_private.some_alt_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n              TableScan: greptime_private.some_alt_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n    SubqueryAlias: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n      PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n        PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n          Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n            Filter: some_metric.timestamp >= TimestampMillisecond(-999, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n              TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]",
        );

        indie_query_plan_compare(query, expected).await;
    }

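    // Schema/database matchers only make sense as exact matches: there is no
    // reasonable plan for "every schema but X", so `!=` and `=~` are expected
    // to be rejected.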
    #[tokio::test]
    async fn only_equals_is_supported_for_special_matcher() {
        let queries = &[
            "some_alt_metric{__schema__!=\"greptime_private\"}",
            "some_alt_metric{__schema__=~\"lalala\"}",
            "some_alt_metric{__database__!=\"greptime_private\"}",
            "some_alt_metric{__database__=~\"lalala\"}",
        ];

        for query in queries {
            let prom_expr = parser::parse(query).unwrap();
            let eval_stmt = EvalStmt {
                expr: prom_expr,
                start: UNIX_EPOCH,
                end: UNIX_EPOCH
                    .checked_add(Duration::from_secs(100_000))
                    .unwrap(),
                interval: Duration::from_secs(5),
                lookback_delta: Duration::from_secs(1),
            };

            let table_provider = build_test_table_provider(
                &[
                    (DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string()),
                    (
                        "greptime_private".to_string(),
                        "some_alt_metric".to_string(),
                    ),
                ],
                1,
                1,
            )
            .await;

            let plan =
                PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                    .await;
            assert!(plan.is_err(), "query: {:?}", query);
        }
    }

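    // Tables whose time index is not millisecond-precision (nanosecond here)
    // should be normalized by the planner: a Projection casting the time
    // index to Timestamp(Millisecond) is expected below the PromQL operators.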
    #[tokio::test]
    async fn test_non_ms_precision() {
        let catalog_list = MemoryCatalogManager::with_default_setup();
        let columns = vec![
            ColumnSchema::new(
                "tag".to_string(),
                ConcreteDataType::string_datatype(),
                false,
            ),
            ColumnSchema::new(
                "timestamp".to_string(),
                ConcreteDataType::timestamp_nanosecond_datatype(),
                false,
            )
            .with_time_index(true),
            ColumnSchema::new(
                "field".to_string(),
                ConcreteDataType::float64_datatype(),
                true,
            ),
        ];
        let schema = Arc::new(Schema::new(columns));
        let table_meta = TableMetaBuilder::empty()
            .schema(schema)
            .primary_key_indices(vec![0])
            .value_indices(vec![2])
            .next_column_id(1024)
            .build()
            .unwrap();
        let table_info = TableInfoBuilder::default()
            .name("metrics".to_string())
            .meta(table_meta)
            .build()
            .unwrap();
        let table = EmptyTable::from_table_info(&table_info);
        assert!(
            catalog_list
                .register_table_sync(RegisterTableRequest {
                    catalog: DEFAULT_CATALOG_NAME.to_string(),
                    schema: DEFAULT_SCHEMA_NAME.to_string(),
                    table_name: "metrics".to_string(),
                    table_id: 1024,
                    table,
                })
                .is_ok()
        );

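        // Instant selector: note the `CAST(metrics.timestamp AS
        // Timestamp(ms))` projection at the bottom of the expected plan.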
        let plan = PromPlanner::stmt_to_plan(
            DfTableSourceProvider::new(
                catalog_list.clone(),
                false,
                QueryContext::arc(),
                DummyDecoder::arc(),
                true,
            ),
            &EvalStmt {
                expr: parser::parse("metrics{tag = \"1\"}").unwrap(),
                start: UNIX_EPOCH,
                end: UNIX_EPOCH
                    .checked_add(Duration::from_secs(100_000))
                    .unwrap(),
                interval: Duration::from_secs(5),
                lookback_delta: Duration::from_secs(1),
            },
            &build_query_engine_state(),
        )
        .await
        .unwrap();
        assert_eq!(
            plan.display_indent_schema().to_string(),
            "PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
            \n  PromSeriesDivide: tags=[\"tag\"] [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
            \n    Sort: metrics.tag ASC NULLS FIRST, metrics.timestamp ASC NULLS FIRST [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
            \n      Filter: metrics.tag = Utf8(\"1\") AND metrics.timestamp >= TimestampMillisecond(-999, None) AND metrics.timestamp <= TimestampMillisecond(100000000, None) [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
            \n        Projection: metrics.field, metrics.tag, CAST(metrics.timestamp AS Timestamp(ms)) AS timestamp [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
            \n          TableScan: metrics [tag:Utf8, timestamp:Timestamp(Nanosecond, None), field:Float64;N]"
        );
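        // The same cast is expected under range expressions, below the
        // PromSeriesNormalize/PromRangeManipulate machinery.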
        let plan = PromPlanner::stmt_to_plan(
            DfTableSourceProvider::new(
                catalog_list.clone(),
                false,
                QueryContext::arc(),
                DummyDecoder::arc(),
                true,
            ),
            &EvalStmt {
                expr: parser::parse("avg_over_time(metrics{tag = \"1\"}[5s])").unwrap(),
                start: UNIX_EPOCH,
                end: UNIX_EPOCH
                    .checked_add(Duration::from_secs(100_000))
                    .unwrap(),
                interval: Duration::from_secs(5),
                lookback_delta: Duration::from_secs(1),
            },
            &build_query_engine_state(),
        )
        .await
        .unwrap();
        assert_eq!(
            plan.display_indent_schema().to_string(),
            "Filter: prom_avg_over_time(timestamp_range,field) IS NOT NULL [timestamp:Timestamp(Millisecond, None), prom_avg_over_time(timestamp_range,field):Float64;N, tag:Utf8]\
            \n  Projection: metrics.timestamp, prom_avg_over_time(timestamp_range, field) AS prom_avg_over_time(timestamp_range,field), metrics.tag [timestamp:Timestamp(Millisecond, None), prom_avg_over_time(timestamp_range,field):Float64;N, tag:Utf8]\
            \n    PromRangeManipulate: req range=[0..100000000], interval=[5000], eval range=[5000], time index=[timestamp], values=[\"field\"] [field:Dictionary(Int64, Float64);N, tag:Utf8, timestamp:Timestamp(Millisecond, None), timestamp_range:Dictionary(Int64, Timestamp(Millisecond, None))]\
            \n      PromSeriesNormalize: offset=[0], time index=[timestamp], filter NaN: [true] [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
            \n        PromSeriesDivide: tags=[\"tag\"] [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
            \n          Sort: metrics.tag ASC NULLS FIRST, metrics.timestamp ASC NULLS FIRST [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
            \n            Filter: metrics.tag = Utf8(\"1\") AND metrics.timestamp >= TimestampMillisecond(-4999, None) AND metrics.timestamp <= TimestampMillisecond(100000000, None) [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
            \n              Projection: metrics.field, metrics.tag, CAST(metrics.timestamp AS Timestamp(ms)) AS timestamp [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
            \n                TableScan: metrics [tag:Utf8, timestamp:Timestamp(Nanosecond, None), field:Float64;N]"
        );
    }

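    // Filtering on a label the table lacks is valid PromQL; planning is
    // expected to succeed rather than error out.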
    #[tokio::test]
    async fn test_nonexistent_label() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let case = r#"some_metric{nonexistent="hi"}"#;
        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;
        let table_provider = build_test_table_provider(
            &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
            3,
            3,
        )
        .await;
        let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
            .await
            .unwrap();
    }

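    // `label_join` is expected to lower to a `concat_ws` projection that adds
    // the joined label (`foo`) while keeping all source labels.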
    #[tokio::test]
    async fn test_label_join() {
        let prom_expr = parser::parse(
            "label_join(up{tag_0='api-server'}, 'foo', ',', 'tag_1', 'tag_2', 'tag_3')",
        )
        .unwrap();
        let eval_stmt = EvalStmt {
            expr: prom_expr,
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let table_provider =
            build_test_table_provider(&[(DEFAULT_SCHEMA_NAME.to_string(), "up".to_string())], 4, 1)
                .await;
        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();

        let expected = r#"
Filter: up.field_0 IS NOT NULL [timestamp:Timestamp(Millisecond, None), field_0:Float64;N, foo:Utf8;N, tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8]
  Projection: up.timestamp, up.field_0, concat_ws(Utf8(","), up.tag_1, up.tag_2, up.tag_3) AS foo, up.tag_0, up.tag_1, up.tag_2, up.tag_3 [timestamp:Timestamp(Millisecond, None), field_0:Float64;N, foo:Utf8;N, tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8]
    PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
      PromSeriesDivide: tags=["tag_0", "tag_1", "tag_2", "tag_3"] [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
        Sort: up.tag_0 ASC NULLS FIRST, up.tag_1 ASC NULLS FIRST, up.tag_2 ASC NULLS FIRST, up.tag_3 ASC NULLS FIRST, up.timestamp ASC NULLS FIRST [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
          Filter: up.tag_0 = Utf8("api-server") AND up.timestamp >= TimestampMillisecond(-999, None) AND up.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
            TableScan: up [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"#;

        let ret = plan.display_indent_schema().to_string();
        assert_eq!(format!("\n{ret}"), expected, "\n{}", ret);
    }

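    // `label_replace` is expected to lower to a `regexp_replace` projection;
    // the pattern is anchored (`^(?s:...)$`) to mirror PromQL's full-string
    // regex semantics.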
    #[tokio::test]
    async fn test_label_replace() {
        let prom_expr = parser::parse(
            "label_replace(up{tag_0=\"a:c\"}, \"foo\", \"$1\", \"tag_0\", \"(.*):.*\")",
        )
        .unwrap();
        let eval_stmt = EvalStmt {
            expr: prom_expr,
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let table_provider =
            build_test_table_provider(&[(DEFAULT_SCHEMA_NAME.to_string(), "up".to_string())], 1, 1)
                .await;
        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();

        let expected = r#"
Filter: up.field_0 IS NOT NULL [timestamp:Timestamp(Millisecond, None), field_0:Float64;N, foo:Utf8;N, tag_0:Utf8]
  Projection: up.timestamp, up.field_0, regexp_replace(up.tag_0, Utf8("^(?s:(.*):.*)$"), Utf8("$1")) AS foo, up.tag_0 [timestamp:Timestamp(Millisecond, None), field_0:Float64;N, foo:Utf8;N, tag_0:Utf8]
    PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
      PromSeriesDivide: tags=["tag_0"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
        Sort: up.tag_0 ASC NULLS FIRST, up.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
          Filter: up.tag_0 = Utf8("a:c") AND up.timestamp >= TimestampMillisecond(-999, None) AND up.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
            TableScan: up [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"#;

        let ret = plan.display_indent_schema().to_string();
        assert_eq!(format!("\n{ret}"), expected, "\n{}", ret);
    }

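    // Regex label matchers should compile to anchored `~` filters
    // (`^(?:...)$`), again following PromQL's full-string matching.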
    #[tokio::test]
    async fn test_matchers_to_expr() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };
        let case =
            r#"sum(prometheus_tsdb_head_series{tag_1=~"(10.0.160.237:8080|10.0.160.237:9090)"})"#;

        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;
        let table_provider = build_test_table_provider(
            &[(
                DEFAULT_SCHEMA_NAME.to_string(),
                "prometheus_tsdb_head_series".to_string(),
            )],
            3,
            3,
        )
        .await;
        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();
        let expected = "Sort: prometheus_tsdb_head_series.timestamp ASC NULLS LAST [timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.field_0):Float64;N, sum(prometheus_tsdb_head_series.field_1):Float64;N, sum(prometheus_tsdb_head_series.field_2):Float64;N]\
        \n  Aggregate: groupBy=[[prometheus_tsdb_head_series.timestamp]], aggr=[[sum(prometheus_tsdb_head_series.field_0), sum(prometheus_tsdb_head_series.field_1), sum(prometheus_tsdb_head_series.field_2)]] [timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.field_0):Float64;N, sum(prometheus_tsdb_head_series.field_1):Float64;N, sum(prometheus_tsdb_head_series.field_2):Float64;N]\
        \n    PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N]\
        \n      PromSeriesDivide: tags=[\"tag_0\", \"tag_1\", \"tag_2\"] [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N]\
        \n        Sort: prometheus_tsdb_head_series.tag_0 ASC NULLS FIRST, prometheus_tsdb_head_series.tag_1 ASC NULLS FIRST, prometheus_tsdb_head_series.tag_2 ASC NULLS FIRST, prometheus_tsdb_head_series.timestamp ASC NULLS FIRST [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N]\
        \n          Filter: prometheus_tsdb_head_series.tag_1 ~ Utf8(\"^(?:(10.0.160.237:8080|10.0.160.237:9090))$\") AND prometheus_tsdb_head_series.timestamp >= TimestampMillisecond(-999, None) AND prometheus_tsdb_head_series.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N]\
        \n            TableScan: prometheus_tsdb_head_series [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N]";
        assert_eq!(plan.display_indent_schema().to_string(), expected);
    }

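    // `topk` has no direct aggregate here; the planner is expected to emulate
    // it with a `row_number()` window partitioned by timestamp plus a filter
    // on the row number.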
    #[tokio::test]
    async fn test_topk_expr() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };
        let case = r#"topk(10, sum(prometheus_tsdb_head_series{ip=~"(10.0.160.237:8080|10.0.160.237:9090)"}) by (ip))"#;

        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;
        let table_provider = build_test_table_provider_with_fields(
            &[
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "prometheus_tsdb_head_series".to_string(),
                ),
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "http_server_requests_seconds_count".to_string(),
                ),
            ],
            &["ip"],
        )
        .await;

        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();
        let expected = "Projection: sum(prometheus_tsdb_head_series.greptime_value), prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp [sum(prometheus_tsdb_head_series.greptime_value):Float64;N, ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None)]\
        \n  Sort: prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST, row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS FIRST, prometheus_tsdb_head_series.ip DESC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW ASC NULLS LAST [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N, row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS FIRST, prometheus_tsdb_head_series.ip DESC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:UInt64]\
        \n    Filter: row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS FIRST, prometheus_tsdb_head_series.ip DESC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW <= Float64(10) [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N, row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS FIRST, prometheus_tsdb_head_series.ip DESC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:UInt64]\
        \n      WindowAggr: windowExpr=[[row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS FIRST, prometheus_tsdb_head_series.ip DESC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N, row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS FIRST, prometheus_tsdb_head_series.ip DESC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:UInt64]\
        \n        Sort: prometheus_tsdb_head_series.ip ASC NULLS LAST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N]\
        \n          Aggregate: groupBy=[[prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp]], aggr=[[sum(prometheus_tsdb_head_series.greptime_value)]] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N]\
        \n            PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n              PromSeriesDivide: tags=[\"ip\"] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n                Sort: prometheus_tsdb_head_series.ip ASC NULLS FIRST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS FIRST [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n                  Filter: prometheus_tsdb_head_series.ip ~ Utf8(\"^(?:(10.0.160.237:8080|10.0.160.237:9090))$\") AND prometheus_tsdb_head_series.greptime_timestamp >= TimestampMillisecond(-999, None) AND prometheus_tsdb_head_series.greptime_timestamp <= TimestampMillisecond(100000000, None) [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n                    TableScan: prometheus_tsdb_head_series [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]";

        assert_eq!(plan.display_indent_schema().to_string(), expected);
    }

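    // `count_values` groups by the sampled value itself: the plan should
    // aggregate over `(tags, timestamp, value)` and re-expose the value under
    // the user-chosen label (`series`).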
    #[tokio::test]
    async fn test_count_values_expr() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };
        let case = r#"count_values('series', prometheus_tsdb_head_series{ip=~"(10.0.160.237:8080|10.0.160.237:9090)"}) by (ip)"#;

        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;
        let table_provider = build_test_table_provider_with_fields(
            &[
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "prometheus_tsdb_head_series".to_string(),
                ),
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "http_server_requests_seconds_count".to_string(),
                ),
            ],
            &["ip"],
        )
        .await;

        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();
        let expected = "Projection: count(prometheus_tsdb_head_series.greptime_value), prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp, series [count(prometheus_tsdb_head_series.greptime_value):Int64, ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), series:Float64;N]\
        \n  Sort: prometheus_tsdb_head_series.ip ASC NULLS LAST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST, prometheus_tsdb_head_series.greptime_value ASC NULLS LAST [count(prometheus_tsdb_head_series.greptime_value):Int64, ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), series:Float64;N, greptime_value:Float64;N]\
        \n    Projection: count(prometheus_tsdb_head_series.greptime_value), prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp, prometheus_tsdb_head_series.greptime_value AS series, prometheus_tsdb_head_series.greptime_value [count(prometheus_tsdb_head_series.greptime_value):Int64, ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), series:Float64;N, greptime_value:Float64;N]\
        \n      Aggregate: groupBy=[[prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp, prometheus_tsdb_head_series.greptime_value]], aggr=[[count(prometheus_tsdb_head_series.greptime_value)]] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N, count(prometheus_tsdb_head_series.greptime_value):Int64]\
        \n        PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n          PromSeriesDivide: tags=[\"ip\"] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n            Sort: prometheus_tsdb_head_series.ip ASC NULLS FIRST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS FIRST [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n              Filter: prometheus_tsdb_head_series.ip ~ Utf8(\"^(?:(10.0.160.237:8080|10.0.160.237:9090))$\") AND prometheus_tsdb_head_series.greptime_timestamp >= TimestampMillisecond(-999, None) AND prometheus_tsdb_head_series.greptime_timestamp <= TimestampMillisecond(100000000, None) [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n                TableScan: prometheus_tsdb_head_series [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]";

        assert_eq!(plan.display_indent_schema().to_string(), expected);
    }

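    // With a value alias attached via `apply_alias_extension`, one extra
    // Projection renaming the value column (`AS my_series`) is expected on
    // top of the usual `count_values` plan.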
    #[tokio::test]
    async fn test_value_alias() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };
        let case = r#"count_values('series', prometheus_tsdb_head_series{ip=~"(10.0.160.237:8080|10.0.160.237:9090)"}) by (ip)"#;

        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;
        eval_stmt = QueryLanguageParser::apply_alias_extension(eval_stmt, "my_series");
        let table_provider = build_test_table_provider_with_fields(
            &[
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "prometheus_tsdb_head_series".to_string(),
                ),
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "http_server_requests_seconds_count".to_string(),
                ),
            ],
            &["ip"],
        )
        .await;

        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();
        let expected = r#"
Projection: count(prometheus_tsdb_head_series.greptime_value) AS my_series, prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp [my_series:Int64, ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None)]
  Projection: count(prometheus_tsdb_head_series.greptime_value), prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp, series [count(prometheus_tsdb_head_series.greptime_value):Int64, ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), series:Float64;N]
    Sort: prometheus_tsdb_head_series.ip ASC NULLS LAST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST, prometheus_tsdb_head_series.greptime_value ASC NULLS LAST [count(prometheus_tsdb_head_series.greptime_value):Int64, ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), series:Float64;N, greptime_value:Float64;N]
      Projection: count(prometheus_tsdb_head_series.greptime_value), prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp, prometheus_tsdb_head_series.greptime_value AS series, prometheus_tsdb_head_series.greptime_value [count(prometheus_tsdb_head_series.greptime_value):Int64, ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), series:Float64;N, greptime_value:Float64;N]
        Aggregate: groupBy=[[prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp, prometheus_tsdb_head_series.greptime_value]], aggr=[[count(prometheus_tsdb_head_series.greptime_value)]] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N, count(prometheus_tsdb_head_series.greptime_value):Int64]
          PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]
            PromSeriesDivide: tags=["ip"] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]
              Sort: prometheus_tsdb_head_series.ip ASC NULLS FIRST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS FIRST [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]
                Filter: prometheus_tsdb_head_series.ip ~ Utf8("^(?:(10.0.160.237:8080|10.0.160.237:9090))$") AND prometheus_tsdb_head_series.greptime_timestamp >= TimestampMillisecond(-999, None) AND prometheus_tsdb_head_series.greptime_timestamp <= TimestampMillisecond(100000000, None) [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]
                  TableScan: prometheus_tsdb_head_series [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]"#;
        assert_eq!(format!("\n{}", plan.display_indent_schema()), expected);
    }

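    // `quantile(0.3, ...)` is expected to lower to the `quantile` aggregate
    // UDF applied over the inner `sum by (ip)` aggregation.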
    #[tokio::test]
    async fn test_quantile_expr() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };
        let case = r#"quantile(0.3, sum(prometheus_tsdb_head_series{ip=~"(10.0.160.237:8080|10.0.160.237:9090)"}) by (ip))"#;

        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;
        let table_provider = build_test_table_provider_with_fields(
            &[
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "prometheus_tsdb_head_series".to_string(),
                ),
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "http_server_requests_seconds_count".to_string(),
                ),
            ],
            &["ip"],
        )
        .await;

        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();
        let expected = "Sort: prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST [greptime_timestamp:Timestamp(Millisecond, None), quantile(Float64(0.3),sum(prometheus_tsdb_head_series.greptime_value)):Float64;N]\
        \n  Aggregate: groupBy=[[prometheus_tsdb_head_series.greptime_timestamp]], aggr=[[quantile(Float64(0.3), sum(prometheus_tsdb_head_series.greptime_value))]] [greptime_timestamp:Timestamp(Millisecond, None), quantile(Float64(0.3),sum(prometheus_tsdb_head_series.greptime_value)):Float64;N]\
        \n    Sort: prometheus_tsdb_head_series.ip ASC NULLS LAST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N]\
        \n      Aggregate: groupBy=[[prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp]], aggr=[[sum(prometheus_tsdb_head_series.greptime_value)]] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N]\
        \n        PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n          PromSeriesDivide: tags=[\"ip\"] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n            Sort: prometheus_tsdb_head_series.ip ASC NULLS FIRST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS FIRST [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n              Filter: prometheus_tsdb_head_series.ip ~ Utf8(\"^(?:(10.0.160.237:8080|10.0.160.237:9090))$\") AND prometheus_tsdb_head_series.greptime_timestamp >= TimestampMillisecond(-999, None) AND prometheus_tsdb_head_series.greptime_timestamp <= TimestampMillisecond(100000000, None) [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n                TableScan: prometheus_tsdb_head_series [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]";

        assert_eq!(plan.display_indent_schema().to_string(), expected);
    }

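    // For `or` where one operand's metric does not exist, the planner is
    // expected to substitute an EmptyMetric side and merge the two with
    // UnionDistinctOn instead of failing.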
    #[tokio::test]
    async fn test_or_not_exists_table_label() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };
        let case = r#"sum by (job, tag0, tag2) (metric_exists) or sum by (job, tag0, tag2) (metric_not_exists)"#;

        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;
        let table_provider = build_test_table_provider_with_fields(
            &[(DEFAULT_SCHEMA_NAME.to_string(), "metric_exists".to_string())],
            &["job"],
        )
        .await;

        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();
        let expected = r#"UnionDistinctOn: on col=[["job"]], ts_col=[greptime_timestamp] [greptime_timestamp:Timestamp(Millisecond, None), job:Utf8, sum(metric_exists.greptime_value):Float64;N]
  SubqueryAlias: metric_exists [greptime_timestamp:Timestamp(Millisecond, None), job:Utf8, sum(metric_exists.greptime_value):Float64;N]
    Projection: metric_exists.greptime_timestamp, metric_exists.job, sum(metric_exists.greptime_value) [greptime_timestamp:Timestamp(Millisecond, None), job:Utf8, sum(metric_exists.greptime_value):Float64;N]
      Sort: metric_exists.job ASC NULLS LAST, metric_exists.greptime_timestamp ASC NULLS LAST [job:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(metric_exists.greptime_value):Float64;N]
        Aggregate: groupBy=[[metric_exists.job, metric_exists.greptime_timestamp]], aggr=[[sum(metric_exists.greptime_value)]] [job:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(metric_exists.greptime_value):Float64;N]
          PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp] [job:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]
            PromSeriesDivide: tags=["job"] [job:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]
              Sort: metric_exists.job ASC NULLS FIRST, metric_exists.greptime_timestamp ASC NULLS FIRST [job:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]
                Filter: metric_exists.greptime_timestamp >= TimestampMillisecond(-999, None) AND metric_exists.greptime_timestamp <= TimestampMillisecond(100000000, None) [job:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]
                  TableScan: metric_exists [job:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]
  SubqueryAlias: [greptime_timestamp:Timestamp(Millisecond, None), job:Utf8;N, sum(.value):Float64;N]
    Projection: .time AS greptime_timestamp, Utf8(NULL) AS job, sum(.value) [greptime_timestamp:Timestamp(Millisecond, None), job:Utf8;N, sum(.value):Float64;N]
      Sort: .time ASC NULLS LAST [time:Timestamp(Millisecond, None), sum(.value):Float64;N]
        Aggregate: groupBy=[[.time]], aggr=[[sum(.value)]] [time:Timestamp(Millisecond, None), sum(.value):Float64;N]
          EmptyMetric: range=[0..-1], interval=[5000] [time:Timestamp(Millisecond, None), value:Float64;N]
            TableScan: dummy [time:Timestamp(Millisecond, None), value:Float64;N]"#;

        assert_eq!(plan.display_indent_schema().to_string(), expected);
    }

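    // `histogram_quantile` over a bucket table without a `le` label cannot be
    // folded; the planner is expected to degrade to an empty relation rather
    // than return an error.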
    #[tokio::test]
    async fn test_histogram_quantile_missing_le_column() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let case = r#"histogram_quantile(0.99, sum by(pod,instance,le) (rate(non_existent_histogram_bucket{instance=~"xxx"}[1m])))"#;

        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;

        let table_provider = build_test_table_provider_with_fields(
            &[(
                DEFAULT_SCHEMA_NAME.to_string(),
                "non_existent_histogram_bucket".to_string(),
            )],
            // Only `pod` and `instance` are defined; there is no `le` column.
            &["pod", "instance"],
        )
        .await;

        let result =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await;

        assert!(
            result.is_ok(),
            "Expected successful plan creation with empty result, but got error: {:?}",
            result.err()
        );

        let plan = result.unwrap();
        match plan {
            // The empty relation is the expected graceful fallback here.
            LogicalPlan::EmptyRelation(_) => {}
            _ => panic!("Expected EmptyRelation, but got: {:?}", plan),
        }
    }
}