query/promql/
planner.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeSet, HashSet, VecDeque};
16use std::sync::Arc;
17use std::time::UNIX_EPOCH;
18
19use arrow::datatypes::IntervalDayTime;
20use async_recursion::async_recursion;
21use catalog::table_source::DfTableSourceProvider;
22use common_error::ext::ErrorExt;
23use common_error::status_code::StatusCode;
24use common_function::function::FunctionContext;
25use common_query::prelude::greptime_value;
26use datafusion::common::DFSchemaRef;
27use datafusion::datasource::DefaultTableSource;
28use datafusion::functions_aggregate::average::avg_udaf;
29use datafusion::functions_aggregate::count::count_udaf;
30use datafusion::functions_aggregate::expr_fn::first_value;
31use datafusion::functions_aggregate::grouping::grouping_udaf;
32use datafusion::functions_aggregate::min_max::{max_udaf, min_udaf};
33use datafusion::functions_aggregate::stddev::stddev_pop_udaf;
34use datafusion::functions_aggregate::sum::sum_udaf;
35use datafusion::functions_aggregate::variance::var_pop_udaf;
36use datafusion::functions_window::row_number::RowNumber;
37use datafusion::logical_expr::expr::{Alias, ScalarFunction, WindowFunction};
38use datafusion::logical_expr::expr_rewriter::normalize_cols;
39use datafusion::logical_expr::{
40    BinaryExpr, Cast, Extension, LogicalPlan, LogicalPlanBuilder, Operator,
41    ScalarUDF as ScalarUdfDef, WindowFrame, WindowFunctionDefinition,
42};
43use datafusion::prelude as df_prelude;
44use datafusion::prelude::{Column, Expr as DfExpr, JoinType};
45use datafusion::scalar::ScalarValue;
46use datafusion::sql::TableReference;
47use datafusion_common::{DFSchema, NullEquality};
48use datafusion_expr::expr::WindowFunctionParams;
49use datafusion_expr::utils::conjunction;
50use datafusion_expr::{ExprSchemable, Literal, SortExpr, col, lit};
51use datatypes::arrow::datatypes::{DataType as ArrowDataType, TimeUnit as ArrowTimeUnit};
52use datatypes::data_type::ConcreteDataType;
53use itertools::Itertools;
54use once_cell::sync::Lazy;
55use promql::extension_plan::{
56    Absent, EmptyMetric, HistogramFold, InstantManipulate, Millisecond, RangeManipulate,
57    ScalarCalculate, SeriesDivide, SeriesNormalize, UnionDistinctOn, build_special_time_expr,
58};
59use promql::functions::{
60    AbsentOverTime, AvgOverTime, Changes, CountOverTime, Delta, Deriv, HoltWinters, IDelta,
61    Increase, LastOverTime, MaxOverTime, MinOverTime, PredictLinear, PresentOverTime,
62    QuantileOverTime, Rate, Resets, Round, StddevOverTime, StdvarOverTime, SumOverTime,
63    quantile_udaf,
64};
65use promql_parser::label::{METRIC_NAME, MatchOp, Matcher, Matchers};
66use promql_parser::parser::token::TokenType;
67use promql_parser::parser::{
68    AggregateExpr, BinModifier, BinaryExpr as PromBinaryExpr, Call, EvalStmt, Expr as PromExpr,
69    Function, FunctionArgs as PromFunctionArgs, LabelModifier, MatrixSelector, NumberLiteral,
70    Offset, ParenExpr, StringLiteral, SubqueryExpr, UnaryExpr, VectorMatchCardinality,
71    VectorSelector, token,
72};
73use regex::{self, Regex};
74use snafu::{OptionExt, ResultExt, ensure};
75use store_api::metric_engine_consts::{
76    DATA_SCHEMA_TABLE_ID_COLUMN_NAME, DATA_SCHEMA_TSID_COLUMN_NAME,
77};
78use table::table::adapter::DfTableProviderAdapter;
79
80use crate::promql::error::{
81    CatalogSnafu, ColumnNotFoundSnafu, CombineTableColumnMismatchSnafu, DataFusionPlanningSnafu,
82    ExpectRangeSelectorSnafu, FunctionInvalidArgumentSnafu, InvalidDestinationLabelNameSnafu,
83    InvalidRegularExpressionSnafu, InvalidTimeRangeSnafu, MultiFieldsNotSupportedSnafu,
84    MultipleMetricMatchersSnafu, MultipleVectorSnafu, NoMetricMatcherSnafu, PromqlPlanNodeSnafu,
85    Result, SameLabelSetSnafu, TableNameNotFoundSnafu, TimeIndexNotFoundSnafu,
86    UnexpectedPlanExprSnafu, UnexpectedTokenSnafu, UnknownTableSnafu, UnsupportedExprSnafu,
87    UnsupportedMatcherOpSnafu, UnsupportedVectorMatchSnafu, ValueNotFoundSnafu,
88    ZeroRangeSelectorSnafu,
89};
90use crate::query_engine::QueryEngineState;
91
92/// `time()` function in PromQL.
93const SPECIAL_TIME_FUNCTION: &str = "time";
94/// `scalar()` function in PromQL.
95const SCALAR_FUNCTION: &str = "scalar";
96/// `absent()` function in PromQL
97const SPECIAL_ABSENT_FUNCTION: &str = "absent";
98/// `histogram_quantile` function in PromQL
99const SPECIAL_HISTOGRAM_QUANTILE: &str = "histogram_quantile";
100/// `vector` function in PromQL
101const SPECIAL_VECTOR_FUNCTION: &str = "vector";
102/// `le` column for conventional histogram.
103const LE_COLUMN_NAME: &str = "le";
104
105/// Static regex for validating label names according to Prometheus specification.
106/// Label names must match the regex: [a-zA-Z_][a-zA-Z0-9_]*
107static LABEL_NAME_REGEX: Lazy<Regex> =
108    Lazy::new(|| Regex::new(r"^[a-zA-Z_][a-zA-Z0-9_]*$").unwrap());
109
110const DEFAULT_TIME_INDEX_COLUMN: &str = "time";
111
112/// default value column name for empty metric
113const DEFAULT_FIELD_COLUMN: &str = "value";
114
115/// Special modifier to project field columns under multi-field mode
116const FIELD_COLUMN_MATCHER: &str = "__field__";
117
118/// Special modifier for cross schema query
119const SCHEMA_COLUMN_MATCHER: &str = "__schema__";
120const DB_COLUMN_MATCHER: &str = "__database__";
121
122/// Threshold for scatter scan mode
123const MAX_SCATTER_POINTS: i64 = 400;
124
125/// Interval 1 hour in millisecond
126const INTERVAL_1H: i64 = 60 * 60 * 1000;
127
128#[derive(Default, Debug, Clone)]
129struct PromPlannerContext {
130    // query parameters
131    start: Millisecond,
132    end: Millisecond,
133    interval: Millisecond,
134    lookback_delta: Millisecond,
135
136    // planner states
137    table_name: Option<String>,
138    time_index_column: Option<String>,
139    field_columns: Vec<String>,
140    tag_columns: Vec<String>,
141    /// The matcher for field columns `__field__`.
142    field_column_matcher: Option<Vec<Matcher>>,
143    /// The matcher for selectors (normal matchers).
144    selector_matcher: Vec<Matcher>,
145    schema_name: Option<String>,
146    /// The range in millisecond of range selector. None if there is no range selector.
147    range: Option<Millisecond>,
148}
149
150impl PromPlannerContext {
151    fn from_eval_stmt(stmt: &EvalStmt) -> Self {
152        Self {
153            start: stmt.start.duration_since(UNIX_EPOCH).unwrap().as_millis() as _,
154            end: stmt.end.duration_since(UNIX_EPOCH).unwrap().as_millis() as _,
155            interval: stmt.interval.as_millis() as _,
156            lookback_delta: stmt.lookback_delta.as_millis() as _,
157            ..Default::default()
158        }
159    }
160
161    /// Reset all planner states
162    fn reset(&mut self) {
163        self.table_name = None;
164        self.time_index_column = None;
165        self.field_columns = vec![];
166        self.tag_columns = vec![];
167        self.field_column_matcher = None;
168        self.selector_matcher.clear();
169        self.schema_name = None;
170        self.range = None;
171    }
172
173    /// Reset table name and schema to empty
174    fn reset_table_name_and_schema(&mut self) {
175        self.table_name = Some(String::new());
176        self.schema_name = None;
177    }
178
179    /// Check if `le` is present in tag columns
180    fn has_le_tag(&self) -> bool {
181        self.tag_columns.iter().any(|c| c.eq(&LE_COLUMN_NAME))
182    }
183}
184
185pub struct PromPlanner {
186    table_provider: DfTableSourceProvider,
187    ctx: PromPlannerContext,
188}
189
190/// Unescapes the value of the matcher
191pub fn normalize_matcher(mut matcher: Matcher) -> Matcher {
192    if let Ok(unescaped_value) = unescaper::unescape(&matcher.value) {
193        matcher.value = unescaped_value;
194    }
195    matcher
196}
197
198impl PromPlanner {
199    pub async fn stmt_to_plan_with_alias(
200        table_provider: DfTableSourceProvider,
201        stmt: &EvalStmt,
202        alias: Option<String>,
203        query_engine_state: &QueryEngineState,
204    ) -> Result<LogicalPlan> {
205        let mut planner = Self {
206            table_provider,
207            ctx: PromPlannerContext::from_eval_stmt(stmt),
208        };
209
210        let plan = planner
211            .prom_expr_to_plan(&stmt.expr, query_engine_state)
212            .await?;
213
214        // Apply alias if provided
215        if let Some(alias_name) = alias {
216            planner.apply_alias_projection(plan, alias_name)
217        } else {
218            Ok(plan)
219        }
220    }
221
222    #[cfg(test)]
223    async fn stmt_to_plan(
224        table_provider: DfTableSourceProvider,
225        stmt: &EvalStmt,
226        query_engine_state: &QueryEngineState,
227    ) -> Result<LogicalPlan> {
228        Self::stmt_to_plan_with_alias(table_provider, stmt, None, query_engine_state).await
229    }
230
231    pub async fn prom_expr_to_plan(
232        &mut self,
233        prom_expr: &PromExpr,
234        query_engine_state: &QueryEngineState,
235    ) -> Result<LogicalPlan> {
236        self.prom_expr_to_plan_inner(prom_expr, false, query_engine_state)
237            .await
238    }
239
240    /**
241    Converts a PromQL expression to a logical plan.
242
243    NOTE:
244        The `timestamp_fn` indicates whether the PromQL `timestamp()` function is being evaluated in the current context.
245        If `true`, the planner generates a logical plan that projects the timestamp (time index) column
246        as the value column for each input row, implementing the PromQL `timestamp()` function semantics.
247        If `false`, the planner generates the standard logical plan for the given PromQL expression.
248    */
249    #[async_recursion]
250    async fn prom_expr_to_plan_inner(
251        &mut self,
252        prom_expr: &PromExpr,
253        timestamp_fn: bool,
254        query_engine_state: &QueryEngineState,
255    ) -> Result<LogicalPlan> {
256        let res = match prom_expr {
257            PromExpr::Aggregate(expr) => {
258                self.prom_aggr_expr_to_plan(query_engine_state, expr)
259                    .await?
260            }
261            PromExpr::Unary(expr) => {
262                self.prom_unary_expr_to_plan(query_engine_state, expr)
263                    .await?
264            }
265            PromExpr::Binary(expr) => {
266                self.prom_binary_expr_to_plan(query_engine_state, expr)
267                    .await?
268            }
269            PromExpr::Paren(ParenExpr { expr }) => {
270                self.prom_expr_to_plan_inner(expr, timestamp_fn, query_engine_state)
271                    .await?
272            }
273            PromExpr::Subquery(expr) => {
274                self.prom_subquery_expr_to_plan(query_engine_state, expr)
275                    .await?
276            }
277            PromExpr::NumberLiteral(lit) => self.prom_number_lit_to_plan(lit)?,
278            PromExpr::StringLiteral(lit) => self.prom_string_lit_to_plan(lit)?,
279            PromExpr::VectorSelector(selector) => {
280                self.prom_vector_selector_to_plan(selector, timestamp_fn)
281                    .await?
282            }
283            PromExpr::MatrixSelector(selector) => {
284                self.prom_matrix_selector_to_plan(selector).await?
285            }
286            PromExpr::Call(expr) => {
287                self.prom_call_expr_to_plan(query_engine_state, expr)
288                    .await?
289            }
290            PromExpr::Extension(expr) => {
291                self.prom_ext_expr_to_plan(query_engine_state, expr).await?
292            }
293        };
294
295        Ok(res)
296    }
297
298    async fn prom_subquery_expr_to_plan(
299        &mut self,
300        query_engine_state: &QueryEngineState,
301        subquery_expr: &SubqueryExpr,
302    ) -> Result<LogicalPlan> {
303        let SubqueryExpr {
304            expr, range, step, ..
305        } = subquery_expr;
306
307        let current_interval = self.ctx.interval;
308        if let Some(step) = step {
309            self.ctx.interval = step.as_millis() as _;
310        }
311        let current_start = self.ctx.start;
312        self.ctx.start -= range.as_millis() as i64 - self.ctx.interval;
313        let input = self.prom_expr_to_plan(expr, query_engine_state).await?;
314        self.ctx.interval = current_interval;
315        self.ctx.start = current_start;
316
317        ensure!(!range.is_zero(), ZeroRangeSelectorSnafu);
318        let range_ms = range.as_millis() as _;
319        self.ctx.range = Some(range_ms);
320
321        let manipulate = RangeManipulate::new(
322            self.ctx.start,
323            self.ctx.end,
324            self.ctx.interval,
325            range_ms,
326            self.ctx
327                .time_index_column
328                .clone()
329                .expect("time index should be set in `setup_context`"),
330            self.ctx.field_columns.clone(),
331            input,
332        )
333        .context(DataFusionPlanningSnafu)?;
334
335        Ok(LogicalPlan::Extension(Extension {
336            node: Arc::new(manipulate),
337        }))
338    }
339
340    async fn prom_aggr_expr_to_plan(
341        &mut self,
342        query_engine_state: &QueryEngineState,
343        aggr_expr: &AggregateExpr,
344    ) -> Result<LogicalPlan> {
345        let AggregateExpr {
346            op,
347            expr,
348            modifier,
349            param,
350        } = aggr_expr;
351
352        let input = self.prom_expr_to_plan(expr, query_engine_state).await?;
353
354        match (*op).id() {
355            token::T_TOPK | token::T_BOTTOMK => {
356                self.prom_topk_bottomk_to_plan(aggr_expr, input).await
357            }
358            _ => {
359                // calculate columns to group by
360                // Need to append time index column into group by columns
361                let mut group_exprs = self.agg_modifier_to_col(input.schema(), modifier, true)?;
362                // convert op and value columns to aggregate exprs
363                let (aggr_exprs, prev_field_exprs) =
364                    self.create_aggregate_exprs(*op, param, &input)?;
365
366                // create plan
367                let builder = LogicalPlanBuilder::from(input);
368                let builder = if op.id() == token::T_COUNT_VALUES {
369                    let label = Self::get_param_value_as_str(*op, param)?;
370                    // `count_values` must be grouped by fields,
371                    // and project the fields to the new label.
372                    group_exprs.extend(prev_field_exprs.clone());
373                    let project_fields = self
374                        .create_field_column_exprs()?
375                        .into_iter()
376                        .chain(self.create_tag_column_exprs()?)
377                        .chain(Some(self.create_time_index_column_expr()?))
378                        .chain(prev_field_exprs.into_iter().map(|expr| expr.alias(label)));
379
380                    builder
381                        .aggregate(group_exprs.clone(), aggr_exprs)
382                        .context(DataFusionPlanningSnafu)?
383                        .project(project_fields)
384                        .context(DataFusionPlanningSnafu)?
385                } else {
386                    builder
387                        .aggregate(group_exprs.clone(), aggr_exprs)
388                        .context(DataFusionPlanningSnafu)?
389                };
390
391                let sort_expr = group_exprs.into_iter().map(|expr| expr.sort(true, false));
392
393                builder
394                    .sort(sort_expr)
395                    .context(DataFusionPlanningSnafu)?
396                    .build()
397                    .context(DataFusionPlanningSnafu)
398            }
399        }
400    }
401
402    /// Create logical plan for PromQL topk and bottomk expr.
403    async fn prom_topk_bottomk_to_plan(
404        &mut self,
405        aggr_expr: &AggregateExpr,
406        input: LogicalPlan,
407    ) -> Result<LogicalPlan> {
408        let AggregateExpr {
409            op,
410            param,
411            modifier,
412            ..
413        } = aggr_expr;
414
415        let group_exprs = self.agg_modifier_to_col(input.schema(), modifier, false)?;
416
417        let val = Self::get_param_as_literal_expr(param, Some(*op), Some(ArrowDataType::Float64))?;
418
419        // convert op and value columns to window exprs.
420        let window_exprs = self.create_window_exprs(*op, group_exprs.clone(), &input)?;
421
422        let rank_columns: Vec<_> = window_exprs
423            .iter()
424            .map(|expr| expr.schema_name().to_string())
425            .collect();
426
427        // Create ranks filter with `Operator::Or`.
428        // Safety: at least one rank column
429        let filter: DfExpr = rank_columns
430            .iter()
431            .fold(None, |expr, rank| {
432                let predicate = DfExpr::BinaryExpr(BinaryExpr {
433                    left: Box::new(col(rank)),
434                    op: Operator::LtEq,
435                    right: Box::new(val.clone()),
436                });
437
438                match expr {
439                    None => Some(predicate),
440                    Some(expr) => Some(DfExpr::BinaryExpr(BinaryExpr {
441                        left: Box::new(expr),
442                        op: Operator::Or,
443                        right: Box::new(predicate),
444                    })),
445                }
446            })
447            .unwrap();
448
449        let rank_columns: Vec<_> = rank_columns.into_iter().map(col).collect();
450
451        let mut new_group_exprs = group_exprs.clone();
452        // Order by ranks
453        new_group_exprs.extend(rank_columns);
454
455        let group_sort_expr = new_group_exprs
456            .into_iter()
457            .map(|expr| expr.sort(true, false));
458
459        let project_fields = self
460            .create_field_column_exprs()?
461            .into_iter()
462            .chain(self.create_tag_column_exprs()?)
463            .chain(Some(self.create_time_index_column_expr()?));
464
465        LogicalPlanBuilder::from(input)
466            .window(window_exprs)
467            .context(DataFusionPlanningSnafu)?
468            .filter(filter)
469            .context(DataFusionPlanningSnafu)?
470            .sort(group_sort_expr)
471            .context(DataFusionPlanningSnafu)?
472            .project(project_fields)
473            .context(DataFusionPlanningSnafu)?
474            .build()
475            .context(DataFusionPlanningSnafu)
476    }
477
478    async fn prom_unary_expr_to_plan(
479        &mut self,
480        query_engine_state: &QueryEngineState,
481        unary_expr: &UnaryExpr,
482    ) -> Result<LogicalPlan> {
483        let UnaryExpr { expr } = unary_expr;
484        // Unary Expr in PromQL implys the `-` operator
485        let input = self.prom_expr_to_plan(expr, query_engine_state).await?;
486        self.projection_for_each_field_column(input, |col| {
487            Ok(DfExpr::Negative(Box::new(DfExpr::Column(col.into()))))
488        })
489    }
490
491    async fn prom_binary_expr_to_plan(
492        &mut self,
493        query_engine_state: &QueryEngineState,
494        binary_expr: &PromBinaryExpr,
495    ) -> Result<LogicalPlan> {
496        let PromBinaryExpr {
497            lhs,
498            rhs,
499            op,
500            modifier,
501        } = binary_expr;
502
503        // if set to true, comparison operator will return 0/1 (for true/false) instead of
504        // filter on the result column
505        let should_return_bool = if let Some(m) = modifier {
506            m.return_bool
507        } else {
508            false
509        };
510        let is_comparison_op = Self::is_token_a_comparison_op(*op);
511
512        // we should build a filter plan here if the op is comparison op and need not
513        // to return 0/1. Otherwise, we should build a projection plan
514        match (
515            Self::try_build_literal_expr(lhs),
516            Self::try_build_literal_expr(rhs),
517        ) {
518            (Some(lhs), Some(rhs)) => {
519                self.ctx.time_index_column = Some(DEFAULT_TIME_INDEX_COLUMN.to_string());
520                self.ctx.field_columns = vec![DEFAULT_FIELD_COLUMN.to_string()];
521                self.ctx.reset_table_name_and_schema();
522                let field_expr_builder = Self::prom_token_to_binary_expr_builder(*op)?;
523                let mut field_expr = field_expr_builder(lhs, rhs)?;
524
525                if is_comparison_op && should_return_bool {
526                    field_expr = DfExpr::Cast(Cast {
527                        expr: Box::new(field_expr),
528                        data_type: ArrowDataType::Float64,
529                    });
530                }
531
532                Ok(LogicalPlan::Extension(Extension {
533                    node: Arc::new(
534                        EmptyMetric::new(
535                            self.ctx.start,
536                            self.ctx.end,
537                            self.ctx.interval,
538                            SPECIAL_TIME_FUNCTION.to_string(),
539                            DEFAULT_FIELD_COLUMN.to_string(),
540                            Some(field_expr),
541                        )
542                        .context(DataFusionPlanningSnafu)?,
543                    ),
544                }))
545            }
546            // lhs is a literal, rhs is a column
547            (Some(mut expr), None) => {
548                let input = self.prom_expr_to_plan(rhs, query_engine_state).await?;
549                // check if the literal is a special time expr
550                if let Some(time_expr) = self.try_build_special_time_expr_with_context(lhs) {
551                    expr = time_expr
552                }
553                let bin_expr_builder = |col: &String| {
554                    let binary_expr_builder = Self::prom_token_to_binary_expr_builder(*op)?;
555                    let mut binary_expr =
556                        binary_expr_builder(expr.clone(), DfExpr::Column(col.into()))?;
557
558                    if is_comparison_op && should_return_bool {
559                        binary_expr = DfExpr::Cast(Cast {
560                            expr: Box::new(binary_expr),
561                            data_type: ArrowDataType::Float64,
562                        });
563                    }
564                    Ok(binary_expr)
565                };
566                if is_comparison_op && !should_return_bool {
567                    self.filter_on_field_column(input, bin_expr_builder)
568                } else {
569                    self.projection_for_each_field_column(input, bin_expr_builder)
570                }
571            }
572            // lhs is a column, rhs is a literal
573            (None, Some(mut expr)) => {
574                let input = self.prom_expr_to_plan(lhs, query_engine_state).await?;
575                // check if the literal is a special time expr
576                if let Some(time_expr) = self.try_build_special_time_expr_with_context(rhs) {
577                    expr = time_expr
578                }
579                let bin_expr_builder = |col: &String| {
580                    let binary_expr_builder = Self::prom_token_to_binary_expr_builder(*op)?;
581                    let mut binary_expr =
582                        binary_expr_builder(DfExpr::Column(col.into()), expr.clone())?;
583
584                    if is_comparison_op && should_return_bool {
585                        binary_expr = DfExpr::Cast(Cast {
586                            expr: Box::new(binary_expr),
587                            data_type: ArrowDataType::Float64,
588                        });
589                    }
590                    Ok(binary_expr)
591                };
592                if is_comparison_op && !should_return_bool {
593                    self.filter_on_field_column(input, bin_expr_builder)
594                } else {
595                    self.projection_for_each_field_column(input, bin_expr_builder)
596                }
597            }
598            // both are columns. join them on time index
599            (None, None) => {
600                let left_input = self.prom_expr_to_plan(lhs, query_engine_state).await?;
601                let left_field_columns = self.ctx.field_columns.clone();
602                let left_time_index_column = self.ctx.time_index_column.clone();
603                let mut left_table_ref = self
604                    .table_ref()
605                    .unwrap_or_else(|_| TableReference::bare(""));
606                let left_context = self.ctx.clone();
607
608                let right_input = self.prom_expr_to_plan(rhs, query_engine_state).await?;
609                let right_field_columns = self.ctx.field_columns.clone();
610                let right_time_index_column = self.ctx.time_index_column.clone();
611                let mut right_table_ref = self
612                    .table_ref()
613                    .unwrap_or_else(|_| TableReference::bare(""));
614                let right_context = self.ctx.clone();
615
616                // TODO(ruihang): avoid join if left and right are the same table
617
618                // set op has "special" join semantics
619                if Self::is_token_a_set_op(*op) {
620                    return self.set_op_on_non_field_columns(
621                        left_input,
622                        right_input,
623                        left_context,
624                        right_context,
625                        *op,
626                        modifier,
627                    );
628                }
629
630                // normal join
631                if left_table_ref == right_table_ref {
632                    // rename table references to avoid ambiguity
633                    left_table_ref = TableReference::bare("lhs");
634                    right_table_ref = TableReference::bare("rhs");
635                    // `self.ctx` have ctx in right plan, if right plan have no tag,
636                    // we use left plan ctx as the ctx for subsequent calculations,
637                    // to avoid case like `host + scalar(...)`
638                    // we need preserve tag column on `host` table in subsequent projection,
639                    // which only show in left plan ctx.
640                    if self.ctx.tag_columns.is_empty() {
641                        self.ctx = left_context.clone();
642                        self.ctx.table_name = Some("lhs".to_string());
643                    } else {
644                        self.ctx.table_name = Some("rhs".to_string());
645                    }
646                }
647                let mut field_columns = left_field_columns.iter().zip(right_field_columns.iter());
648
649                let join_plan = self.join_on_non_field_columns(
650                    left_input,
651                    right_input,
652                    left_table_ref.clone(),
653                    right_table_ref.clone(),
654                    left_time_index_column,
655                    right_time_index_column,
656                    // if left plan or right plan tag is empty, means case like `scalar(...) + host` or `host + scalar(...)`
657                    // under this case we only join on time index
658                    left_context.tag_columns.is_empty() || right_context.tag_columns.is_empty(),
659                    modifier,
660                )?;
661                let join_plan_schema = join_plan.schema().clone();
662
663                let bin_expr_builder = |_: &String| {
664                    let (left_col_name, right_col_name) = field_columns.next().unwrap();
665                    let left_col = join_plan_schema
666                        .qualified_field_with_name(Some(&left_table_ref), left_col_name)
667                        .context(DataFusionPlanningSnafu)?
668                        .into();
669                    let right_col = join_plan_schema
670                        .qualified_field_with_name(Some(&right_table_ref), right_col_name)
671                        .context(DataFusionPlanningSnafu)?
672                        .into();
673
674                    let binary_expr_builder = Self::prom_token_to_binary_expr_builder(*op)?;
675                    let mut binary_expr =
676                        binary_expr_builder(DfExpr::Column(left_col), DfExpr::Column(right_col))?;
677                    if is_comparison_op && should_return_bool {
678                        binary_expr = DfExpr::Cast(Cast {
679                            expr: Box::new(binary_expr),
680                            data_type: ArrowDataType::Float64,
681                        });
682                    }
683                    Ok(binary_expr)
684                };
685                if is_comparison_op && !should_return_bool {
686                    self.filter_on_field_column(join_plan, bin_expr_builder)
687                } else {
688                    self.projection_for_each_field_column(join_plan, bin_expr_builder)
689                }
690            }
691        }
692    }
693
694    fn prom_number_lit_to_plan(&mut self, number_literal: &NumberLiteral) -> Result<LogicalPlan> {
695        let NumberLiteral { val } = number_literal;
696        self.ctx.time_index_column = Some(DEFAULT_TIME_INDEX_COLUMN.to_string());
697        self.ctx.field_columns = vec![DEFAULT_FIELD_COLUMN.to_string()];
698        self.ctx.reset_table_name_and_schema();
699        let literal_expr = df_prelude::lit(*val);
700
701        let plan = LogicalPlan::Extension(Extension {
702            node: Arc::new(
703                EmptyMetric::new(
704                    self.ctx.start,
705                    self.ctx.end,
706                    self.ctx.interval,
707                    SPECIAL_TIME_FUNCTION.to_string(),
708                    DEFAULT_FIELD_COLUMN.to_string(),
709                    Some(literal_expr),
710                )
711                .context(DataFusionPlanningSnafu)?,
712            ),
713        });
714        Ok(plan)
715    }
716
717    fn prom_string_lit_to_plan(&mut self, string_literal: &StringLiteral) -> Result<LogicalPlan> {
718        let StringLiteral { val } = string_literal;
719        self.ctx.time_index_column = Some(DEFAULT_TIME_INDEX_COLUMN.to_string());
720        self.ctx.field_columns = vec![DEFAULT_FIELD_COLUMN.to_string()];
721        self.ctx.reset_table_name_and_schema();
722        let literal_expr = df_prelude::lit(val.clone());
723
724        let plan = LogicalPlan::Extension(Extension {
725            node: Arc::new(
726                EmptyMetric::new(
727                    self.ctx.start,
728                    self.ctx.end,
729                    self.ctx.interval,
730                    SPECIAL_TIME_FUNCTION.to_string(),
731                    DEFAULT_FIELD_COLUMN.to_string(),
732                    Some(literal_expr),
733                )
734                .context(DataFusionPlanningSnafu)?,
735            ),
736        });
737        Ok(plan)
738    }
739
740    async fn prom_vector_selector_to_plan(
741        &mut self,
742        vector_selector: &VectorSelector,
743        timestamp_fn: bool,
744    ) -> Result<LogicalPlan> {
745        let VectorSelector {
746            name,
747            offset,
748            matchers,
749            at: _,
750        } = vector_selector;
751        let matchers = self.preprocess_label_matchers(matchers, name)?;
752        if let Some(empty_plan) = self.setup_context().await? {
753            return Ok(empty_plan);
754        }
755        let normalize = self
756            .selector_to_series_normalize_plan(offset, matchers, false)
757            .await?;
758
759        let normalize = if timestamp_fn {
760            // If evaluating the PromQL `timestamp()` function, project the time index column as the value column
761            // before wrapping with [`InstantManipulate`], so the output matches PromQL's `timestamp()` semantics.
762            self.create_timestamp_func_plan(normalize)?
763        } else {
764            normalize
765        };
766
767        let manipulate = InstantManipulate::new(
768            self.ctx.start,
769            self.ctx.end,
770            self.ctx.lookback_delta,
771            self.ctx.interval,
772            self.ctx
773                .time_index_column
774                .clone()
775                .expect("time index should be set in `setup_context`"),
776            self.ctx.field_columns.first().cloned(),
777            normalize,
778        );
779        Ok(LogicalPlan::Extension(Extension {
780            node: Arc::new(manipulate),
781        }))
782    }
783
784    /// Builds a projection plan for the PromQL `timestamp()` function.
785    /// Projects the time index column as the value column for each row.
786    ///
787    /// # Arguments
788    /// * `normalize` - Input [`LogicalPlan`] for the normalized series.
789    ///
790    /// # Returns
791    /// Returns a [`Result<LogicalPlan>`] where the resulting logical plan projects the timestamp
792    /// column as the value column, along with the original tag and time index columns.
793    ///
794    /// # Timestamp vs. Time Function
795    ///
796    /// - **Timestamp Function (`timestamp()`)**: In PromQL, the `timestamp()` function returns the
797    ///   timestamp (time index) of each sample as the value column.
798    ///
799    /// - **Time Function (`time()`)**: The `time()` function returns the evaluation time of the query
800    ///   as a scalar value.
801    ///
802    /// # Side Effects
803    /// Updates the planner context's field columns to the timestamp column name.
804    ///
805    fn create_timestamp_func_plan(&mut self, normalize: LogicalPlan) -> Result<LogicalPlan> {
806        let time_expr = build_special_time_expr(self.ctx.time_index_column.as_ref().unwrap())
807            .alias(DEFAULT_FIELD_COLUMN);
808        self.ctx.field_columns = vec![time_expr.schema_name().to_string()];
809        let mut project_exprs = Vec::with_capacity(self.ctx.tag_columns.len() + 2);
810        project_exprs.push(self.create_time_index_column_expr()?);
811        project_exprs.push(time_expr);
812        project_exprs.extend(self.create_tag_column_exprs()?);
813
814        LogicalPlanBuilder::from(normalize)
815            .project(project_exprs)
816            .context(DataFusionPlanningSnafu)?
817            .build()
818            .context(DataFusionPlanningSnafu)
819    }
820
821    async fn prom_matrix_selector_to_plan(
822        &mut self,
823        matrix_selector: &MatrixSelector,
824    ) -> Result<LogicalPlan> {
825        let MatrixSelector { vs, range } = matrix_selector;
826        let VectorSelector {
827            name,
828            offset,
829            matchers,
830            ..
831        } = vs;
832        let matchers = self.preprocess_label_matchers(matchers, name)?;
833        ensure!(!range.is_zero(), ZeroRangeSelectorSnafu);
834        let range_ms = range.as_millis() as _;
835        self.ctx.range = Some(range_ms);
836
837        // Some functions like rate may require special fields in the RangeManipulate plan
838        // so we can't skip RangeManipulate.
839        let normalize = match self.setup_context().await? {
840            Some(empty_plan) => empty_plan,
841            None => {
842                self.selector_to_series_normalize_plan(offset, matchers, true)
843                    .await?
844            }
845        };
846        let manipulate = RangeManipulate::new(
847            self.ctx.start,
848            self.ctx.end,
849            self.ctx.interval,
850            // TODO(ruihang): convert via Timestamp datatypes to support different time units
851            range_ms,
852            self.ctx
853                .time_index_column
854                .clone()
855                .expect("time index should be set in `setup_context`"),
856            self.ctx.field_columns.clone(),
857            normalize,
858        )
859        .context(DataFusionPlanningSnafu)?;
860
861        Ok(LogicalPlan::Extension(Extension {
862            node: Arc::new(manipulate),
863        }))
864    }
865
866    async fn prom_call_expr_to_plan(
867        &mut self,
868        query_engine_state: &QueryEngineState,
869        call_expr: &Call,
870    ) -> Result<LogicalPlan> {
871        let Call { func, args } = call_expr;
872        // some special functions that are not expression but a plan
873        match func.name {
874            SPECIAL_HISTOGRAM_QUANTILE => {
875                return self.create_histogram_plan(args, query_engine_state).await;
876            }
877            SPECIAL_VECTOR_FUNCTION => return self.create_vector_plan(args).await,
878            SCALAR_FUNCTION => return self.create_scalar_plan(args, query_engine_state).await,
879            SPECIAL_ABSENT_FUNCTION => {
880                return self.create_absent_plan(args, query_engine_state).await;
881            }
882            _ => {}
883        }
884
885        // transform function arguments
886        let args = self.create_function_args(&args.args)?;
887        let input = if let Some(prom_expr) = &args.input {
888            self.prom_expr_to_plan_inner(prom_expr, func.name == "timestamp", query_engine_state)
889                .await?
890        } else {
891            self.ctx.time_index_column = Some(SPECIAL_TIME_FUNCTION.to_string());
892            self.ctx.reset_table_name_and_schema();
893            self.ctx.tag_columns = vec![];
894            self.ctx.field_columns = vec![DEFAULT_FIELD_COLUMN.to_string()];
895            LogicalPlan::Extension(Extension {
896                node: Arc::new(
897                    EmptyMetric::new(
898                        self.ctx.start,
899                        self.ctx.end,
900                        self.ctx.interval,
901                        SPECIAL_TIME_FUNCTION.to_string(),
902                        DEFAULT_FIELD_COLUMN.to_string(),
903                        None,
904                    )
905                    .context(DataFusionPlanningSnafu)?,
906                ),
907            })
908        };
909        let (mut func_exprs, new_tags) =
910            self.create_function_expr(func, args.literals.clone(), query_engine_state)?;
911        func_exprs.insert(0, self.create_time_index_column_expr()?);
912        func_exprs.extend_from_slice(&self.create_tag_column_exprs()?);
913
914        let builder = LogicalPlanBuilder::from(input)
915            .project(func_exprs)
916            .context(DataFusionPlanningSnafu)?
917            .filter(self.create_empty_values_filter_expr()?)
918            .context(DataFusionPlanningSnafu)?;
919
920        let builder = match func.name {
921            "sort" => builder
922                .sort(self.create_field_columns_sort_exprs(true))
923                .context(DataFusionPlanningSnafu)?,
924            "sort_desc" => builder
925                .sort(self.create_field_columns_sort_exprs(false))
926                .context(DataFusionPlanningSnafu)?,
927            "sort_by_label" => builder
928                .sort(Self::create_sort_exprs_by_tags(
929                    func.name,
930                    args.literals,
931                    true,
932                )?)
933                .context(DataFusionPlanningSnafu)?,
934            "sort_by_label_desc" => builder
935                .sort(Self::create_sort_exprs_by_tags(
936                    func.name,
937                    args.literals,
938                    false,
939                )?)
940                .context(DataFusionPlanningSnafu)?,
941
942            _ => builder,
943        };
944
945        // Update context tags after building plan
946        // We can't push them before planning, because they won't exist until projection.
947        for tag in new_tags {
948            self.ctx.tag_columns.push(tag);
949        }
950
951        let plan = builder.build().context(DataFusionPlanningSnafu)?;
952        common_telemetry::debug!("Created PromQL function plan: {plan:?} for {call_expr:?}");
953
954        Ok(plan)
955    }
956
957    async fn prom_ext_expr_to_plan(
958        &mut self,
959        query_engine_state: &QueryEngineState,
960        ext_expr: &promql_parser::parser::ast::Extension,
961    ) -> Result<LogicalPlan> {
962        // let promql_parser::parser::ast::Extension { expr } = ext_expr;
963        let expr = &ext_expr.expr;
964        let children = expr.children();
965        let plan = self
966            .prom_expr_to_plan(&children[0], query_engine_state)
967            .await?;
968        // Wrapper for the explanation/analyze of the existing plan
969        // https://docs.rs/datafusion-expr/latest/datafusion_expr/logical_plan/builder/struct.LogicalPlanBuilder.html#method.explain
970        // if `analyze` is true, runs the actual plan and produces
971        // information about metrics during run.
972        // if `verbose` is true, prints out additional details when VERBOSE keyword is specified
973        match expr.name() {
974            "ANALYZE" => LogicalPlanBuilder::from(plan)
975                .explain(false, true)
976                .unwrap()
977                .build()
978                .context(DataFusionPlanningSnafu),
979            "ANALYZE VERBOSE" => LogicalPlanBuilder::from(plan)
980                .explain(true, true)
981                .unwrap()
982                .build()
983                .context(DataFusionPlanningSnafu),
984            "EXPLAIN" => LogicalPlanBuilder::from(plan)
985                .explain(false, false)
986                .unwrap()
987                .build()
988                .context(DataFusionPlanningSnafu),
989            "EXPLAIN VERBOSE" => LogicalPlanBuilder::from(plan)
990                .explain(true, false)
991                .unwrap()
992                .build()
993                .context(DataFusionPlanningSnafu),
994            _ => LogicalPlanBuilder::empty(true)
995                .build()
996                .context(DataFusionPlanningSnafu),
997        }
998    }
999
1000    /// Extract metric name from `__name__` matcher and set it into [PromPlannerContext].
1001    /// Returns a new [Matchers] that doesn't contain metric name matcher.
1002    ///
1003    /// Each call to this function means new selector is started. Thus, the context will be reset
1004    /// at first.
1005    ///
1006    /// Name rule:
1007    /// - if `name` is some, then the matchers MUST NOT contain `__name__` matcher.
1008    /// - if `name` is none, then the matchers MAY contain NONE OR MULTIPLE `__name__` matchers.
1009    #[allow(clippy::mutable_key_type)]
1010    fn preprocess_label_matchers(
1011        &mut self,
1012        label_matchers: &Matchers,
1013        name: &Option<String>,
1014    ) -> Result<Matchers> {
1015        self.ctx.reset();
1016
1017        let metric_name;
1018        if let Some(name) = name.clone() {
1019            metric_name = Some(name);
1020            ensure!(
1021                label_matchers.find_matchers(METRIC_NAME).is_empty(),
1022                MultipleMetricMatchersSnafu
1023            );
1024        } else {
1025            let mut matches = label_matchers.find_matchers(METRIC_NAME);
1026            ensure!(!matches.is_empty(), NoMetricMatcherSnafu);
1027            ensure!(matches.len() == 1, MultipleMetricMatchersSnafu);
1028            ensure!(
1029                matches[0].op == MatchOp::Equal,
1030                UnsupportedMatcherOpSnafu {
1031                    matcher_op: matches[0].op.to_string(),
1032                    matcher: METRIC_NAME
1033                }
1034            );
1035            metric_name = matches.pop().map(|m| m.value);
1036        }
1037
1038        self.ctx.table_name = metric_name;
1039
1040        let mut matchers = HashSet::new();
1041        for matcher in &label_matchers.matchers {
1042            // TODO(ruihang): support other metric match ops
1043            if matcher.name == FIELD_COLUMN_MATCHER {
1044                self.ctx
1045                    .field_column_matcher
1046                    .get_or_insert_default()
1047                    .push(matcher.clone());
1048            } else if matcher.name == SCHEMA_COLUMN_MATCHER || matcher.name == DB_COLUMN_MATCHER {
1049                ensure!(
1050                    matcher.op == MatchOp::Equal,
1051                    UnsupportedMatcherOpSnafu {
1052                        matcher: matcher.name.clone(),
1053                        matcher_op: matcher.op.to_string(),
1054                    }
1055                );
1056                self.ctx.schema_name = Some(matcher.value.clone());
1057            } else if matcher.name != METRIC_NAME {
1058                self.ctx.selector_matcher.push(matcher.clone());
1059                let _ = matchers.insert(matcher.clone());
1060            }
1061        }
1062
1063        Ok(Matchers::new(
1064            matchers.into_iter().map(normalize_matcher).collect(),
1065        ))
1066    }
1067
1068    async fn selector_to_series_normalize_plan(
1069        &mut self,
1070        offset: &Option<Offset>,
1071        label_matchers: Matchers,
1072        is_range_selector: bool,
1073    ) -> Result<LogicalPlan> {
1074        // make table scan plan
1075        let table_ref = self.table_ref()?;
1076        let mut table_scan = self.create_table_scan_plan(table_ref.clone()).await?;
1077        let table_schema = table_scan.schema();
1078
1079        // make filter exprs
1080        let offset_duration = match offset {
1081            Some(Offset::Pos(duration)) => duration.as_millis() as Millisecond,
1082            Some(Offset::Neg(duration)) => -(duration.as_millis() as Millisecond),
1083            None => 0,
1084        };
1085        let mut scan_filters = Self::matchers_to_expr(label_matchers.clone(), table_schema)?;
1086        if let Some(time_index_filter) = self.build_time_index_filter(offset_duration)? {
1087            scan_filters.push(time_index_filter);
1088        }
1089        table_scan = LogicalPlanBuilder::from(table_scan)
1090            .filter(conjunction(scan_filters).unwrap()) // Safety: `scan_filters` is not empty.
1091            .context(DataFusionPlanningSnafu)?
1092            .build()
1093            .context(DataFusionPlanningSnafu)?;
1094
1095        // make a projection plan if there is any `__field__` matcher
1096        if let Some(field_matchers) = &self.ctx.field_column_matcher {
1097            let col_set = self.ctx.field_columns.iter().collect::<HashSet<_>>();
1098            // opt-in set
1099            let mut result_set = HashSet::new();
1100            // opt-out set
1101            let mut reverse_set = HashSet::new();
1102            for matcher in field_matchers {
1103                match &matcher.op {
1104                    MatchOp::Equal => {
1105                        if col_set.contains(&matcher.value) {
1106                            let _ = result_set.insert(matcher.value.clone());
1107                        } else {
1108                            return Err(ColumnNotFoundSnafu {
1109                                col: matcher.value.clone(),
1110                            }
1111                            .build());
1112                        }
1113                    }
1114                    MatchOp::NotEqual => {
1115                        if col_set.contains(&matcher.value) {
1116                            let _ = reverse_set.insert(matcher.value.clone());
1117                        } else {
1118                            return Err(ColumnNotFoundSnafu {
1119                                col: matcher.value.clone(),
1120                            }
1121                            .build());
1122                        }
1123                    }
1124                    MatchOp::Re(regex) => {
1125                        for col in &self.ctx.field_columns {
1126                            if regex.is_match(col) {
1127                                let _ = result_set.insert(col.clone());
1128                            }
1129                        }
1130                    }
1131                    MatchOp::NotRe(regex) => {
1132                        for col in &self.ctx.field_columns {
1133                            if regex.is_match(col) {
1134                                let _ = reverse_set.insert(col.clone());
1135                            }
1136                        }
1137                    }
1138                }
1139            }
1140            // merge two set
1141            if result_set.is_empty() {
1142                result_set = col_set.into_iter().cloned().collect();
1143            }
1144            for col in reverse_set {
1145                let _ = result_set.remove(&col);
1146            }
1147
1148            // mask the field columns in context using computed result set
1149            self.ctx.field_columns = self
1150                .ctx
1151                .field_columns
1152                .drain(..)
1153                .filter(|col| result_set.contains(col))
1154                .collect();
1155
1156            let exprs = result_set
1157                .into_iter()
1158                .map(|col| DfExpr::Column(Column::new_unqualified(col)))
1159                .chain(self.create_tag_column_exprs()?)
1160                .chain(Some(self.create_time_index_column_expr()?))
1161                .collect::<Vec<_>>();
1162
1163            // reuse this variable for simplicity
1164            table_scan = LogicalPlanBuilder::from(table_scan)
1165                .project(exprs)
1166                .context(DataFusionPlanningSnafu)?
1167                .build()
1168                .context(DataFusionPlanningSnafu)?;
1169        }
1170
1171        // make sort plan
1172        let sort_plan = LogicalPlanBuilder::from(table_scan)
1173            .sort(self.create_tag_and_time_index_column_sort_exprs()?)
1174            .context(DataFusionPlanningSnafu)?
1175            .build()
1176            .context(DataFusionPlanningSnafu)?;
1177
1178        // make divide plan
1179        let time_index_column =
1180            self.ctx
1181                .time_index_column
1182                .clone()
1183                .with_context(|| TimeIndexNotFoundSnafu {
1184                    table: table_ref.to_string(),
1185                })?;
1186        let divide_plan = LogicalPlan::Extension(Extension {
1187            node: Arc::new(SeriesDivide::new(
1188                self.ctx.tag_columns.clone(),
1189                time_index_column,
1190                sort_plan,
1191            )),
1192        });
1193
1194        // make series_normalize plan
1195        if !is_range_selector && offset_duration == 0 {
1196            return Ok(divide_plan);
1197        }
1198        let series_normalize = SeriesNormalize::new(
1199            offset_duration,
1200            self.ctx
1201                .time_index_column
1202                .clone()
1203                .with_context(|| TimeIndexNotFoundSnafu {
1204                    table: table_ref.to_quoted_string(),
1205                })?,
1206            is_range_selector,
1207            self.ctx.tag_columns.clone(),
1208            divide_plan,
1209        );
1210        let logical_plan = LogicalPlan::Extension(Extension {
1211            node: Arc::new(series_normalize),
1212        });
1213
1214        Ok(logical_plan)
1215    }
1216
1217    /// Convert [LabelModifier] to [Column] exprs for aggregation.
1218    /// Timestamp column and tag columns will be included.
1219    ///
1220    /// # Side effect
1221    ///
1222    /// This method will also change the tag columns in ctx if `update_ctx` is true.
1223    fn agg_modifier_to_col(
1224        &mut self,
1225        input_schema: &DFSchemaRef,
1226        modifier: &Option<LabelModifier>,
1227        update_ctx: bool,
1228    ) -> Result<Vec<DfExpr>> {
1229        match modifier {
1230            None => {
1231                if update_ctx {
1232                    self.ctx.tag_columns.clear();
1233                }
1234                Ok(vec![self.create_time_index_column_expr()?])
1235            }
1236            Some(LabelModifier::Include(labels)) => {
1237                if update_ctx {
1238                    self.ctx.tag_columns.clear();
1239                }
1240                let mut exprs = Vec::with_capacity(labels.labels.len());
1241                for label in &labels.labels {
1242                    // nonexistence label will be ignored
1243                    if let Some(column_name) = Self::find_case_sensitive_column(input_schema, label)
1244                    {
1245                        exprs.push(DfExpr::Column(Column::from_name(column_name.clone())));
1246
1247                        if update_ctx {
1248                            // update the tag columns in context
1249                            self.ctx.tag_columns.push(column_name);
1250                        }
1251                    }
1252                }
1253                // add timestamp column
1254                exprs.push(self.create_time_index_column_expr()?);
1255
1256                Ok(exprs)
1257            }
1258            Some(LabelModifier::Exclude(labels)) => {
1259                let mut all_fields = input_schema
1260                    .fields()
1261                    .iter()
1262                    .map(|f| f.name())
1263                    .collect::<BTreeSet<_>>();
1264
1265                // remove "without"-ed fields
1266                // nonexistence label will be ignored
1267                for label in &labels.labels {
1268                    let _ = all_fields.remove(label);
1269                }
1270
1271                // remove time index and value fields
1272                if let Some(time_index) = &self.ctx.time_index_column {
1273                    let _ = all_fields.remove(time_index);
1274                }
1275                for value in &self.ctx.field_columns {
1276                    let _ = all_fields.remove(value);
1277                }
1278
1279                if update_ctx {
1280                    // change the tag columns in context
1281                    self.ctx.tag_columns = all_fields.iter().map(|col| (*col).clone()).collect();
1282                }
1283
1284                // collect remaining fields and convert to col expr
1285                let mut exprs = all_fields
1286                    .into_iter()
1287                    .map(|c| DfExpr::Column(Column::from(c)))
1288                    .collect::<Vec<_>>();
1289
1290                // add timestamp column
1291                exprs.push(self.create_time_index_column_expr()?);
1292
1293                Ok(exprs)
1294            }
1295        }
1296    }
1297
1298    // TODO(ruihang): ignore `MetricNameLabel` (`__name__`) matcher
1299    pub fn matchers_to_expr(
1300        label_matchers: Matchers,
1301        table_schema: &DFSchemaRef,
1302    ) -> Result<Vec<DfExpr>> {
1303        let mut exprs = Vec::with_capacity(label_matchers.matchers.len());
1304        for matcher in label_matchers.matchers {
1305            if matcher.name == SCHEMA_COLUMN_MATCHER
1306                || matcher.name == DB_COLUMN_MATCHER
1307                || matcher.name == FIELD_COLUMN_MATCHER
1308            {
1309                continue;
1310            }
1311
1312            let column_name = Self::find_case_sensitive_column(table_schema, matcher.name.as_str());
1313            let col = if let Some(column_name) = column_name {
1314                DfExpr::Column(Column::from_name(column_name))
1315            } else {
1316                DfExpr::Literal(ScalarValue::Utf8(Some(String::new())), None)
1317                    .alias(matcher.name.clone())
1318            };
1319            let lit = DfExpr::Literal(ScalarValue::Utf8(Some(matcher.value)), None);
1320            let expr = match matcher.op {
1321                MatchOp::Equal => col.eq(lit),
1322                MatchOp::NotEqual => col.not_eq(lit),
1323                MatchOp::Re(re) => {
1324                    // TODO(ruihang): a more programmatic way to handle this in datafusion
1325
1326                    // This is a hack to handle `.+` and `.*`, and is not strictly correct
1327                    // `.` doesn't match newline (`\n`). Given this is in PromQL context,
1328                    // most of the time it's fine.
1329                    if re.as_str() == "^(?:.*)$" {
1330                        continue;
1331                    }
1332                    if re.as_str() == "^(?:.+)$" {
1333                        col.not_eq(DfExpr::Literal(
1334                            ScalarValue::Utf8(Some(String::new())),
1335                            None,
1336                        ))
1337                    } else {
1338                        DfExpr::BinaryExpr(BinaryExpr {
1339                            left: Box::new(col),
1340                            op: Operator::RegexMatch,
1341                            right: Box::new(DfExpr::Literal(
1342                                ScalarValue::Utf8(Some(re.as_str().to_string())),
1343                                None,
1344                            )),
1345                        })
1346                    }
1347                }
1348                MatchOp::NotRe(re) => {
1349                    if re.as_str() == "^(?:.*)$" {
1350                        DfExpr::Literal(ScalarValue::Boolean(Some(false)), None)
1351                    } else if re.as_str() == "^(?:.+)$" {
1352                        col.eq(DfExpr::Literal(
1353                            ScalarValue::Utf8(Some(String::new())),
1354                            None,
1355                        ))
1356                    } else {
1357                        DfExpr::BinaryExpr(BinaryExpr {
1358                            left: Box::new(col),
1359                            op: Operator::RegexNotMatch,
1360                            right: Box::new(DfExpr::Literal(
1361                                ScalarValue::Utf8(Some(re.as_str().to_string())),
1362                                None,
1363                            )),
1364                        })
1365                    }
1366                }
1367            };
1368            exprs.push(expr);
1369        }
1370
1371        Ok(exprs)
1372    }
1373
1374    fn find_case_sensitive_column(schema: &DFSchemaRef, column: &str) -> Option<String> {
1375        schema
1376            .fields()
1377            .iter()
1378            .find(|field| field.name() == column)
1379            .map(|field| field.name().clone())
1380    }
1381
1382    fn table_ref(&self) -> Result<TableReference> {
1383        let table_name = self
1384            .ctx
1385            .table_name
1386            .clone()
1387            .context(TableNameNotFoundSnafu)?;
1388
1389        // set schema name if `__schema__` is given
1390        let table_ref = if let Some(schema_name) = &self.ctx.schema_name {
1391            TableReference::partial(schema_name.as_str(), table_name.as_str())
1392        } else {
1393            TableReference::bare(table_name.as_str())
1394        };
1395
1396        Ok(table_ref)
1397    }
1398
1399    fn build_time_index_filter(&self, offset_duration: i64) -> Result<Option<DfExpr>> {
1400        let start = self.ctx.start;
1401        let end = self.ctx.end;
1402        if end < start {
1403            return InvalidTimeRangeSnafu { start, end }.fail();
1404        }
1405        let lookback_delta = self.ctx.lookback_delta;
1406        let range = self.ctx.range.unwrap_or_default();
1407        let interval = self.ctx.interval;
1408        let time_index_expr = self.create_time_index_column_expr()?;
1409        let num_points = (end - start) / interval;
1410
1411        // Scan a continuous time range
1412        if (end - start) / interval > MAX_SCATTER_POINTS || interval <= INTERVAL_1H {
1413            let single_time_range = time_index_expr
1414                .clone()
1415                .gt_eq(DfExpr::Literal(
1416                    ScalarValue::TimestampMillisecond(
1417                        Some(self.ctx.start + offset_duration - self.ctx.lookback_delta - range),
1418                        None,
1419                    ),
1420                    None,
1421                ))
1422                .and(time_index_expr.lt_eq(DfExpr::Literal(
1423                    ScalarValue::TimestampMillisecond(
1424                        Some(self.ctx.end + offset_duration + self.ctx.lookback_delta),
1425                        None,
1426                    ),
1427                    None,
1428                )));
1429            return Ok(Some(single_time_range));
1430        }
1431
1432        // Otherwise scan scatter ranges separately
1433        let mut filters = Vec::with_capacity(num_points as usize);
1434        for timestamp in (start..end).step_by(interval as usize) {
1435            filters.push(
1436                time_index_expr
1437                    .clone()
1438                    .gt_eq(DfExpr::Literal(
1439                        ScalarValue::TimestampMillisecond(
1440                            Some(timestamp + offset_duration - lookback_delta - range),
1441                            None,
1442                        ),
1443                        None,
1444                    ))
1445                    .and(time_index_expr.clone().lt_eq(DfExpr::Literal(
1446                        ScalarValue::TimestampMillisecond(
1447                            Some(timestamp + offset_duration + lookback_delta),
1448                            None,
1449                        ),
1450                        None,
1451                    ))),
1452            )
1453        }
1454
1455        Ok(filters.into_iter().reduce(DfExpr::or))
1456    }
1457
1458    /// Create a table scan plan and a filter plan with given filter.
1459    ///
1460    /// # Panic
1461    /// If the filter is empty
1462    async fn create_table_scan_plan(&mut self, table_ref: TableReference) -> Result<LogicalPlan> {
1463        let provider = self
1464            .table_provider
1465            .resolve_table(table_ref.clone())
1466            .await
1467            .context(CatalogSnafu)?;
1468
1469        let is_time_index_ms = provider
1470            .as_any()
1471            .downcast_ref::<DefaultTableSource>()
1472            .context(UnknownTableSnafu)?
1473            .table_provider
1474            .as_any()
1475            .downcast_ref::<DfTableProviderAdapter>()
1476            .context(UnknownTableSnafu)?
1477            .table()
1478            .schema()
1479            .timestamp_column()
1480            .with_context(|| TimeIndexNotFoundSnafu {
1481                table: table_ref.to_quoted_string(),
1482            })?
1483            .data_type
1484            == ConcreteDataType::timestamp_millisecond_datatype();
1485
1486        let mut scan_plan = LogicalPlanBuilder::scan(table_ref.clone(), provider, None)
1487            .context(DataFusionPlanningSnafu)?
1488            .build()
1489            .context(DataFusionPlanningSnafu)?;
1490
1491        if !is_time_index_ms {
1492            // cast to ms if time_index not in Millisecond precision
1493            let expr: Vec<_> = self
1494                .ctx
1495                .field_columns
1496                .iter()
1497                .map(|col| DfExpr::Column(Column::new(Some(table_ref.clone()), col.clone())))
1498                .chain(self.create_tag_column_exprs()?)
1499                .chain(Some(DfExpr::Alias(Alias {
1500                    expr: Box::new(DfExpr::Cast(Cast {
1501                        expr: Box::new(self.create_time_index_column_expr()?),
1502                        data_type: ArrowDataType::Timestamp(ArrowTimeUnit::Millisecond, None),
1503                    })),
1504                    relation: Some(table_ref.clone()),
1505                    name: self
1506                        .ctx
1507                        .time_index_column
1508                        .as_ref()
1509                        .with_context(|| TimeIndexNotFoundSnafu {
1510                            table: table_ref.to_quoted_string(),
1511                        })?
1512                        .clone(),
1513                    metadata: None,
1514                })))
1515                .collect::<Vec<_>>();
1516            scan_plan = LogicalPlanBuilder::from(scan_plan)
1517                .project(expr)
1518                .context(DataFusionPlanningSnafu)?
1519                .build()
1520                .context(DataFusionPlanningSnafu)?;
1521        }
1522
1523        let result = LogicalPlanBuilder::from(scan_plan)
1524            .build()
1525            .context(DataFusionPlanningSnafu)?;
1526        Ok(result)
1527    }
1528
1529    /// Setup [PromPlannerContext]'s state fields.
1530    ///
1531    /// Returns a logical plan for an empty metric.
1532    async fn setup_context(&mut self) -> Result<Option<LogicalPlan>> {
1533        let table_ref = self.table_ref()?;
1534        let table = match self.table_provider.resolve_table(table_ref.clone()).await {
1535            Err(e) if e.status_code() == StatusCode::TableNotFound => {
1536                let plan = self.setup_context_for_empty_metric()?;
1537                return Ok(Some(plan));
1538            }
1539            res => res.context(CatalogSnafu)?,
1540        };
1541        let table = table
1542            .as_any()
1543            .downcast_ref::<DefaultTableSource>()
1544            .context(UnknownTableSnafu)?
1545            .table_provider
1546            .as_any()
1547            .downcast_ref::<DfTableProviderAdapter>()
1548            .context(UnknownTableSnafu)?
1549            .table();
1550
1551        // set time index column name
1552        let time_index = table
1553            .schema()
1554            .timestamp_column()
1555            .with_context(|| TimeIndexNotFoundSnafu {
1556                table: table_ref.to_quoted_string(),
1557            })?
1558            .name
1559            .clone();
1560        self.ctx.time_index_column = Some(time_index);
1561
1562        // set values columns
1563        let values = table
1564            .table_info()
1565            .meta
1566            .field_column_names()
1567            .cloned()
1568            .collect();
1569        self.ctx.field_columns = values;
1570
1571        // set primary key (tag) columns
1572        let tags = table
1573            .table_info()
1574            .meta
1575            .row_key_column_names()
1576            .filter(|col| {
1577                // remove metric engine's internal columns
1578                col != &DATA_SCHEMA_TABLE_ID_COLUMN_NAME && col != &DATA_SCHEMA_TSID_COLUMN_NAME
1579            })
1580            .cloned()
1581            .collect();
1582        self.ctx.tag_columns = tags;
1583
1584        Ok(None)
1585    }
1586
1587    /// Setup [PromPlannerContext]'s state fields for a non existent table
1588    /// without any rows.
1589    fn setup_context_for_empty_metric(&mut self) -> Result<LogicalPlan> {
1590        self.ctx.time_index_column = Some(SPECIAL_TIME_FUNCTION.to_string());
1591        self.ctx.reset_table_name_and_schema();
1592        self.ctx.tag_columns = vec![];
1593        self.ctx.field_columns = vec![DEFAULT_FIELD_COLUMN.to_string()];
1594
1595        // The table doesn't have any data, so we set start to 0 and end to -1.
1596        let plan = LogicalPlan::Extension(Extension {
1597            node: Arc::new(
1598                EmptyMetric::new(
1599                    0,
1600                    -1,
1601                    self.ctx.interval,
1602                    SPECIAL_TIME_FUNCTION.to_string(),
1603                    DEFAULT_FIELD_COLUMN.to_string(),
1604                    Some(lit(0.0f64)),
1605                )
1606                .context(DataFusionPlanningSnafu)?,
1607            ),
1608        });
1609        Ok(plan)
1610    }
1611
1612    // TODO(ruihang): insert column expr
1613    fn create_function_args(&self, args: &[Box<PromExpr>]) -> Result<FunctionArgs> {
1614        let mut result = FunctionArgs::default();
1615
1616        for arg in args {
1617            // First try to parse as literal expression (including binary expressions like 100.0 + 3.0)
1618            if let Some(expr) = Self::try_build_literal_expr(arg) {
1619                result.literals.push(expr);
1620            } else {
1621                // If not a literal, treat as vector input
1622                match arg.as_ref() {
1623                    PromExpr::Subquery(_)
1624                    | PromExpr::VectorSelector(_)
1625                    | PromExpr::MatrixSelector(_)
1626                    | PromExpr::Extension(_)
1627                    | PromExpr::Aggregate(_)
1628                    | PromExpr::Paren(_)
1629                    | PromExpr::Call(_)
1630                    | PromExpr::Binary(_)
1631                    | PromExpr::Unary(_) => {
1632                        if result.input.replace(*arg.clone()).is_some() {
1633                            MultipleVectorSnafu { expr: *arg.clone() }.fail()?;
1634                        }
1635                    }
1636
1637                    _ => {
1638                        let expr = Self::get_param_as_literal_expr(&Some(arg.clone()), None, None)?;
1639                        result.literals.push(expr);
1640                    }
1641                }
1642            }
1643        }
1644
1645        Ok(result)
1646    }
1647
1648    /// Creates function expressions for projection and returns the expressions and new tags.
1649    ///
1650    /// # Side Effects
1651    ///
1652    /// This method will update [PromPlannerContext]'s fields and tags if needed.
1653    fn create_function_expr(
1654        &mut self,
1655        func: &Function,
1656        other_input_exprs: Vec<DfExpr>,
1657        query_engine_state: &QueryEngineState,
1658    ) -> Result<(Vec<DfExpr>, Vec<String>)> {
1659        // TODO(ruihang): check function args list
1660        let mut other_input_exprs: VecDeque<DfExpr> = other_input_exprs.into();
1661
1662        // TODO(ruihang): set this according to in-param list
1663        let field_column_pos = 0;
1664        let mut exprs = Vec::with_capacity(self.ctx.field_columns.len());
1665        // New labels after executing the function, e.g. `label_replace` etc.
1666        let mut new_tags = vec![];
1667        let scalar_func = match func.name {
1668            "increase" => ScalarFunc::ExtrapolateUdf(
1669                Arc::new(Increase::scalar_udf()),
1670                self.ctx.range.context(ExpectRangeSelectorSnafu)?,
1671            ),
1672            "rate" => ScalarFunc::ExtrapolateUdf(
1673                Arc::new(Rate::scalar_udf()),
1674                self.ctx.range.context(ExpectRangeSelectorSnafu)?,
1675            ),
1676            "delta" => ScalarFunc::ExtrapolateUdf(
1677                Arc::new(Delta::scalar_udf()),
1678                self.ctx.range.context(ExpectRangeSelectorSnafu)?,
1679            ),
1680            "idelta" => ScalarFunc::Udf(Arc::new(IDelta::<false>::scalar_udf())),
1681            "irate" => ScalarFunc::Udf(Arc::new(IDelta::<true>::scalar_udf())),
1682            "resets" => ScalarFunc::Udf(Arc::new(Resets::scalar_udf())),
1683            "changes" => ScalarFunc::Udf(Arc::new(Changes::scalar_udf())),
1684            "deriv" => ScalarFunc::Udf(Arc::new(Deriv::scalar_udf())),
1685            "avg_over_time" => ScalarFunc::Udf(Arc::new(AvgOverTime::scalar_udf())),
1686            "min_over_time" => ScalarFunc::Udf(Arc::new(MinOverTime::scalar_udf())),
1687            "max_over_time" => ScalarFunc::Udf(Arc::new(MaxOverTime::scalar_udf())),
1688            "sum_over_time" => ScalarFunc::Udf(Arc::new(SumOverTime::scalar_udf())),
1689            "count_over_time" => ScalarFunc::Udf(Arc::new(CountOverTime::scalar_udf())),
1690            "last_over_time" => ScalarFunc::Udf(Arc::new(LastOverTime::scalar_udf())),
1691            "absent_over_time" => ScalarFunc::Udf(Arc::new(AbsentOverTime::scalar_udf())),
1692            "present_over_time" => ScalarFunc::Udf(Arc::new(PresentOverTime::scalar_udf())),
1693            "stddev_over_time" => ScalarFunc::Udf(Arc::new(StddevOverTime::scalar_udf())),
1694            "stdvar_over_time" => ScalarFunc::Udf(Arc::new(StdvarOverTime::scalar_udf())),
1695            "quantile_over_time" => ScalarFunc::Udf(Arc::new(QuantileOverTime::scalar_udf())),
1696            "predict_linear" => {
1697                other_input_exprs[0] = DfExpr::Cast(Cast {
1698                    expr: Box::new(other_input_exprs[0].clone()),
1699                    data_type: ArrowDataType::Int64,
1700                });
1701                ScalarFunc::Udf(Arc::new(PredictLinear::scalar_udf()))
1702            }
1703            "holt_winters" => ScalarFunc::Udf(Arc::new(HoltWinters::scalar_udf())),
1704            "time" => {
1705                exprs.push(build_special_time_expr(
1706                    self.ctx.time_index_column.as_ref().unwrap(),
1707                ));
1708                ScalarFunc::GeneratedExpr
1709            }
1710            "minute" => {
1711                // date_part('minute', time_index)
1712                let expr = self.date_part_on_time_index("minute")?;
1713                exprs.push(expr);
1714                ScalarFunc::GeneratedExpr
1715            }
1716            "hour" => {
1717                // date_part('hour', time_index)
1718                let expr = self.date_part_on_time_index("hour")?;
1719                exprs.push(expr);
1720                ScalarFunc::GeneratedExpr
1721            }
1722            "month" => {
1723                // date_part('month', time_index)
1724                let expr = self.date_part_on_time_index("month")?;
1725                exprs.push(expr);
1726                ScalarFunc::GeneratedExpr
1727            }
1728            "year" => {
1729                // date_part('year', time_index)
1730                let expr = self.date_part_on_time_index("year")?;
1731                exprs.push(expr);
1732                ScalarFunc::GeneratedExpr
1733            }
1734            "day_of_month" => {
1735                // date_part('day', time_index)
1736                let expr = self.date_part_on_time_index("day")?;
1737                exprs.push(expr);
1738                ScalarFunc::GeneratedExpr
1739            }
1740            "day_of_week" => {
1741                // date_part('dow', time_index)
1742                let expr = self.date_part_on_time_index("dow")?;
1743                exprs.push(expr);
1744                ScalarFunc::GeneratedExpr
1745            }
1746            "day_of_year" => {
1747                // date_part('doy', time_index)
1748                let expr = self.date_part_on_time_index("doy")?;
1749                exprs.push(expr);
1750                ScalarFunc::GeneratedExpr
1751            }
1752            "days_in_month" => {
1753                // date_part(
1754                //     'days',
1755                //     (date_trunc('month', <TIME INDEX>::date) + interval '1 month - 1 day')
1756                // );
1757                let day_lit_expr = "day".lit();
1758                let month_lit_expr = "month".lit();
1759                let interval_1month_lit_expr =
1760                    DfExpr::Literal(ScalarValue::IntervalYearMonth(Some(1)), None);
1761                let interval_1day_lit_expr = DfExpr::Literal(
1762                    ScalarValue::IntervalDayTime(Some(IntervalDayTime::new(1, 0))),
1763                    None,
1764                );
1765                let the_1month_minus_1day_expr = DfExpr::BinaryExpr(BinaryExpr {
1766                    left: Box::new(interval_1month_lit_expr),
1767                    op: Operator::Minus,
1768                    right: Box::new(interval_1day_lit_expr),
1769                });
1770                let date_trunc_expr = DfExpr::ScalarFunction(ScalarFunction {
1771                    func: datafusion_functions::datetime::date_trunc(),
1772                    args: vec![month_lit_expr, self.create_time_index_column_expr()?],
1773                });
1774                let date_trunc_plus_interval_expr = DfExpr::BinaryExpr(BinaryExpr {
1775                    left: Box::new(date_trunc_expr),
1776                    op: Operator::Plus,
1777                    right: Box::new(the_1month_minus_1day_expr),
1778                });
1779                let date_part_expr = DfExpr::ScalarFunction(ScalarFunction {
1780                    func: datafusion_functions::datetime::date_part(),
1781                    args: vec![day_lit_expr, date_trunc_plus_interval_expr],
1782                });
1783
1784                exprs.push(date_part_expr);
1785                ScalarFunc::GeneratedExpr
1786            }
1787
1788            "label_join" => {
1789                let (concat_expr, dst_label) = Self::build_concat_labels_expr(
1790                    &mut other_input_exprs,
1791                    &self.ctx,
1792                    query_engine_state,
1793                )?;
1794
1795                // Reserve the current field columns except the `dst_label`.
1796                for value in &self.ctx.field_columns {
1797                    if *value != dst_label {
1798                        let expr = DfExpr::Column(Column::from_name(value));
1799                        exprs.push(expr);
1800                    }
1801                }
1802
1803                // Remove it from tag columns if exists to avoid duplicated column names
1804                self.ctx.tag_columns.retain(|tag| *tag != dst_label);
1805                new_tags.push(dst_label);
1806                // Add the new label expr to evaluate
1807                exprs.push(concat_expr);
1808
1809                ScalarFunc::GeneratedExpr
1810            }
1811            "label_replace" => {
1812                if let Some((replace_expr, dst_label)) = self
1813                    .build_regexp_replace_label_expr(&mut other_input_exprs, query_engine_state)?
1814                {
1815                    // Reserve the current field columns except the `dst_label`.
1816                    for value in &self.ctx.field_columns {
1817                        if *value != dst_label {
1818                            let expr = DfExpr::Column(Column::from_name(value));
1819                            exprs.push(expr);
1820                        }
1821                    }
1822
1823                    ensure!(
1824                        !self.ctx.tag_columns.contains(&dst_label),
1825                        SameLabelSetSnafu
1826                    );
1827                    new_tags.push(dst_label);
1828                    // Add the new label expr to evaluate
1829                    exprs.push(replace_expr);
1830                } else {
1831                    // Keep the current field columns
1832                    for value in &self.ctx.field_columns {
1833                        let expr = DfExpr::Column(Column::from_name(value));
1834                        exprs.push(expr);
1835                    }
1836                }
1837
1838                ScalarFunc::GeneratedExpr
1839            }
1840            "sort" | "sort_desc" | "sort_by_label" | "sort_by_label_desc" | "timestamp" => {
1841                // These functions are not expression but a part of plan,
1842                // they are processed by `prom_call_expr_to_plan`.
1843                for value in &self.ctx.field_columns {
1844                    let expr = DfExpr::Column(Column::from_name(value));
1845                    exprs.push(expr);
1846                }
1847
1848                ScalarFunc::GeneratedExpr
1849            }
1850            "round" => {
1851                if other_input_exprs.is_empty() {
1852                    other_input_exprs.push_front(0.0f64.lit());
1853                }
1854                ScalarFunc::DataFusionUdf(Arc::new(Round::scalar_udf()))
1855            }
1856            "rad" => ScalarFunc::DataFusionBuiltin(datafusion::functions::math::radians()),
1857            "deg" => ScalarFunc::DataFusionBuiltin(datafusion::functions::math::degrees()),
1858            "sgn" => ScalarFunc::DataFusionBuiltin(datafusion::functions::math::signum()),
1859            "pi" => {
1860                // pi functions doesn't accepts any arguments, needs special processing
1861                let fn_expr = DfExpr::ScalarFunction(ScalarFunction {
1862                    func: datafusion::functions::math::pi(),
1863                    args: vec![],
1864                });
1865                exprs.push(fn_expr);
1866
1867                ScalarFunc::GeneratedExpr
1868            }
1869            _ => {
1870                if let Some(f) = query_engine_state
1871                    .session_state()
1872                    .scalar_functions()
1873                    .get(func.name)
1874                {
1875                    ScalarFunc::DataFusionBuiltin(f.clone())
1876                } else if let Some(factory) = query_engine_state.scalar_function(func.name) {
1877                    let func_state = query_engine_state.function_state();
1878                    let query_ctx = self.table_provider.query_ctx();
1879
1880                    ScalarFunc::DataFusionUdf(Arc::new(factory.provide(FunctionContext {
1881                        state: func_state,
1882                        query_ctx: query_ctx.clone(),
1883                    })))
1884                } else if let Some(f) = datafusion_functions::math::functions()
1885                    .iter()
1886                    .find(|f| f.name() == func.name)
1887                {
1888                    ScalarFunc::DataFusionUdf(f.clone())
1889                } else {
1890                    return UnsupportedExprSnafu {
1891                        name: func.name.to_string(),
1892                    }
1893                    .fail();
1894                }
1895            }
1896        };
1897
1898        for value in &self.ctx.field_columns {
1899            let col_expr = DfExpr::Column(Column::from_name(value));
1900
1901            match scalar_func.clone() {
1902                ScalarFunc::DataFusionBuiltin(func) => {
1903                    other_input_exprs.insert(field_column_pos, col_expr);
1904                    let fn_expr = DfExpr::ScalarFunction(ScalarFunction {
1905                        func,
1906                        args: other_input_exprs.clone().into(),
1907                    });
1908                    exprs.push(fn_expr);
1909                    let _ = other_input_exprs.remove(field_column_pos);
1910                }
1911                ScalarFunc::DataFusionUdf(func) => {
1912                    let args = itertools::chain!(
1913                        other_input_exprs.iter().take(field_column_pos).cloned(),
1914                        std::iter::once(col_expr),
1915                        other_input_exprs.iter().skip(field_column_pos).cloned()
1916                    )
1917                    .collect_vec();
1918                    exprs.push(DfExpr::ScalarFunction(ScalarFunction { func, args }))
1919                }
1920                ScalarFunc::Udf(func) => {
1921                    let ts_range_expr = DfExpr::Column(Column::from_name(
1922                        RangeManipulate::build_timestamp_range_name(
1923                            self.ctx.time_index_column.as_ref().unwrap(),
1924                        ),
1925                    ));
1926                    other_input_exprs.insert(field_column_pos, ts_range_expr);
1927                    other_input_exprs.insert(field_column_pos + 1, col_expr);
1928                    let fn_expr = DfExpr::ScalarFunction(ScalarFunction {
1929                        func,
1930                        args: other_input_exprs.clone().into(),
1931                    });
1932                    exprs.push(fn_expr);
1933                    let _ = other_input_exprs.remove(field_column_pos + 1);
1934                    let _ = other_input_exprs.remove(field_column_pos);
1935                }
1936                ScalarFunc::ExtrapolateUdf(func, range_length) => {
1937                    let ts_range_expr = DfExpr::Column(Column::from_name(
1938                        RangeManipulate::build_timestamp_range_name(
1939                            self.ctx.time_index_column.as_ref().unwrap(),
1940                        ),
1941                    ));
1942                    other_input_exprs.insert(field_column_pos, ts_range_expr);
1943                    other_input_exprs.insert(field_column_pos + 1, col_expr);
1944                    other_input_exprs
1945                        .insert(field_column_pos + 2, self.create_time_index_column_expr()?);
1946                    other_input_exprs.push_back(lit(range_length));
1947                    let fn_expr = DfExpr::ScalarFunction(ScalarFunction {
1948                        func,
1949                        args: other_input_exprs.clone().into(),
1950                    });
1951                    exprs.push(fn_expr);
1952                    let _ = other_input_exprs.pop_back();
1953                    let _ = other_input_exprs.remove(field_column_pos + 2);
1954                    let _ = other_input_exprs.remove(field_column_pos + 1);
1955                    let _ = other_input_exprs.remove(field_column_pos);
1956                }
1957                ScalarFunc::GeneratedExpr => {}
1958            }
1959        }
1960
1961        // Update value columns' name, and alias them to remove qualifiers
1962        // For label functions such as `label_join`, `label_replace`, etc.,
1963        // we keep the fields unchanged.
1964        if !matches!(func.name, "label_join" | "label_replace") {
1965            let mut new_field_columns = Vec::with_capacity(exprs.len());
1966
1967            exprs = exprs
1968                .into_iter()
1969                .map(|expr| {
1970                    let display_name = expr.schema_name().to_string();
1971                    new_field_columns.push(display_name.clone());
1972                    Ok(expr.alias(display_name))
1973                })
1974                .collect::<std::result::Result<Vec<_>, _>>()
1975                .context(DataFusionPlanningSnafu)?;
1976
1977            self.ctx.field_columns = new_field_columns;
1978        }
1979
1980        Ok((exprs, new_tags))
1981    }
1982
1983    /// Validate label name according to Prometheus specification.
1984    /// Label names must match the regex: [a-zA-Z_][a-zA-Z0-9_]*
1985    /// Additionally, label names starting with double underscores are reserved for internal use.
1986    fn validate_label_name(label_name: &str) -> Result<()> {
1987        // Check if label name starts with double underscores (reserved)
1988        if label_name.starts_with("__") {
1989            return InvalidDestinationLabelNameSnafu { label_name }.fail();
1990        }
1991        // Check if label name matches the required pattern
1992        if !LABEL_NAME_REGEX.is_match(label_name) {
1993            return InvalidDestinationLabelNameSnafu { label_name }.fail();
1994        }
1995
1996        Ok(())
1997    }
1998
1999    /// Build expr for `label_replace` function
2000    fn build_regexp_replace_label_expr(
2001        &self,
2002        other_input_exprs: &mut VecDeque<DfExpr>,
2003        query_engine_state: &QueryEngineState,
2004    ) -> Result<Option<(DfExpr, String)>> {
2005        // label_replace(vector, dst_label, replacement, src_label, regex)
2006        let dst_label = match other_input_exprs.pop_front() {
2007            Some(DfExpr::Literal(ScalarValue::Utf8(Some(d)), _)) => d,
2008            other => UnexpectedPlanExprSnafu {
2009                desc: format!("expected dst_label string literal, but found {:?}", other),
2010            }
2011            .fail()?,
2012        };
2013
2014        // Validate the destination label name
2015        Self::validate_label_name(&dst_label)?;
2016        let replacement = match other_input_exprs.pop_front() {
2017            Some(DfExpr::Literal(ScalarValue::Utf8(Some(r)), _)) => r,
2018            other => UnexpectedPlanExprSnafu {
2019                desc: format!("expected replacement string literal, but found {:?}", other),
2020            }
2021            .fail()?,
2022        };
2023        let src_label = match other_input_exprs.pop_front() {
2024            Some(DfExpr::Literal(ScalarValue::Utf8(Some(s)), None)) => s,
2025            other => UnexpectedPlanExprSnafu {
2026                desc: format!("expected src_label string literal, but found {:?}", other),
2027            }
2028            .fail()?,
2029        };
2030
2031        let regex = match other_input_exprs.pop_front() {
2032            Some(DfExpr::Literal(ScalarValue::Utf8(Some(r)), None)) => r,
2033            other => UnexpectedPlanExprSnafu {
2034                desc: format!("expected regex string literal, but found {:?}", other),
2035            }
2036            .fail()?,
2037        };
2038
2039        // Validate the regex before using it
2040        // doc: https://prometheus.io/docs/prometheus/latest/querying/functions/#label_replace
2041        regex::Regex::new(&regex).map_err(|_| {
2042            InvalidRegularExpressionSnafu {
2043                regex: regex.clone(),
2044            }
2045            .build()
2046        })?;
2047
2048        // If the src_label exists and regex is empty, keep everything unchanged.
2049        if self.ctx.tag_columns.contains(&src_label) && regex.is_empty() {
2050            return Ok(None);
2051        }
2052
2053        // If the src_label doesn't exists, and
2054        if !self.ctx.tag_columns.contains(&src_label) {
2055            if replacement.is_empty() {
2056                // the replacement is empty, keep everything unchanged.
2057                return Ok(None);
2058            } else {
2059                // the replacement is not empty, always adds dst_label with replacement value.
2060                return Ok(Some((
2061                    // alias literal `replacement` as dst_label
2062                    lit(replacement).alias(&dst_label),
2063                    dst_label,
2064                )));
2065            }
2066        }
2067
2068        // Preprocess the regex:
2069        // https://github.com/prometheus/prometheus/blob/d902abc50d6652ba8fe9a81ff8e5cce936114eba/promql/functions.go#L1575C32-L1575C37
2070        let regex = format!("^(?s:{regex})$");
2071
2072        let session_state = query_engine_state.session_state();
2073        let func = session_state
2074            .scalar_functions()
2075            .get("regexp_replace")
2076            .context(UnsupportedExprSnafu {
2077                name: "regexp_replace",
2078            })?;
2079
2080        // regexp_replace(src_label, regex, replacement)
2081        let args = vec![
2082            if src_label.is_empty() {
2083                DfExpr::Literal(ScalarValue::Utf8(Some(String::new())), None)
2084            } else {
2085                DfExpr::Column(Column::from_name(src_label))
2086            },
2087            DfExpr::Literal(ScalarValue::Utf8(Some(regex)), None),
2088            DfExpr::Literal(ScalarValue::Utf8(Some(replacement)), None),
2089        ];
2090
2091        Ok(Some((
2092            DfExpr::ScalarFunction(ScalarFunction {
2093                func: func.clone(),
2094                args,
2095            })
2096            .alias(&dst_label),
2097            dst_label,
2098        )))
2099    }
2100
2101    /// Build expr for `label_join` function
2102    fn build_concat_labels_expr(
2103        other_input_exprs: &mut VecDeque<DfExpr>,
2104        ctx: &PromPlannerContext,
2105        query_engine_state: &QueryEngineState,
2106    ) -> Result<(DfExpr, String)> {
2107        // label_join(vector, dst_label, separator, src_label_1, src_label_2, ...)
2108
2109        let dst_label = match other_input_exprs.pop_front() {
2110            Some(DfExpr::Literal(ScalarValue::Utf8(Some(d)), _)) => d,
2111            other => UnexpectedPlanExprSnafu {
2112                desc: format!("expected dst_label string literal, but found {:?}", other),
2113            }
2114            .fail()?,
2115        };
2116        let separator = match other_input_exprs.pop_front() {
2117            Some(DfExpr::Literal(ScalarValue::Utf8(Some(d)), _)) => d,
2118            other => UnexpectedPlanExprSnafu {
2119                desc: format!("expected separator string literal, but found {:?}", other),
2120            }
2121            .fail()?,
2122        };
2123
2124        // Create a set of available columns (tag columns + field columns + time index column)
2125        let available_columns: HashSet<&str> = ctx
2126            .tag_columns
2127            .iter()
2128            .chain(ctx.field_columns.iter())
2129            .chain(ctx.time_index_column.as_ref())
2130            .map(|s| s.as_str())
2131            .collect();
2132
2133        let src_labels = other_input_exprs
2134            .iter()
2135            .map(|expr| {
2136                // Cast source label into column or null literal
2137                match expr {
2138                    DfExpr::Literal(ScalarValue::Utf8(Some(label)), None) => {
2139                        if label.is_empty() {
2140                            Ok(DfExpr::Literal(ScalarValue::Null, None))
2141                        } else if available_columns.contains(label.as_str()) {
2142                            // Label exists in the table schema
2143                            Ok(DfExpr::Column(Column::from_name(label)))
2144                        } else {
2145                            // Label doesn't exist, treat as empty string (null)
2146                            Ok(DfExpr::Literal(ScalarValue::Null, None))
2147                        }
2148                    }
2149                    other => UnexpectedPlanExprSnafu {
2150                        desc: format!(
2151                            "expected source label string literal, but found {:?}",
2152                            other
2153                        ),
2154                    }
2155                    .fail(),
2156                }
2157            })
2158            .collect::<Result<Vec<_>>>()?;
2159        ensure!(
2160            !src_labels.is_empty(),
2161            FunctionInvalidArgumentSnafu {
2162                fn_name: "label_join"
2163            }
2164        );
2165
2166        let session_state = query_engine_state.session_state();
2167        let func = session_state
2168            .scalar_functions()
2169            .get("concat_ws")
2170            .context(UnsupportedExprSnafu { name: "concat_ws" })?;
2171
2172        // concat_ws(separator, src_label_1, src_label_2, ...) as dst_label
2173        let mut args = Vec::with_capacity(1 + src_labels.len());
2174        args.push(DfExpr::Literal(ScalarValue::Utf8(Some(separator)), None));
2175        args.extend(src_labels);
2176
2177        Ok((
2178            DfExpr::ScalarFunction(ScalarFunction {
2179                func: func.clone(),
2180                args,
2181            })
2182            .alias(&dst_label),
2183            dst_label,
2184        ))
2185    }
2186
2187    fn create_time_index_column_expr(&self) -> Result<DfExpr> {
2188        Ok(DfExpr::Column(Column::from_name(
2189            self.ctx
2190                .time_index_column
2191                .clone()
2192                .with_context(|| TimeIndexNotFoundSnafu { table: "unknown" })?,
2193        )))
2194    }
2195
2196    fn create_tag_column_exprs(&self) -> Result<Vec<DfExpr>> {
2197        let mut result = Vec::with_capacity(self.ctx.tag_columns.len());
2198        for tag in &self.ctx.tag_columns {
2199            let expr = DfExpr::Column(Column::from_name(tag));
2200            result.push(expr);
2201        }
2202        Ok(result)
2203    }
2204
2205    fn create_field_column_exprs(&self) -> Result<Vec<DfExpr>> {
2206        let mut result = Vec::with_capacity(self.ctx.field_columns.len());
2207        for field in &self.ctx.field_columns {
2208            let expr = DfExpr::Column(Column::from_name(field));
2209            result.push(expr);
2210        }
2211        Ok(result)
2212    }
2213
2214    fn create_tag_and_time_index_column_sort_exprs(&self) -> Result<Vec<SortExpr>> {
2215        let mut result = self
2216            .ctx
2217            .tag_columns
2218            .iter()
2219            .map(|col| DfExpr::Column(Column::from_name(col)).sort(true, true))
2220            .collect::<Vec<_>>();
2221        result.push(self.create_time_index_column_expr()?.sort(true, true));
2222        Ok(result)
2223    }
2224
2225    fn create_field_columns_sort_exprs(&self, asc: bool) -> Vec<SortExpr> {
2226        self.ctx
2227            .field_columns
2228            .iter()
2229            .map(|col| DfExpr::Column(Column::from_name(col)).sort(asc, true))
2230            .collect::<Vec<_>>()
2231    }
2232
2233    fn create_sort_exprs_by_tags(
2234        func: &str,
2235        tags: Vec<DfExpr>,
2236        asc: bool,
2237    ) -> Result<Vec<SortExpr>> {
2238        ensure!(
2239            !tags.is_empty(),
2240            FunctionInvalidArgumentSnafu { fn_name: func }
2241        );
2242
2243        tags.iter()
2244            .map(|col| match col {
2245                DfExpr::Literal(ScalarValue::Utf8(Some(label)), _) => {
2246                    Ok(DfExpr::Column(Column::from_name(label)).sort(asc, false))
2247                }
2248                other => UnexpectedPlanExprSnafu {
2249                    desc: format!("expected label string literal, but found {:?}", other),
2250                }
2251                .fail(),
2252            })
2253            .collect::<Result<Vec<_>>>()
2254    }
2255
2256    fn create_empty_values_filter_expr(&self) -> Result<DfExpr> {
2257        let mut exprs = Vec::with_capacity(self.ctx.field_columns.len());
2258        for value in &self.ctx.field_columns {
2259            let expr = DfExpr::Column(Column::from_name(value)).is_not_null();
2260            exprs.push(expr);
2261        }
2262
2263        conjunction(exprs).context(ValueNotFoundSnafu {
2264            table: self.table_ref()?.to_quoted_string(),
2265        })
2266    }
2267
2268    /// Creates a set of DataFusion `DfExpr::AggregateFunction` expressions for each value column using the specified aggregate function.
2269    ///
2270    /// # Side Effects
2271    ///
2272    /// This method modifies the value columns in the context by replacing them with the new columns
2273    /// created by the aggregate function application.
2274    ///
2275    /// # Returns
2276    ///
2277    /// Returns a tuple of `(aggregate_expressions, previous_field_expressions)` where:
2278    /// - `aggregate_expressions`: Expressions that apply the aggregate function to the original fields
2279    /// - `previous_field_expressions`: Original field expressions before aggregation. This is non-empty
2280    ///   only when the operation is `count_values`, as this operation requires preserving the original
2281    ///   values for grouping.
2282    ///
2283    fn create_aggregate_exprs(
2284        &mut self,
2285        op: TokenType,
2286        param: &Option<Box<PromExpr>>,
2287        input_plan: &LogicalPlan,
2288    ) -> Result<(Vec<DfExpr>, Vec<DfExpr>)> {
2289        let mut non_col_args = Vec::new();
2290        let aggr = match op.id() {
2291            token::T_SUM => sum_udaf(),
2292            token::T_QUANTILE => {
2293                let q =
2294                    Self::get_param_as_literal_expr(param, Some(op), Some(ArrowDataType::Float64))?;
2295                non_col_args.push(q);
2296                quantile_udaf()
2297            }
2298            token::T_AVG => avg_udaf(),
2299            token::T_COUNT_VALUES | token::T_COUNT => count_udaf(),
2300            token::T_MIN => min_udaf(),
2301            token::T_MAX => max_udaf(),
2302            token::T_GROUP => grouping_udaf(),
2303            token::T_STDDEV => stddev_pop_udaf(),
2304            token::T_STDVAR => var_pop_udaf(),
2305            token::T_TOPK | token::T_BOTTOMK => UnsupportedExprSnafu {
2306                name: format!("{op:?}"),
2307            }
2308            .fail()?,
2309            _ => UnexpectedTokenSnafu { token: op }.fail()?,
2310        };
2311
2312        // perform aggregate operation to each value column
2313        let exprs: Vec<DfExpr> = self
2314            .ctx
2315            .field_columns
2316            .iter()
2317            .map(|col| {
2318                non_col_args.push(DfExpr::Column(Column::from_name(col)));
2319                let expr = aggr.call(non_col_args.clone());
2320                non_col_args.pop();
2321                expr
2322            })
2323            .collect::<Vec<_>>();
2324
2325        // if the aggregator is `count_values`, it must be grouped by current fields.
2326        let prev_field_exprs = if op.id() == token::T_COUNT_VALUES {
2327            let prev_field_exprs: Vec<_> = self
2328                .ctx
2329                .field_columns
2330                .iter()
2331                .map(|col| DfExpr::Column(Column::from_name(col)))
2332                .collect();
2333
2334            ensure!(
2335                self.ctx.field_columns.len() == 1,
2336                UnsupportedExprSnafu {
2337                    name: "count_values on multi-value input"
2338                }
2339            );
2340
2341            prev_field_exprs
2342        } else {
2343            vec![]
2344        };
2345
2346        // update value column name according to the aggregators,
2347        let mut new_field_columns = Vec::with_capacity(self.ctx.field_columns.len());
2348
2349        let normalized_exprs =
2350            normalize_cols(exprs.iter().cloned(), input_plan).context(DataFusionPlanningSnafu)?;
2351        for expr in normalized_exprs {
2352            new_field_columns.push(expr.schema_name().to_string());
2353        }
2354        self.ctx.field_columns = new_field_columns;
2355
2356        Ok((exprs, prev_field_exprs))
2357    }
2358
2359    fn get_param_value_as_str(op: TokenType, param: &Option<Box<PromExpr>>) -> Result<&str> {
2360        let param = param
2361            .as_deref()
2362            .with_context(|| FunctionInvalidArgumentSnafu {
2363                fn_name: op.to_string(),
2364            })?;
2365        let PromExpr::StringLiteral(StringLiteral { val }) = param else {
2366            return FunctionInvalidArgumentSnafu {
2367                fn_name: op.to_string(),
2368            }
2369            .fail();
2370        };
2371
2372        Ok(val)
2373    }
2374
2375    fn get_param_as_literal_expr(
2376        param: &Option<Box<PromExpr>>,
2377        op: Option<TokenType>,
2378        expected_type: Option<ArrowDataType>,
2379    ) -> Result<DfExpr> {
2380        let prom_param = param.as_deref().with_context(|| {
2381            if let Some(op) = op {
2382                FunctionInvalidArgumentSnafu {
2383                    fn_name: op.to_string(),
2384                }
2385            } else {
2386                FunctionInvalidArgumentSnafu {
2387                    fn_name: "unknown".to_string(),
2388                }
2389            }
2390        })?;
2391
2392        let expr = Self::try_build_literal_expr(prom_param).with_context(|| {
2393            if let Some(op) = op {
2394                FunctionInvalidArgumentSnafu {
2395                    fn_name: op.to_string(),
2396                }
2397            } else {
2398                FunctionInvalidArgumentSnafu {
2399                    fn_name: "unknown".to_string(),
2400                }
2401            }
2402        })?;
2403
2404        // check if the type is expected
2405        if let Some(expected_type) = expected_type {
2406            // literal should not have reference to column
2407            let expr_type = expr
2408                .get_type(&DFSchema::empty())
2409                .context(DataFusionPlanningSnafu)?;
2410            if expected_type != expr_type {
2411                return FunctionInvalidArgumentSnafu {
2412                    fn_name: format!("expected {expected_type:?}, but found {expr_type:?}"),
2413                }
2414                .fail();
2415            }
2416        }
2417
2418        Ok(expr)
2419    }
2420
2421    /// Create [DfExpr::WindowFunction] expr for each value column with given window function.
2422    ///
2423    fn create_window_exprs(
2424        &mut self,
2425        op: TokenType,
2426        group_exprs: Vec<DfExpr>,
2427        input_plan: &LogicalPlan,
2428    ) -> Result<Vec<DfExpr>> {
2429        ensure!(
2430            self.ctx.field_columns.len() == 1,
2431            UnsupportedExprSnafu {
2432                name: "topk or bottomk on multi-value input"
2433            }
2434        );
2435
2436        assert!(matches!(op.id(), token::T_TOPK | token::T_BOTTOMK));
2437
2438        let asc = matches!(op.id(), token::T_BOTTOMK);
2439
2440        let tag_sort_exprs = self
2441            .create_tag_column_exprs()?
2442            .into_iter()
2443            .map(|expr| expr.sort(asc, true));
2444
2445        // perform window operation to each value column
2446        let exprs: Vec<DfExpr> = self
2447            .ctx
2448            .field_columns
2449            .iter()
2450            .map(|col| {
2451                let mut sort_exprs = Vec::with_capacity(self.ctx.tag_columns.len() + 1);
2452                // Order by value in the specific order
2453                sort_exprs.push(DfExpr::Column(Column::from(col)).sort(asc, true));
2454                // Then tags if the values are equal,
2455                // Try to ensure the relative stability of the output results.
2456                sort_exprs.extend(tag_sort_exprs.clone());
2457
2458                DfExpr::WindowFunction(Box::new(WindowFunction {
2459                    fun: WindowFunctionDefinition::WindowUDF(Arc::new(RowNumber::new().into())),
2460                    params: WindowFunctionParams {
2461                        args: vec![],
2462                        partition_by: group_exprs.clone(),
2463                        order_by: sort_exprs,
2464                        window_frame: WindowFrame::new(Some(true)),
2465                        null_treatment: None,
2466                        distinct: false,
2467                        filter: None,
2468                    },
2469                }))
2470            })
2471            .collect();
2472
2473        let normalized_exprs =
2474            normalize_cols(exprs.iter().cloned(), input_plan).context(DataFusionPlanningSnafu)?;
2475        Ok(normalized_exprs)
2476    }
2477
2478    /// Try to build a [f64] from [PromExpr].
2479    #[deprecated(
2480        note = "use `Self::get_param_as_literal_expr` instead. This is only for `create_histogram_plan`"
2481    )]
2482    fn try_build_float_literal(expr: &PromExpr) -> Option<f64> {
2483        match expr {
2484            PromExpr::NumberLiteral(NumberLiteral { val }) => Some(*val),
2485            PromExpr::Paren(ParenExpr { expr }) => Self::try_build_float_literal(expr),
2486            PromExpr::Unary(UnaryExpr { expr, .. }) => {
2487                Self::try_build_float_literal(expr).map(|f| -f)
2488            }
2489            PromExpr::StringLiteral(_)
2490            | PromExpr::Binary(_)
2491            | PromExpr::VectorSelector(_)
2492            | PromExpr::MatrixSelector(_)
2493            | PromExpr::Call(_)
2494            | PromExpr::Extension(_)
2495            | PromExpr::Aggregate(_)
2496            | PromExpr::Subquery(_) => None,
2497        }
2498    }
2499
2500    /// Create a [SPECIAL_HISTOGRAM_QUANTILE] plan.
2501    async fn create_histogram_plan(
2502        &mut self,
2503        args: &PromFunctionArgs,
2504        query_engine_state: &QueryEngineState,
2505    ) -> Result<LogicalPlan> {
2506        if args.args.len() != 2 {
2507            return FunctionInvalidArgumentSnafu {
2508                fn_name: SPECIAL_HISTOGRAM_QUANTILE.to_string(),
2509            }
2510            .fail();
2511        }
2512        #[allow(deprecated)]
2513        let phi = Self::try_build_float_literal(&args.args[0]).with_context(|| {
2514            FunctionInvalidArgumentSnafu {
2515                fn_name: SPECIAL_HISTOGRAM_QUANTILE.to_string(),
2516            }
2517        })?;
2518
2519        let input = args.args[1].as_ref().clone();
2520        let input_plan = self.prom_expr_to_plan(&input, query_engine_state).await?;
2521
2522        if !self.ctx.has_le_tag() {
2523            // Return empty result instead of error when 'le' column is not found
2524            // This handles the case when histogram metrics don't exist
2525            return Ok(LogicalPlan::EmptyRelation(
2526                datafusion::logical_expr::EmptyRelation {
2527                    produce_one_row: false,
2528                    schema: Arc::new(DFSchema::empty()),
2529                },
2530            ));
2531        }
2532        let time_index_column =
2533            self.ctx
2534                .time_index_column
2535                .clone()
2536                .with_context(|| TimeIndexNotFoundSnafu {
2537                    table: self.ctx.table_name.clone().unwrap_or_default(),
2538                })?;
2539        // FIXME(ruihang): support multi fields
2540        let field_column = self
2541            .ctx
2542            .field_columns
2543            .first()
2544            .with_context(|| FunctionInvalidArgumentSnafu {
2545                fn_name: SPECIAL_HISTOGRAM_QUANTILE.to_string(),
2546            })?
2547            .clone();
2548        // remove le column from tag columns
2549        self.ctx.tag_columns.retain(|col| col != LE_COLUMN_NAME);
2550
2551        Ok(LogicalPlan::Extension(Extension {
2552            node: Arc::new(
2553                HistogramFold::new(
2554                    LE_COLUMN_NAME.to_string(),
2555                    field_column,
2556                    time_index_column,
2557                    phi,
2558                    input_plan,
2559                )
2560                .context(DataFusionPlanningSnafu)?,
2561            ),
2562        }))
2563    }
2564
2565    /// Create a [SPECIAL_VECTOR_FUNCTION] plan
2566    async fn create_vector_plan(&mut self, args: &PromFunctionArgs) -> Result<LogicalPlan> {
2567        if args.args.len() != 1 {
2568            return FunctionInvalidArgumentSnafu {
2569                fn_name: SPECIAL_VECTOR_FUNCTION.to_string(),
2570            }
2571            .fail();
2572        }
2573        let lit = Self::get_param_as_literal_expr(&Some(args.args[0].clone()), None, None)?;
2574
2575        // reuse `SPECIAL_TIME_FUNCTION` as name of time index column
2576        self.ctx.time_index_column = Some(SPECIAL_TIME_FUNCTION.to_string());
2577        self.ctx.reset_table_name_and_schema();
2578        self.ctx.tag_columns = vec![];
2579        self.ctx.field_columns = vec![greptime_value().to_string()];
2580        Ok(LogicalPlan::Extension(Extension {
2581            node: Arc::new(
2582                EmptyMetric::new(
2583                    self.ctx.start,
2584                    self.ctx.end,
2585                    self.ctx.interval,
2586                    SPECIAL_TIME_FUNCTION.to_string(),
2587                    greptime_value().to_string(),
2588                    Some(lit),
2589                )
2590                .context(DataFusionPlanningSnafu)?,
2591            ),
2592        }))
2593    }
2594
2595    /// Create a [SCALAR_FUNCTION] plan
2596    async fn create_scalar_plan(
2597        &mut self,
2598        args: &PromFunctionArgs,
2599        query_engine_state: &QueryEngineState,
2600    ) -> Result<LogicalPlan> {
2601        ensure!(
2602            args.len() == 1,
2603            FunctionInvalidArgumentSnafu {
2604                fn_name: SCALAR_FUNCTION
2605            }
2606        );
2607        let input = self
2608            .prom_expr_to_plan(&args.args[0], query_engine_state)
2609            .await?;
2610        ensure!(
2611            self.ctx.field_columns.len() == 1,
2612            MultiFieldsNotSupportedSnafu {
2613                operator: SCALAR_FUNCTION
2614            },
2615        );
2616        let scalar_plan = LogicalPlan::Extension(Extension {
2617            node: Arc::new(
2618                ScalarCalculate::new(
2619                    self.ctx.start,
2620                    self.ctx.end,
2621                    self.ctx.interval,
2622                    input,
2623                    self.ctx.time_index_column.as_ref().unwrap(),
2624                    &self.ctx.tag_columns,
2625                    &self.ctx.field_columns[0],
2626                    self.ctx.table_name.as_deref(),
2627                )
2628                .context(PromqlPlanNodeSnafu)?,
2629            ),
2630        });
2631        // scalar plan have no tag columns
2632        self.ctx.tag_columns.clear();
2633        self.ctx.field_columns.clear();
2634        self.ctx
2635            .field_columns
2636            .push(scalar_plan.schema().field(1).name().clone());
2637        Ok(scalar_plan)
2638    }
2639
2640    /// Create a [SPECIAL_ABSENT_FUNCTION] plan
2641    async fn create_absent_plan(
2642        &mut self,
2643        args: &PromFunctionArgs,
2644        query_engine_state: &QueryEngineState,
2645    ) -> Result<LogicalPlan> {
2646        if args.args.len() != 1 {
2647            return FunctionInvalidArgumentSnafu {
2648                fn_name: SPECIAL_ABSENT_FUNCTION.to_string(),
2649            }
2650            .fail();
2651        }
2652        let input = self
2653            .prom_expr_to_plan(&args.args[0], query_engine_state)
2654            .await?;
2655
2656        let time_index_expr = self.create_time_index_column_expr()?;
2657        let first_field_expr =
2658            self.create_field_column_exprs()?
2659                .pop()
2660                .with_context(|| ValueNotFoundSnafu {
2661                    table: self.ctx.table_name.clone().unwrap_or_default(),
2662                })?;
2663        let first_value_expr = first_value(first_field_expr, vec![]);
2664
2665        let ordered_aggregated_input = LogicalPlanBuilder::from(input)
2666            .aggregate(
2667                vec![time_index_expr.clone()],
2668                vec![first_value_expr.clone()],
2669            )
2670            .context(DataFusionPlanningSnafu)?
2671            .sort(vec![time_index_expr.sort(true, false)])
2672            .context(DataFusionPlanningSnafu)?
2673            .build()
2674            .context(DataFusionPlanningSnafu)?;
2675
2676        let fake_labels = self
2677            .ctx
2678            .selector_matcher
2679            .iter()
2680            .filter_map(|matcher| match matcher.op {
2681                MatchOp::Equal => Some((matcher.name.clone(), matcher.value.clone())),
2682                _ => None,
2683            })
2684            .collect::<Vec<_>>();
2685
2686        // Create the absent plan
2687        let absent_plan = LogicalPlan::Extension(Extension {
2688            node: Arc::new(
2689                Absent::try_new(
2690                    self.ctx.start,
2691                    self.ctx.end,
2692                    self.ctx.interval,
2693                    self.ctx.time_index_column.as_ref().unwrap().clone(),
2694                    self.ctx.field_columns[0].clone(),
2695                    fake_labels,
2696                    ordered_aggregated_input,
2697                )
2698                .context(DataFusionPlanningSnafu)?,
2699            ),
2700        });
2701
2702        Ok(absent_plan)
2703    }
2704
2705    /// Try to build a DataFusion Literal Expression from PromQL Expr, return
2706    /// `None` if the input is not a literal expression.
2707    fn try_build_literal_expr(expr: &PromExpr) -> Option<DfExpr> {
2708        match expr {
2709            PromExpr::NumberLiteral(NumberLiteral { val }) => Some(val.lit()),
2710            PromExpr::StringLiteral(StringLiteral { val }) => Some(val.lit()),
2711            PromExpr::VectorSelector(_)
2712            | PromExpr::MatrixSelector(_)
2713            | PromExpr::Extension(_)
2714            | PromExpr::Aggregate(_)
2715            | PromExpr::Subquery(_) => None,
2716            PromExpr::Call(Call { func, .. }) => {
2717                if func.name == SPECIAL_TIME_FUNCTION {
2718                    // For time() function, don't treat it as a literal
2719                    // Let it be handled as a regular function call
2720                    None
2721                } else {
2722                    None
2723                }
2724            }
2725            PromExpr::Paren(ParenExpr { expr }) => Self::try_build_literal_expr(expr),
2726            // TODO(ruihang): support Unary operator
2727            PromExpr::Unary(UnaryExpr { expr, .. }) => Self::try_build_literal_expr(expr),
2728            PromExpr::Binary(PromBinaryExpr {
2729                lhs,
2730                rhs,
2731                op,
2732                modifier,
2733            }) => {
2734                let lhs = Self::try_build_literal_expr(lhs)?;
2735                let rhs = Self::try_build_literal_expr(rhs)?;
2736                let is_comparison_op = Self::is_token_a_comparison_op(*op);
2737                let expr_builder = Self::prom_token_to_binary_expr_builder(*op).ok()?;
2738                let expr = expr_builder(lhs, rhs).ok()?;
2739
2740                let should_return_bool = if let Some(m) = modifier {
2741                    m.return_bool
2742                } else {
2743                    false
2744                };
2745                if is_comparison_op && should_return_bool {
2746                    Some(DfExpr::Cast(Cast {
2747                        expr: Box::new(expr),
2748                        data_type: ArrowDataType::Float64,
2749                    }))
2750                } else {
2751                    Some(expr)
2752                }
2753            }
2754        }
2755    }
2756
2757    fn try_build_special_time_expr_with_context(&self, expr: &PromExpr) -> Option<DfExpr> {
2758        match expr {
2759            PromExpr::Call(Call { func, .. }) => {
2760                if func.name == SPECIAL_TIME_FUNCTION
2761                    && let Some(time_index_col) = self.ctx.time_index_column.as_ref()
2762                {
2763                    Some(build_special_time_expr(time_index_col))
2764                } else {
2765                    None
2766                }
2767            }
2768            _ => None,
2769        }
2770    }
2771
2772    /// Return a lambda to build binary expression from token.
2773    /// Because some binary operator are function in DataFusion like `atan2` or `^`.
2774    #[allow(clippy::type_complexity)]
2775    fn prom_token_to_binary_expr_builder(
2776        token: TokenType,
2777    ) -> Result<Box<dyn Fn(DfExpr, DfExpr) -> Result<DfExpr>>> {
2778        match token.id() {
2779            token::T_ADD => Ok(Box::new(|lhs, rhs| Ok(lhs + rhs))),
2780            token::T_SUB => Ok(Box::new(|lhs, rhs| Ok(lhs - rhs))),
2781            token::T_MUL => Ok(Box::new(|lhs, rhs| Ok(lhs * rhs))),
2782            token::T_DIV => Ok(Box::new(|lhs, rhs| Ok(lhs / rhs))),
2783            token::T_MOD => Ok(Box::new(|lhs: DfExpr, rhs| Ok(lhs % rhs))),
2784            token::T_EQLC => Ok(Box::new(|lhs, rhs| Ok(lhs.eq(rhs)))),
2785            token::T_NEQ => Ok(Box::new(|lhs, rhs| Ok(lhs.not_eq(rhs)))),
2786            token::T_GTR => Ok(Box::new(|lhs, rhs| Ok(lhs.gt(rhs)))),
2787            token::T_LSS => Ok(Box::new(|lhs, rhs| Ok(lhs.lt(rhs)))),
2788            token::T_GTE => Ok(Box::new(|lhs, rhs| Ok(lhs.gt_eq(rhs)))),
2789            token::T_LTE => Ok(Box::new(|lhs, rhs| Ok(lhs.lt_eq(rhs)))),
2790            token::T_POW => Ok(Box::new(|lhs, rhs| {
2791                Ok(DfExpr::ScalarFunction(ScalarFunction {
2792                    func: datafusion_functions::math::power(),
2793                    args: vec![lhs, rhs],
2794                }))
2795            })),
2796            token::T_ATAN2 => Ok(Box::new(|lhs, rhs| {
2797                Ok(DfExpr::ScalarFunction(ScalarFunction {
2798                    func: datafusion_functions::math::atan2(),
2799                    args: vec![lhs, rhs],
2800                }))
2801            })),
2802            _ => UnexpectedTokenSnafu { token }.fail(),
2803        }
2804    }
2805
2806    /// Check if the given op is a [comparison operator](https://prometheus.io/docs/prometheus/latest/querying/operators/#comparison-binary-operators).
2807    fn is_token_a_comparison_op(token: TokenType) -> bool {
2808        matches!(
2809            token.id(),
2810            token::T_EQLC
2811                | token::T_NEQ
2812                | token::T_GTR
2813                | token::T_LSS
2814                | token::T_GTE
2815                | token::T_LTE
2816        )
2817    }
2818
2819    /// Check if the given op is a set operator (UNION, INTERSECT and EXCEPT in SQL).
2820    fn is_token_a_set_op(token: TokenType) -> bool {
2821        matches!(
2822            token.id(),
2823            token::T_LAND // INTERSECT
2824                | token::T_LOR // UNION
2825                | token::T_LUNLESS // EXCEPT
2826        )
2827    }
2828
2829    /// Build a inner join on time index column and tag columns to concat two logical plans.
2830    /// When `only_join_time_index == true` we only join on the time index, because these two plan may not have the same tag columns
2831    #[allow(clippy::too_many_arguments)]
2832    fn join_on_non_field_columns(
2833        &self,
2834        left: LogicalPlan,
2835        right: LogicalPlan,
2836        left_table_ref: TableReference,
2837        right_table_ref: TableReference,
2838        left_time_index_column: Option<String>,
2839        right_time_index_column: Option<String>,
2840        only_join_time_index: bool,
2841        modifier: &Option<BinModifier>,
2842    ) -> Result<LogicalPlan> {
2843        let mut left_tag_columns = if only_join_time_index {
2844            BTreeSet::new()
2845        } else {
2846            self.ctx
2847                .tag_columns
2848                .iter()
2849                .cloned()
2850                .collect::<BTreeSet<_>>()
2851        };
2852        let mut right_tag_columns = left_tag_columns.clone();
2853
2854        // apply modifier
2855        if let Some(modifier) = modifier {
2856            // apply label modifier
2857            if let Some(matching) = &modifier.matching {
2858                match matching {
2859                    // keeps columns mentioned in `on`
2860                    LabelModifier::Include(on) => {
2861                        let mask = on.labels.iter().cloned().collect::<BTreeSet<_>>();
2862                        left_tag_columns = left_tag_columns.intersection(&mask).cloned().collect();
2863                        right_tag_columns =
2864                            right_tag_columns.intersection(&mask).cloned().collect();
2865                    }
2866                    // removes columns memtioned in `ignoring`
2867                    LabelModifier::Exclude(ignoring) => {
2868                        // doesn't check existence of label
2869                        for label in &ignoring.labels {
2870                            let _ = left_tag_columns.remove(label);
2871                            let _ = right_tag_columns.remove(label);
2872                        }
2873                    }
2874                }
2875            }
2876        }
2877
2878        // push time index column if it exists
2879        if let (Some(left_time_index_column), Some(right_time_index_column)) =
2880            (left_time_index_column, right_time_index_column)
2881        {
2882            left_tag_columns.insert(left_time_index_column);
2883            right_tag_columns.insert(right_time_index_column);
2884        }
2885
2886        let right = LogicalPlanBuilder::from(right)
2887            .alias(right_table_ref)
2888            .context(DataFusionPlanningSnafu)?
2889            .build()
2890            .context(DataFusionPlanningSnafu)?;
2891
2892        // Inner Join on time index column to concat two operator
2893        LogicalPlanBuilder::from(left)
2894            .alias(left_table_ref)
2895            .context(DataFusionPlanningSnafu)?
2896            .join_detailed(
2897                right,
2898                JoinType::Inner,
2899                (
2900                    left_tag_columns
2901                        .into_iter()
2902                        .map(Column::from_name)
2903                        .collect::<Vec<_>>(),
2904                    right_tag_columns
2905                        .into_iter()
2906                        .map(Column::from_name)
2907                        .collect::<Vec<_>>(),
2908                ),
2909                None,
2910                NullEquality::NullEqualsNull,
2911            )
2912            .context(DataFusionPlanningSnafu)?
2913            .build()
2914            .context(DataFusionPlanningSnafu)
2915    }
2916
2917    /// Build a set operator (AND/OR/UNLESS)
2918    fn set_op_on_non_field_columns(
2919        &mut self,
2920        left: LogicalPlan,
2921        mut right: LogicalPlan,
2922        left_context: PromPlannerContext,
2923        right_context: PromPlannerContext,
2924        op: TokenType,
2925        modifier: &Option<BinModifier>,
2926    ) -> Result<LogicalPlan> {
2927        let mut left_tag_col_set = left_context
2928            .tag_columns
2929            .iter()
2930            .cloned()
2931            .collect::<HashSet<_>>();
2932        let mut right_tag_col_set = right_context
2933            .tag_columns
2934            .iter()
2935            .cloned()
2936            .collect::<HashSet<_>>();
2937
2938        if matches!(op.id(), token::T_LOR) {
2939            return self.or_operator(
2940                left,
2941                right,
2942                left_tag_col_set,
2943                right_tag_col_set,
2944                left_context,
2945                right_context,
2946                modifier,
2947            );
2948        }
2949
2950        // apply modifier
2951        if let Some(modifier) = modifier {
2952            // one-to-many and many-to-one are not supported
2953            ensure!(
2954                matches!(
2955                    modifier.card,
2956                    VectorMatchCardinality::OneToOne | VectorMatchCardinality::ManyToMany
2957                ),
2958                UnsupportedVectorMatchSnafu {
2959                    name: modifier.card.clone(),
2960                },
2961            );
2962            // apply label modifier
2963            if let Some(matching) = &modifier.matching {
2964                match matching {
2965                    // keeps columns mentioned in `on`
2966                    LabelModifier::Include(on) => {
2967                        let mask = on.labels.iter().cloned().collect::<HashSet<_>>();
2968                        left_tag_col_set = left_tag_col_set.intersection(&mask).cloned().collect();
2969                        right_tag_col_set =
2970                            right_tag_col_set.intersection(&mask).cloned().collect();
2971                    }
2972                    // removes columns memtioned in `ignoring`
2973                    LabelModifier::Exclude(ignoring) => {
2974                        // doesn't check existence of label
2975                        for label in &ignoring.labels {
2976                            let _ = left_tag_col_set.remove(label);
2977                            let _ = right_tag_col_set.remove(label);
2978                        }
2979                    }
2980                }
2981            }
2982        }
2983        // ensure two sides have the same tag columns
2984        if !matches!(op.id(), token::T_LOR) {
2985            ensure!(
2986                left_tag_col_set == right_tag_col_set,
2987                CombineTableColumnMismatchSnafu {
2988                    left: left_tag_col_set.into_iter().collect::<Vec<_>>(),
2989                    right: right_tag_col_set.into_iter().collect::<Vec<_>>(),
2990                }
2991            )
2992        };
2993        let left_time_index = left_context.time_index_column.clone().unwrap();
2994        let right_time_index = right_context.time_index_column.clone().unwrap();
2995        let join_keys = left_tag_col_set
2996            .iter()
2997            .cloned()
2998            .chain([left_time_index.clone()])
2999            .collect::<Vec<_>>();
3000        self.ctx.time_index_column = Some(left_time_index.clone());
3001
3002        // alias right time index column if necessary
3003        if left_context.time_index_column != right_context.time_index_column {
3004            let right_project_exprs = right
3005                .schema()
3006                .fields()
3007                .iter()
3008                .map(|field| {
3009                    if field.name() == &right_time_index {
3010                        DfExpr::Column(Column::from_name(&right_time_index)).alias(&left_time_index)
3011                    } else {
3012                        DfExpr::Column(Column::from_name(field.name()))
3013                    }
3014                })
3015                .collect::<Vec<_>>();
3016
3017            right = LogicalPlanBuilder::from(right)
3018                .project(right_project_exprs)
3019                .context(DataFusionPlanningSnafu)?
3020                .build()
3021                .context(DataFusionPlanningSnafu)?;
3022        }
3023
3024        ensure!(
3025            left_context.field_columns.len() == 1,
3026            MultiFieldsNotSupportedSnafu {
3027                operator: "AND operator"
3028            }
3029        );
3030        // Update the field column in context.
3031        // The AND/UNLESS operator only keep the field column in left input.
3032        let left_field_col = left_context.field_columns.first().unwrap();
3033        self.ctx.field_columns = vec![left_field_col.clone()];
3034
3035        // Generate join plan.
3036        // All set operations in PromQL are "distinct"
3037        match op.id() {
3038            token::T_LAND => LogicalPlanBuilder::from(left)
3039                .distinct()
3040                .context(DataFusionPlanningSnafu)?
3041                .join_detailed(
3042                    right,
3043                    JoinType::LeftSemi,
3044                    (join_keys.clone(), join_keys),
3045                    None,
3046                    NullEquality::NullEqualsNull,
3047                )
3048                .context(DataFusionPlanningSnafu)?
3049                .build()
3050                .context(DataFusionPlanningSnafu),
3051            token::T_LUNLESS => LogicalPlanBuilder::from(left)
3052                .distinct()
3053                .context(DataFusionPlanningSnafu)?
3054                .join_detailed(
3055                    right,
3056                    JoinType::LeftAnti,
3057                    (join_keys.clone(), join_keys),
3058                    None,
3059                    NullEquality::NullEqualsNull,
3060                )
3061                .context(DataFusionPlanningSnafu)?
3062                .build()
3063                .context(DataFusionPlanningSnafu),
3064            token::T_LOR => {
3065                // OR is handled at the beginning of this function, as it cannot
3066                // be expressed using JOIN like AND and UNLESS.
3067                unreachable!()
3068            }
3069            _ => UnexpectedTokenSnafu { token: op }.fail(),
3070        }
3071    }
3072
3073    // TODO(ruihang): change function name
3074    #[allow(clippy::too_many_arguments)]
3075    fn or_operator(
3076        &mut self,
3077        left: LogicalPlan,
3078        right: LogicalPlan,
3079        left_tag_cols_set: HashSet<String>,
3080        right_tag_cols_set: HashSet<String>,
3081        left_context: PromPlannerContext,
3082        right_context: PromPlannerContext,
3083        modifier: &Option<BinModifier>,
3084    ) -> Result<LogicalPlan> {
3085        // checks
3086        ensure!(
3087            left_context.field_columns.len() == right_context.field_columns.len(),
3088            CombineTableColumnMismatchSnafu {
3089                left: left_context.field_columns.clone(),
3090                right: right_context.field_columns.clone()
3091            }
3092        );
3093        ensure!(
3094            left_context.field_columns.len() == 1,
3095            MultiFieldsNotSupportedSnafu {
3096                operator: "OR operator"
3097            }
3098        );
3099
3100        // prepare hash sets
3101        let all_tags = left_tag_cols_set
3102            .union(&right_tag_cols_set)
3103            .cloned()
3104            .collect::<HashSet<_>>();
3105        let tags_not_in_left = all_tags
3106            .difference(&left_tag_cols_set)
3107            .cloned()
3108            .collect::<Vec<_>>();
3109        let tags_not_in_right = all_tags
3110            .difference(&right_tag_cols_set)
3111            .cloned()
3112            .collect::<Vec<_>>();
3113        let left_qualifier = left.schema().qualified_field(0).0.cloned();
3114        let right_qualifier = right.schema().qualified_field(0).0.cloned();
3115        let left_qualifier_string = left_qualifier
3116            .as_ref()
3117            .map(|l| l.to_string())
3118            .unwrap_or_default();
3119        let right_qualifier_string = right_qualifier
3120            .as_ref()
3121            .map(|r| r.to_string())
3122            .unwrap_or_default();
3123        let left_time_index_column =
3124            left_context
3125                .time_index_column
3126                .clone()
3127                .with_context(|| TimeIndexNotFoundSnafu {
3128                    table: left_qualifier_string.clone(),
3129                })?;
3130        let right_time_index_column =
3131            right_context
3132                .time_index_column
3133                .clone()
3134                .with_context(|| TimeIndexNotFoundSnafu {
3135                    table: right_qualifier_string.clone(),
3136                })?;
3137        // Take the name of first field column. The length is checked above.
3138        let left_field_col = left_context.field_columns.first().unwrap();
3139        let right_field_col = right_context.field_columns.first().unwrap();
3140
3141        // step 0: fill all columns in output schema
3142        let mut all_columns_set = left
3143            .schema()
3144            .fields()
3145            .iter()
3146            .chain(right.schema().fields().iter())
3147            .map(|field| field.name().clone())
3148            .collect::<HashSet<_>>();
3149        // remove time index column
3150        all_columns_set.remove(&left_time_index_column);
3151        all_columns_set.remove(&right_time_index_column);
3152        // remove field column in the right
3153        if left_field_col != right_field_col {
3154            all_columns_set.remove(right_field_col);
3155        }
3156        let mut all_columns = all_columns_set.into_iter().collect::<Vec<_>>();
3157        // sort to ensure the generated schema is not volatile
3158        all_columns.sort_unstable();
3159        // use left time index column name as the result time index column name
3160        all_columns.insert(0, left_time_index_column.clone());
3161
3162        // step 1: align schema using project, fill non-exist columns with null
3163        let left_proj_exprs = all_columns.iter().map(|col| {
3164            if tags_not_in_left.contains(col) {
3165                DfExpr::Literal(ScalarValue::Utf8(None), None).alias(col.clone())
3166            } else {
3167                DfExpr::Column(Column::new(None::<String>, col))
3168            }
3169        });
3170        let right_time_index_expr = DfExpr::Column(Column::new(
3171            right_qualifier.clone(),
3172            right_time_index_column,
3173        ))
3174        .alias(left_time_index_column.clone());
3175        // The field column in right side may not have qualifier (it may be removed by join operation),
3176        // so we need to find it from the schema.
3177        let right_qualifier_for_field = right
3178            .schema()
3179            .iter()
3180            .find(|(_, f)| f.name() == right_field_col)
3181            .map(|(q, _)| q)
3182            .with_context(|| ColumnNotFoundSnafu {
3183                col: right_field_col.clone(),
3184            })?
3185            .cloned();
3186
3187        // `skip(1)` to skip the time index column
3188        let right_proj_exprs_without_time_index = all_columns.iter().skip(1).map(|col| {
3189            // expr
3190            if col == left_field_col && left_field_col != right_field_col {
3191                // qualify field in right side if necessary to handle different field name
3192                DfExpr::Column(Column::new(
3193                    right_qualifier_for_field.clone(),
3194                    right_field_col,
3195                ))
3196            } else if tags_not_in_right.contains(col) {
3197                DfExpr::Literal(ScalarValue::Utf8(None), None).alias(col.clone())
3198            } else {
3199                DfExpr::Column(Column::new(None::<String>, col))
3200            }
3201        });
3202        let right_proj_exprs = [right_time_index_expr]
3203            .into_iter()
3204            .chain(right_proj_exprs_without_time_index);
3205
3206        let left_projected = LogicalPlanBuilder::from(left)
3207            .project(left_proj_exprs)
3208            .context(DataFusionPlanningSnafu)?
3209            .alias(left_qualifier_string.clone())
3210            .context(DataFusionPlanningSnafu)?
3211            .build()
3212            .context(DataFusionPlanningSnafu)?;
3213        let right_projected = LogicalPlanBuilder::from(right)
3214            .project(right_proj_exprs)
3215            .context(DataFusionPlanningSnafu)?
3216            .alias(right_qualifier_string.clone())
3217            .context(DataFusionPlanningSnafu)?
3218            .build()
3219            .context(DataFusionPlanningSnafu)?;
3220
3221        // step 2: compute match columns
3222        let mut match_columns = if let Some(modifier) = modifier
3223            && let Some(matching) = &modifier.matching
3224        {
3225            match matching {
3226                // keeps columns mentioned in `on`
3227                LabelModifier::Include(on) => on.labels.clone(),
3228                // removes columns memtioned in `ignoring`
3229                LabelModifier::Exclude(ignoring) => {
3230                    let ignoring = ignoring.labels.iter().cloned().collect::<HashSet<_>>();
3231                    all_tags.difference(&ignoring).cloned().collect()
3232                }
3233            }
3234        } else {
3235            all_tags.iter().cloned().collect()
3236        };
3237        // sort to ensure the generated plan is not volatile
3238        match_columns.sort_unstable();
3239        // step 3: build `UnionDistinctOn` plan
3240        let schema = left_projected.schema().clone();
3241        let union_distinct_on = UnionDistinctOn::new(
3242            left_projected,
3243            right_projected,
3244            match_columns,
3245            left_time_index_column.clone(),
3246            schema,
3247        );
3248        let result = LogicalPlan::Extension(Extension {
3249            node: Arc::new(union_distinct_on),
3250        });
3251
3252        // step 4: update context
3253        self.ctx.time_index_column = Some(left_time_index_column);
3254        self.ctx.tag_columns = all_tags.into_iter().collect();
3255        self.ctx.field_columns = vec![left_field_col.clone()];
3256
3257        Ok(result)
3258    }
3259
3260    /// Build a projection that project and perform operation expr for every value columns.
3261    /// Non-value columns (tag and timestamp) will be preserved in the projection.
3262    ///
3263    /// # Side effect
3264    ///
3265    /// This function will update the value columns in the context. Those new column names
3266    /// don't contains qualifier.
3267    fn projection_for_each_field_column<F>(
3268        &mut self,
3269        input: LogicalPlan,
3270        name_to_expr: F,
3271    ) -> Result<LogicalPlan>
3272    where
3273        F: FnMut(&String) -> Result<DfExpr>,
3274    {
3275        let non_field_columns_iter = self
3276            .ctx
3277            .tag_columns
3278            .iter()
3279            .chain(self.ctx.time_index_column.iter())
3280            .map(|col| {
3281                Ok(DfExpr::Column(Column::new(
3282                    self.ctx.table_name.clone().map(TableReference::bare),
3283                    col,
3284                )))
3285            });
3286
3287        // build computation exprs
3288        let result_field_columns = self
3289            .ctx
3290            .field_columns
3291            .iter()
3292            .map(name_to_expr)
3293            .collect::<Result<Vec<_>>>()?;
3294
3295        // alias the computation exprs to remove qualifier
3296        self.ctx.field_columns = result_field_columns
3297            .iter()
3298            .map(|expr| expr.schema_name().to_string())
3299            .collect();
3300        let field_columns_iter = result_field_columns
3301            .into_iter()
3302            .zip(self.ctx.field_columns.iter())
3303            .map(|(expr, name)| Ok(DfExpr::Alias(Alias::new(expr, None::<String>, name))));
3304
3305        // chain non-field columns (unchanged) and field columns (applied computation then alias)
3306        let project_fields = non_field_columns_iter
3307            .chain(field_columns_iter)
3308            .collect::<Result<Vec<_>>>()?;
3309
3310        LogicalPlanBuilder::from(input)
3311            .project(project_fields)
3312            .context(DataFusionPlanningSnafu)?
3313            .build()
3314            .context(DataFusionPlanningSnafu)
3315    }
3316
3317    /// Build a filter plan that filter on value column. Notice that only one value column
3318    /// is expected.
3319    fn filter_on_field_column<F>(
3320        &self,
3321        input: LogicalPlan,
3322        mut name_to_expr: F,
3323    ) -> Result<LogicalPlan>
3324    where
3325        F: FnMut(&String) -> Result<DfExpr>,
3326    {
3327        ensure!(
3328            self.ctx.field_columns.len() == 1,
3329            UnsupportedExprSnafu {
3330                name: "filter on multi-value input"
3331            }
3332        );
3333
3334        let field_column_filter = name_to_expr(&self.ctx.field_columns[0])?;
3335
3336        LogicalPlanBuilder::from(input)
3337            .filter(field_column_filter)
3338            .context(DataFusionPlanningSnafu)?
3339            .build()
3340            .context(DataFusionPlanningSnafu)
3341    }
3342
3343    /// Generate an expr like `date_part("hour", <TIME_INDEX>)`. Caller should ensure the
3344    /// time index column in context is set
3345    fn date_part_on_time_index(&self, date_part: &str) -> Result<DfExpr> {
3346        let input_expr = datafusion::logical_expr::col(
3347            self.ctx
3348                .time_index_column
3349                .as_ref()
3350                // table name doesn't matters here
3351                .with_context(|| TimeIndexNotFoundSnafu {
3352                    table: "<doesn't matter>",
3353                })?,
3354        );
3355        let fn_expr = DfExpr::ScalarFunction(ScalarFunction {
3356            func: datafusion_functions::datetime::date_part(),
3357            args: vec![date_part.lit(), input_expr],
3358        });
3359        Ok(fn_expr)
3360    }
3361
3362    /// Apply an alias to the query result by adding a projection with the alias name
3363    fn apply_alias_projection(
3364        &mut self,
3365        plan: LogicalPlan,
3366        alias_name: String,
3367    ) -> Result<LogicalPlan> {
3368        let fields_expr = self.create_field_column_exprs()?;
3369
3370        // TODO(dennis): how to support multi-value aliasing?
3371        ensure!(
3372            fields_expr.len() == 1,
3373            UnsupportedExprSnafu {
3374                name: "alias on multi-value result"
3375            }
3376        );
3377
3378        let project_fields = fields_expr
3379            .into_iter()
3380            .map(|expr| expr.alias(&alias_name))
3381            .chain(self.create_tag_column_exprs()?)
3382            .chain(Some(self.create_time_index_column_expr()?));
3383
3384        LogicalPlanBuilder::from(plan)
3385            .project(project_fields)
3386            .context(DataFusionPlanningSnafu)?
3387            .build()
3388            .context(DataFusionPlanningSnafu)
3389    }
3390}
3391
3392#[derive(Default, Debug)]
3393struct FunctionArgs {
3394    input: Option<PromExpr>,
3395    literals: Vec<DfExpr>,
3396}
3397
3398/// Represents different types of scalar functions supported in PromQL expressions.
3399/// Each variant defines how the function should be processed and what arguments it expects.
3400#[derive(Debug, Clone)]
3401enum ScalarFunc {
3402    /// DataFusion's registered(including built-in) scalar functions (e.g., abs, sqrt, round, clamp).
3403    /// These are passed through directly to DataFusion's execution engine.
3404    /// Processing: Simple argument insertion at the specified position.
3405    DataFusionBuiltin(Arc<ScalarUdfDef>),
3406    /// User-defined functions registered in DataFusion's function registry.
3407    /// Similar to DataFusionBuiltin but for custom functions not built into DataFusion.
3408    /// Processing: Direct pass-through with argument positioning.
3409    DataFusionUdf(Arc<ScalarUdfDef>),
3410    /// PromQL-specific functions that operate on time series data with temporal context.
3411    /// These functions require both timestamp ranges and values to perform calculations.
3412    /// Processing: Automatically injects timestamp_range and value columns as first arguments.
3413    /// Examples: idelta, irate, resets, changes, deriv, *_over_time function
3414    Udf(Arc<ScalarUdfDef>),
3415    /// PromQL functions requiring extrapolation calculations with explicit range information.
3416    /// These functions need to know the time range length to perform rate calculations.
3417    /// The second field contains the range length in milliseconds.
3418    /// Processing: Injects timestamp_range, value, time_index columns and appends range_length.
3419    /// Examples: increase, rate, delta
3420    // TODO(ruihang): maybe merge with Udf later
3421    ExtrapolateUdf(Arc<ScalarUdfDef>, i64),
3422    /// Functions that generate expressions directly without external UDF calls.
3423    /// The expression is constructed during function matching and requires no additional processing.
3424    /// Examples: time(), minute(), hour(), month(), year() and other date/time extractors
3425    GeneratedExpr,
3426}
3427
3428#[cfg(test)]
3429mod test {
3430    use std::time::{Duration, UNIX_EPOCH};
3431
3432    use catalog::RegisterTableRequest;
3433    use catalog::memory::{MemoryCatalogManager, new_memory_catalog_manager};
3434    use common_base::Plugins;
3435    use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
3436    use common_query::prelude::greptime_timestamp;
3437    use common_query::test_util::DummyDecoder;
3438    use datatypes::prelude::ConcreteDataType;
3439    use datatypes::schema::{ColumnSchema, Schema};
3440    use promql_parser::label::Labels;
3441    use promql_parser::parser;
3442    use session::context::QueryContext;
3443    use table::metadata::{TableInfoBuilder, TableMetaBuilder};
3444    use table::test_util::EmptyTable;
3445
3446    use super::*;
3447    use crate::options::QueryOptions;
3448
3449    fn build_query_engine_state() -> QueryEngineState {
3450        QueryEngineState::new(
3451            new_memory_catalog_manager().unwrap(),
3452            None,
3453            None,
3454            None,
3455            None,
3456            None,
3457            false,
3458            Plugins::default(),
3459            QueryOptions::default(),
3460        )
3461    }
3462
3463    async fn build_test_table_provider(
3464        table_name_tuples: &[(String, String)],
3465        num_tag: usize,
3466        num_field: usize,
3467    ) -> DfTableSourceProvider {
3468        let catalog_list = MemoryCatalogManager::with_default_setup();
3469        for (schema_name, table_name) in table_name_tuples {
3470            let mut columns = vec![];
3471            for i in 0..num_tag {
3472                columns.push(ColumnSchema::new(
3473                    format!("tag_{i}"),
3474                    ConcreteDataType::string_datatype(),
3475                    false,
3476                ));
3477            }
3478            columns.push(
3479                ColumnSchema::new(
3480                    "timestamp".to_string(),
3481                    ConcreteDataType::timestamp_millisecond_datatype(),
3482                    false,
3483                )
3484                .with_time_index(true),
3485            );
3486            for i in 0..num_field {
3487                columns.push(ColumnSchema::new(
3488                    format!("field_{i}"),
3489                    ConcreteDataType::float64_datatype(),
3490                    true,
3491                ));
3492            }
3493            let schema = Arc::new(Schema::new(columns));
3494            let table_meta = TableMetaBuilder::empty()
3495                .schema(schema)
3496                .primary_key_indices((0..num_tag).collect())
3497                .value_indices((num_tag + 1..num_tag + 1 + num_field).collect())
3498                .next_column_id(1024)
3499                .build()
3500                .unwrap();
3501            let table_info = TableInfoBuilder::default()
3502                .name(table_name.clone())
3503                .meta(table_meta)
3504                .build()
3505                .unwrap();
3506            let table = EmptyTable::from_table_info(&table_info);
3507
3508            assert!(
3509                catalog_list
3510                    .register_table_sync(RegisterTableRequest {
3511                        catalog: DEFAULT_CATALOG_NAME.to_string(),
3512                        schema: schema_name.clone(),
3513                        table_name: table_name.clone(),
3514                        table_id: 1024,
3515                        table,
3516                    })
3517                    .is_ok()
3518            );
3519        }
3520
3521        DfTableSourceProvider::new(
3522            catalog_list,
3523            false,
3524            QueryContext::arc(),
3525            DummyDecoder::arc(),
3526            false,
3527        )
3528    }
3529
3530    async fn build_test_table_provider_with_fields(
3531        table_name_tuples: &[(String, String)],
3532        tags: &[&str],
3533    ) -> DfTableSourceProvider {
3534        let catalog_list = MemoryCatalogManager::with_default_setup();
3535        for (schema_name, table_name) in table_name_tuples {
3536            let mut columns = vec![];
3537            let num_tag = tags.len();
3538            for tag in tags {
3539                columns.push(ColumnSchema::new(
3540                    tag.to_string(),
3541                    ConcreteDataType::string_datatype(),
3542                    false,
3543                ));
3544            }
3545            columns.push(
3546                ColumnSchema::new(
3547                    greptime_timestamp().to_string(),
3548                    ConcreteDataType::timestamp_millisecond_datatype(),
3549                    false,
3550                )
3551                .with_time_index(true),
3552            );
3553            columns.push(ColumnSchema::new(
3554                greptime_value().to_string(),
3555                ConcreteDataType::float64_datatype(),
3556                true,
3557            ));
3558            let schema = Arc::new(Schema::new(columns));
3559            let table_meta = TableMetaBuilder::empty()
3560                .schema(schema)
3561                .primary_key_indices((0..num_tag).collect())
3562                .next_column_id(1024)
3563                .build()
3564                .unwrap();
3565            let table_info = TableInfoBuilder::default()
3566                .name(table_name.clone())
3567                .meta(table_meta)
3568                .build()
3569                .unwrap();
3570            let table = EmptyTable::from_table_info(&table_info);
3571
3572            assert!(
3573                catalog_list
3574                    .register_table_sync(RegisterTableRequest {
3575                        catalog: DEFAULT_CATALOG_NAME.to_string(),
3576                        schema: schema_name.clone(),
3577                        table_name: table_name.clone(),
3578                        table_id: 1024,
3579                        table,
3580                    })
3581                    .is_ok()
3582            );
3583        }
3584
3585        DfTableSourceProvider::new(
3586            catalog_list,
3587            false,
3588            QueryContext::arc(),
3589            DummyDecoder::arc(),
3590            false,
3591        )
3592    }
3593
3594    // {
3595    //     input: `abs(some_metric{foo!="bar"})`,
3596    //     expected: &Call{
3597    //         Func: MustGetFunction("abs"),
3598    //         Args: Expressions{
3599    //             &VectorSelector{
3600    //                 Name: "some_metric",
3601    //                 LabelMatchers: []*labels.Matcher{
3602    //                     MustLabelMatcher(labels.MatchNotEqual, "foo", "bar"),
3603    //                     MustLabelMatcher(labels.MatchEqual, model.MetricNameLabel, "some_metric"),
3604    //                 },
3605    //             },
3606    //         },
3607    //     },
3608    // },
3609    async fn do_single_instant_function_call(fn_name: &'static str, plan_name: &str) {
3610        let prom_expr =
3611            parser::parse(&format!("{fn_name}(some_metric{{tag_0!=\"bar\"}})")).unwrap();
3612        let eval_stmt = EvalStmt {
3613            expr: prom_expr,
3614            start: UNIX_EPOCH,
3615            end: UNIX_EPOCH
3616                .checked_add(Duration::from_secs(100_000))
3617                .unwrap(),
3618            interval: Duration::from_secs(5),
3619            lookback_delta: Duration::from_secs(1),
3620        };
3621
3622        let table_provider = build_test_table_provider(
3623            &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
3624            1,
3625            1,
3626        )
3627        .await;
3628        let plan =
3629            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
3630                .await
3631                .unwrap();
3632
3633        let expected = String::from(
3634            "Filter: TEMPLATE(field_0) IS NOT NULL [timestamp:Timestamp(Millisecond, None), TEMPLATE(field_0):Float64;N, tag_0:Utf8]\
3635            \n  Projection: some_metric.timestamp, TEMPLATE(some_metric.field_0) AS TEMPLATE(field_0), some_metric.tag_0 [timestamp:Timestamp(Millisecond, None), TEMPLATE(field_0):Float64;N, tag_0:Utf8]\
3636            \n    PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3637            \n      PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3638            \n        Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3639            \n          Filter: some_metric.tag_0 != Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3640            \n            TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
3641        ).replace("TEMPLATE", plan_name);
3642
3643        assert_eq!(plan.display_indent_schema().to_string(), expected);
3644    }
3645
3646    #[tokio::test]
3647    async fn single_abs() {
3648        do_single_instant_function_call("abs", "abs").await;
3649    }
3650
3651    #[tokio::test]
3652    #[should_panic]
3653    async fn single_absent() {
3654        do_single_instant_function_call("absent", "").await;
3655    }
3656
3657    #[tokio::test]
3658    async fn single_ceil() {
3659        do_single_instant_function_call("ceil", "ceil").await;
3660    }
3661
3662    #[tokio::test]
3663    async fn single_exp() {
3664        do_single_instant_function_call("exp", "exp").await;
3665    }
3666
3667    #[tokio::test]
3668    async fn single_ln() {
3669        do_single_instant_function_call("ln", "ln").await;
3670    }
3671
3672    #[tokio::test]
3673    async fn single_log2() {
3674        do_single_instant_function_call("log2", "log2").await;
3675    }
3676
3677    #[tokio::test]
3678    async fn single_log10() {
3679        do_single_instant_function_call("log10", "log10").await;
3680    }
3681
3682    #[tokio::test]
3683    #[should_panic]
3684    async fn single_scalar() {
3685        do_single_instant_function_call("scalar", "").await;
3686    }
3687
3688    #[tokio::test]
3689    #[should_panic]
3690    async fn single_sgn() {
3691        do_single_instant_function_call("sgn", "").await;
3692    }
3693
3694    #[tokio::test]
3695    #[should_panic]
3696    async fn single_sort() {
3697        do_single_instant_function_call("sort", "").await;
3698    }
3699
3700    #[tokio::test]
3701    #[should_panic]
3702    async fn single_sort_desc() {
3703        do_single_instant_function_call("sort_desc", "").await;
3704    }
3705
3706    #[tokio::test]
3707    async fn single_sqrt() {
3708        do_single_instant_function_call("sqrt", "sqrt").await;
3709    }
3710
3711    #[tokio::test]
3712    #[should_panic]
3713    async fn single_timestamp() {
3714        do_single_instant_function_call("timestamp", "").await;
3715    }
3716
3717    #[tokio::test]
3718    async fn single_acos() {
3719        do_single_instant_function_call("acos", "acos").await;
3720    }
3721
3722    #[tokio::test]
3723    #[should_panic]
3724    async fn single_acosh() {
3725        do_single_instant_function_call("acosh", "").await;
3726    }
3727
3728    #[tokio::test]
3729    async fn single_asin() {
3730        do_single_instant_function_call("asin", "asin").await;
3731    }
3732
3733    #[tokio::test]
3734    #[should_panic]
3735    async fn single_asinh() {
3736        do_single_instant_function_call("asinh", "").await;
3737    }
3738
3739    #[tokio::test]
3740    async fn single_atan() {
3741        do_single_instant_function_call("atan", "atan").await;
3742    }
3743
3744    #[tokio::test]
3745    #[should_panic]
3746    async fn single_atanh() {
3747        do_single_instant_function_call("atanh", "").await;
3748    }
3749
3750    #[tokio::test]
3751    async fn single_cos() {
3752        do_single_instant_function_call("cos", "cos").await;
3753    }
3754
3755    #[tokio::test]
3756    #[should_panic]
3757    async fn single_cosh() {
3758        do_single_instant_function_call("cosh", "").await;
3759    }
3760
3761    #[tokio::test]
3762    async fn single_sin() {
3763        do_single_instant_function_call("sin", "sin").await;
3764    }
3765
3766    #[tokio::test]
3767    #[should_panic]
3768    async fn single_sinh() {
3769        do_single_instant_function_call("sinh", "").await;
3770    }
3771
3772    #[tokio::test]
3773    async fn single_tan() {
3774        do_single_instant_function_call("tan", "tan").await;
3775    }
3776
3777    #[tokio::test]
3778    #[should_panic]
3779    async fn single_tanh() {
3780        do_single_instant_function_call("tanh", "").await;
3781    }
3782
3783    #[tokio::test]
3784    #[should_panic]
3785    async fn single_deg() {
3786        do_single_instant_function_call("deg", "").await;
3787    }
3788
3789    #[tokio::test]
3790    #[should_panic]
3791    async fn single_rad() {
3792        do_single_instant_function_call("rad", "").await;
3793    }
3794
3795    // {
3796    //     input: "avg by (foo)(some_metric)",
3797    //     expected: &AggregateExpr{
3798    //         Op: AVG,
3799    //         Expr: &VectorSelector{
3800    //             Name: "some_metric",
3801    //             LabelMatchers: []*labels.Matcher{
3802    //                 MustLabelMatcher(labels.MatchEqual, model.MetricNameLabel, "some_metric"),
3803    //             },
3804    //             PosRange: PositionRange{
3805    //                 Start: 13,
3806    //                 End:   24,
3807    //             },
3808    //         },
3809    //         Grouping: []string{"foo"},
3810    //         PosRange: PositionRange{
3811    //             Start: 0,
3812    //             End:   25,
3813    //         },
3814    //     },
3815    // },
3816    async fn do_aggregate_expr_plan(fn_name: &str, plan_name: &str) {
3817        let prom_expr = parser::parse(&format!(
3818            "{fn_name} by (tag_1)(some_metric{{tag_0!=\"bar\"}})",
3819        ))
3820        .unwrap();
3821        let mut eval_stmt = EvalStmt {
3822            expr: prom_expr,
3823            start: UNIX_EPOCH,
3824            end: UNIX_EPOCH
3825                .checked_add(Duration::from_secs(100_000))
3826                .unwrap(),
3827            interval: Duration::from_secs(5),
3828            lookback_delta: Duration::from_secs(1),
3829        };
3830
3831        // test group by
3832        let table_provider = build_test_table_provider(
3833            &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
3834            2,
3835            2,
3836        )
3837        .await;
3838        let plan =
3839            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
3840                .await
3841                .unwrap();
3842        let expected_no_without = String::from(
3843            "Sort: some_metric.tag_1 ASC NULLS LAST, some_metric.timestamp ASC NULLS LAST [tag_1:Utf8, timestamp:Timestamp(Millisecond, None), TEMPLATE(some_metric.field_0):Float64;N, TEMPLATE(some_metric.field_1):Float64;N]\
3844            \n  Aggregate: groupBy=[[some_metric.tag_1, some_metric.timestamp]], aggr=[[TEMPLATE(some_metric.field_0), TEMPLATE(some_metric.field_1)]] [tag_1:Utf8, timestamp:Timestamp(Millisecond, None), TEMPLATE(some_metric.field_0):Float64;N, TEMPLATE(some_metric.field_1):Float64;N]\
3845            \n    PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
3846            \n      PromSeriesDivide: tags=[\"tag_0\", \"tag_1\"] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
3847            \n        Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.tag_1 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
3848            \n          Filter: some_metric.tag_0 != Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
3849            \n            TableScan: some_metric [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]"
3850        ).replace("TEMPLATE", plan_name);
3851        assert_eq!(
3852            plan.display_indent_schema().to_string(),
3853            expected_no_without
3854        );
3855
3856        // test group without
3857        if let PromExpr::Aggregate(AggregateExpr { modifier, .. }) = &mut eval_stmt.expr {
3858            *modifier = Some(LabelModifier::Exclude(Labels {
3859                labels: vec![String::from("tag_1")].into_iter().collect(),
3860            }));
3861        }
3862        let table_provider = build_test_table_provider(
3863            &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
3864            2,
3865            2,
3866        )
3867        .await;
3868        let plan =
3869            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
3870                .await
3871                .unwrap();
3872        let expected_without = String::from(
3873            "Sort: some_metric.tag_0 ASC NULLS LAST, some_metric.timestamp ASC NULLS LAST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), TEMPLATE(some_metric.field_0):Float64;N, TEMPLATE(some_metric.field_1):Float64;N]\
3874            \n  Aggregate: groupBy=[[some_metric.tag_0, some_metric.timestamp]], aggr=[[TEMPLATE(some_metric.field_0), TEMPLATE(some_metric.field_1)]] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), TEMPLATE(some_metric.field_0):Float64;N, TEMPLATE(some_metric.field_1):Float64;N]\
3875            \n    PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
3876            \n      PromSeriesDivide: tags=[\"tag_0\", \"tag_1\"] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
3877            \n        Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.tag_1 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
3878            \n          Filter: some_metric.tag_0 != Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
3879            \n            TableScan: some_metric [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]"
3880        ).replace("TEMPLATE", plan_name);
3881        assert_eq!(plan.display_indent_schema().to_string(), expected_without);
3882    }
3883
3884    #[tokio::test]
3885    async fn aggregate_sum() {
3886        do_aggregate_expr_plan("sum", "sum").await;
3887    }
3888
3889    #[tokio::test]
3890    async fn aggregate_avg() {
3891        do_aggregate_expr_plan("avg", "avg").await;
3892    }
3893
3894    #[tokio::test]
3895    #[should_panic] // output type doesn't match
3896    async fn aggregate_count() {
3897        do_aggregate_expr_plan("count", "count").await;
3898    }
3899
3900    #[tokio::test]
3901    async fn aggregate_min() {
3902        do_aggregate_expr_plan("min", "min").await;
3903    }
3904
3905    #[tokio::test]
3906    async fn aggregate_max() {
3907        do_aggregate_expr_plan("max", "max").await;
3908    }
3909
3910    #[tokio::test]
3911    #[should_panic] // output type doesn't match
3912    async fn aggregate_group() {
3913        do_aggregate_expr_plan("grouping", "GROUPING").await;
3914    }
3915
3916    #[tokio::test]
3917    async fn aggregate_stddev() {
3918        do_aggregate_expr_plan("stddev", "stddev_pop").await;
3919    }
3920
3921    #[tokio::test]
3922    async fn aggregate_stdvar() {
3923        do_aggregate_expr_plan("stdvar", "var_pop").await;
3924    }
3925
3926    // TODO(ruihang): add range fn tests once exprs are ready.
3927
3928    // {
3929    //     input: "some_metric{tag_0="foo"} + some_metric{tag_0="bar"}",
3930    //     expected: &BinaryExpr{
3931    //         Op: ADD,
3932    //         LHS: &VectorSelector{
3933    //             Name: "a",
3934    //             LabelMatchers: []*labels.Matcher{
3935    //                     MustLabelMatcher(labels.MatchEqual, "tag_0", "foo"),
3936    //                     MustLabelMatcher(labels.MatchEqual, model.MetricNameLabel, "some_metric"),
3937    //             },
3938    //         },
3939    //         RHS: &VectorSelector{
3940    //             Name: "sum",
3941    //             LabelMatchers: []*labels.Matcher{
3942    //                     MustLabelMatcher(labels.MatchxEqual, "tag_0", "bar"),
3943    //                     MustLabelMatcher(labels.MatchEqual, model.MetricNameLabel, "some_metric"),
3944    //             },
3945    //         },
3946    //         VectorMatching: &VectorMatching{},
3947    //     },
3948    // },
3949    #[tokio::test]
3950    async fn binary_op_column_column() {
3951        let prom_expr =
3952            parser::parse(r#"some_metric{tag_0="foo"} + some_metric{tag_0="bar"}"#).unwrap();
3953        let eval_stmt = EvalStmt {
3954            expr: prom_expr,
3955            start: UNIX_EPOCH,
3956            end: UNIX_EPOCH
3957                .checked_add(Duration::from_secs(100_000))
3958                .unwrap(),
3959            interval: Duration::from_secs(5),
3960            lookback_delta: Duration::from_secs(1),
3961        };
3962
3963        let table_provider = build_test_table_provider(
3964            &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
3965            1,
3966            1,
3967        )
3968        .await;
3969        let plan =
3970            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
3971                .await
3972                .unwrap();
3973
3974        let expected = String::from(
3975            "Projection: rhs.tag_0, rhs.timestamp, lhs.field_0 + rhs.field_0 AS lhs.field_0 + rhs.field_0 [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), lhs.field_0 + rhs.field_0:Float64;N]\
3976            \n  Inner Join: lhs.tag_0 = rhs.tag_0, lhs.timestamp = rhs.timestamp [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3977            \n    SubqueryAlias: lhs [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3978            \n      PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3979            \n        PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3980            \n          Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3981            \n            Filter: some_metric.tag_0 = Utf8(\"foo\") AND some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3982            \n              TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3983            \n    SubqueryAlias: rhs [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3984            \n      PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3985            \n        PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3986            \n          Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3987            \n            Filter: some_metric.tag_0 = Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3988            \n              TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]",
3989        );
3990
3991        assert_eq!(plan.display_indent_schema().to_string(), expected);
3992    }
3993
3994    async fn indie_query_plan_compare<T: AsRef<str>>(query: &str, expected: T) {
3995        let prom_expr = parser::parse(query).unwrap();
3996        let eval_stmt = EvalStmt {
3997            expr: prom_expr,
3998            start: UNIX_EPOCH,
3999            end: UNIX_EPOCH
4000                .checked_add(Duration::from_secs(100_000))
4001                .unwrap(),
4002            interval: Duration::from_secs(5),
4003            lookback_delta: Duration::from_secs(1),
4004        };
4005
4006        let table_provider = build_test_table_provider(
4007            &[
4008                (DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string()),
4009                (
4010                    "greptime_private".to_string(),
4011                    "some_alt_metric".to_string(),
4012                ),
4013            ],
4014            1,
4015            1,
4016        )
4017        .await;
4018        let plan =
4019            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
4020                .await
4021                .unwrap();
4022
4023        assert_eq!(plan.display_indent_schema().to_string(), expected.as_ref());
4024    }
4025
4026    #[tokio::test]
4027    async fn binary_op_literal_column() {
4028        let query = r#"1 + some_metric{tag_0="bar"}"#;
4029        let expected = String::from(
4030            "Projection: some_metric.tag_0, some_metric.timestamp, Float64(1) + some_metric.field_0 AS Float64(1) + field_0 [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), Float64(1) + field_0:Float64;N]\
4031            \n  PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4032            \n    PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4033            \n      Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4034            \n        Filter: some_metric.tag_0 = Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4035            \n          TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]",
4036        );
4037
4038        indie_query_plan_compare(query, expected).await;
4039    }
4040
4041    #[tokio::test]
4042    async fn binary_op_literal_literal() {
4043        let query = r#"1 + 1"#;
4044        let expected = r#"EmptyMetric: range=[0..100000000], interval=[5000] [time:Timestamp(Millisecond, None), value:Float64;N]
4045  TableScan: dummy [time:Timestamp(Millisecond, None), value:Float64;N]"#;
4046        indie_query_plan_compare(query, expected).await;
4047    }
4048
4049    #[tokio::test]
4050    async fn simple_bool_grammar() {
4051        let query = "some_metric != bool 1.2345";
4052        let expected = String::from(
4053            "Projection: some_metric.tag_0, some_metric.timestamp, CAST(some_metric.field_0 != Float64(1.2345) AS Float64) AS field_0 != Float64(1.2345) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0 != Float64(1.2345):Float64;N]\
4054            \n  PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4055            \n    PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4056            \n      Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4057            \n        Filter: some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4058            \n          TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]",
4059        );
4060
4061        indie_query_plan_compare(query, expected).await;
4062    }
4063
4064    #[tokio::test]
4065    async fn bool_with_additional_arithmetic() {
4066        let query = "some_metric + (1 == bool 2)";
4067        let expected = String::from(
4068            "Projection: some_metric.tag_0, some_metric.timestamp, some_metric.field_0 + CAST(Float64(1) = Float64(2) AS Float64) AS field_0 + Float64(1) = Float64(2) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0 + Float64(1) = Float64(2):Float64;N]\
4069            \n  PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4070            \n    PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4071            \n      Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4072            \n        Filter: some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4073            \n          TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]",
4074        );
4075
4076        indie_query_plan_compare(query, expected).await;
4077    }
4078
4079    #[tokio::test]
4080    async fn simple_unary() {
4081        let query = "-some_metric";
4082        let expected = String::from(
4083            "Projection: some_metric.tag_0, some_metric.timestamp, (- some_metric.field_0) AS (- field_0) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), (- field_0):Float64;N]\
4084            \n  PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4085            \n    PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4086            \n      Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4087            \n        Filter: some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4088            \n          TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]",
4089        );
4090
4091        indie_query_plan_compare(query, expected).await;
4092    }
4093
4094    #[tokio::test]
4095    async fn increase_aggr() {
4096        let query = "increase(some_metric[5m])";
4097        let expected = String::from(
4098            "Filter: prom_increase(timestamp_range,field_0,timestamp,Int64(300000)) IS NOT NULL [timestamp:Timestamp(Millisecond, None), prom_increase(timestamp_range,field_0,timestamp,Int64(300000)):Float64;N, tag_0:Utf8]\
4099            \n  Projection: some_metric.timestamp, prom_increase(timestamp_range, field_0, some_metric.timestamp, Int64(300000)) AS prom_increase(timestamp_range,field_0,timestamp,Int64(300000)), some_metric.tag_0 [timestamp:Timestamp(Millisecond, None), prom_increase(timestamp_range,field_0,timestamp,Int64(300000)):Float64;N, tag_0:Utf8]\
4100            \n    PromRangeManipulate: req range=[0..100000000], interval=[5000], eval range=[300000], time index=[timestamp], values=[\"field_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Dictionary(Int64, Float64);N, timestamp_range:Dictionary(Int64, Timestamp(Millisecond, None))]\
4101            \n      PromSeriesNormalize: offset=[0], time index=[timestamp], filter NaN: [true] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4102            \n        PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4103            \n          Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4104            \n            Filter: some_metric.timestamp >= TimestampMillisecond(-301000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4105            \n              TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]",
4106        );
4107
4108        indie_query_plan_compare(query, expected).await;
4109    }
4110
4111    #[tokio::test]
4112    async fn less_filter_on_value() {
4113        let query = "some_metric < 1.2345";
4114        let expected = String::from(
4115            "Filter: some_metric.field_0 < Float64(1.2345) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4116            \n  PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4117            \n    PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4118            \n      Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4119            \n        Filter: some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4120            \n          TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]",
4121        );
4122
4123        indie_query_plan_compare(query, expected).await;
4124    }
4125
4126    #[tokio::test]
4127    async fn count_over_time() {
4128        let query = "count_over_time(some_metric[5m])";
4129        let expected = String::from(
4130            "Filter: prom_count_over_time(timestamp_range,field_0) IS NOT NULL [timestamp:Timestamp(Millisecond, None), prom_count_over_time(timestamp_range,field_0):Float64;N, tag_0:Utf8]\
4131            \n  Projection: some_metric.timestamp, prom_count_over_time(timestamp_range, field_0) AS prom_count_over_time(timestamp_range,field_0), some_metric.tag_0 [timestamp:Timestamp(Millisecond, None), prom_count_over_time(timestamp_range,field_0):Float64;N, tag_0:Utf8]\
4132            \n    PromRangeManipulate: req range=[0..100000000], interval=[5000], eval range=[300000], time index=[timestamp], values=[\"field_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Dictionary(Int64, Float64);N, timestamp_range:Dictionary(Int64, Timestamp(Millisecond, None))]\
4133            \n      PromSeriesNormalize: offset=[0], time index=[timestamp], filter NaN: [true] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4134            \n        PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4135            \n          Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4136            \n            Filter: some_metric.timestamp >= TimestampMillisecond(-301000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4137            \n              TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]",
4138        );
4139
4140        indie_query_plan_compare(query, expected).await;
4141    }
4142
4143    #[tokio::test]
4144    async fn test_hash_join() {
4145        let mut eval_stmt = EvalStmt {
4146            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
4147            start: UNIX_EPOCH,
4148            end: UNIX_EPOCH
4149                .checked_add(Duration::from_secs(100_000))
4150                .unwrap(),
4151            interval: Duration::from_secs(5),
4152            lookback_delta: Duration::from_secs(1),
4153        };
4154
4155        let case = r#"http_server_requests_seconds_sum{uri="/accounts/login"} / ignoring(kubernetes_pod_name,kubernetes_namespace) http_server_requests_seconds_count{uri="/accounts/login"}"#;
4156
4157        let prom_expr = parser::parse(case).unwrap();
4158        eval_stmt.expr = prom_expr;
4159        let table_provider = build_test_table_provider_with_fields(
4160            &[
4161                (
4162                    DEFAULT_SCHEMA_NAME.to_string(),
4163                    "http_server_requests_seconds_sum".to_string(),
4164                ),
4165                (
4166                    DEFAULT_SCHEMA_NAME.to_string(),
4167                    "http_server_requests_seconds_count".to_string(),
4168                ),
4169            ],
4170            &["uri", "kubernetes_namespace", "kubernetes_pod_name"],
4171        )
4172        .await;
4173        // Should be ok
4174        let plan =
4175            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
4176                .await
4177                .unwrap();
4178        let expected = "Projection: http_server_requests_seconds_count.uri, http_server_requests_seconds_count.kubernetes_namespace, http_server_requests_seconds_count.kubernetes_pod_name, http_server_requests_seconds_count.greptime_timestamp, http_server_requests_seconds_sum.greptime_value / http_server_requests_seconds_count.greptime_value AS http_server_requests_seconds_sum.greptime_value / http_server_requests_seconds_count.greptime_value\
4179            \n  Inner Join: http_server_requests_seconds_sum.greptime_timestamp = http_server_requests_seconds_count.greptime_timestamp, http_server_requests_seconds_sum.uri = http_server_requests_seconds_count.uri\
4180            \n    SubqueryAlias: http_server_requests_seconds_sum\
4181            \n      PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp]\
4182            \n        PromSeriesDivide: tags=[\"uri\", \"kubernetes_namespace\", \"kubernetes_pod_name\"]\
4183            \n          Sort: http_server_requests_seconds_sum.uri ASC NULLS FIRST, http_server_requests_seconds_sum.kubernetes_namespace ASC NULLS FIRST, http_server_requests_seconds_sum.kubernetes_pod_name ASC NULLS FIRST, http_server_requests_seconds_sum.greptime_timestamp ASC NULLS FIRST\
4184            \n            Filter: http_server_requests_seconds_sum.uri = Utf8(\"/accounts/login\") AND http_server_requests_seconds_sum.greptime_timestamp >= TimestampMillisecond(-1000, None) AND http_server_requests_seconds_sum.greptime_timestamp <= TimestampMillisecond(100001000, None)\
4185            \n              TableScan: http_server_requests_seconds_sum\
4186            \n    SubqueryAlias: http_server_requests_seconds_count\
4187            \n      PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp]\
4188            \n        PromSeriesDivide: tags=[\"uri\", \"kubernetes_namespace\", \"kubernetes_pod_name\"]\
4189            \n          Sort: http_server_requests_seconds_count.uri ASC NULLS FIRST, http_server_requests_seconds_count.kubernetes_namespace ASC NULLS FIRST, http_server_requests_seconds_count.kubernetes_pod_name ASC NULLS FIRST, http_server_requests_seconds_count.greptime_timestamp ASC NULLS FIRST\
4190            \n            Filter: http_server_requests_seconds_count.uri = Utf8(\"/accounts/login\") AND http_server_requests_seconds_count.greptime_timestamp >= TimestampMillisecond(-1000, None) AND http_server_requests_seconds_count.greptime_timestamp <= TimestampMillisecond(100001000, None)\
4191            \n              TableScan: http_server_requests_seconds_count";
4192        assert_eq!(plan.to_string(), expected);
4193    }
4194
4195    #[tokio::test]
4196    async fn test_nested_histogram_quantile() {
4197        let mut eval_stmt = EvalStmt {
4198            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
4199            start: UNIX_EPOCH,
4200            end: UNIX_EPOCH
4201                .checked_add(Duration::from_secs(100_000))
4202                .unwrap(),
4203            interval: Duration::from_secs(5),
4204            lookback_delta: Duration::from_secs(1),
4205        };
4206
4207        let case = r#"label_replace(histogram_quantile(0.99, sum by(pod, le, path, code) (rate(greptime_servers_grpc_requests_elapsed_bucket{container="frontend"}[1m0s]))), "pod_new", "$1", "pod", "greptimedb-frontend-[0-9a-z]*-(.*)")"#;
4208
4209        let prom_expr = parser::parse(case).unwrap();
4210        eval_stmt.expr = prom_expr;
4211        let table_provider = build_test_table_provider_with_fields(
4212            &[(
4213                DEFAULT_SCHEMA_NAME.to_string(),
4214                "greptime_servers_grpc_requests_elapsed_bucket".to_string(),
4215            )],
4216            &["pod", "le", "path", "code", "container"],
4217        )
4218        .await;
4219        // Should be ok
4220        let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
4221            .await
4222            .unwrap();
4223    }
4224
4225    #[tokio::test]
4226    async fn test_parse_and_operator() {
4227        let mut eval_stmt = EvalStmt {
4228            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
4229            start: UNIX_EPOCH,
4230            end: UNIX_EPOCH
4231                .checked_add(Duration::from_secs(100_000))
4232                .unwrap(),
4233            interval: Duration::from_secs(5),
4234            lookback_delta: Duration::from_secs(1),
4235        };
4236
4237        let cases = [
4238            r#"count (max by (persistentvolumeclaim,namespace) (kubelet_volume_stats_used_bytes{namespace=~".+"} ) and (max by (persistentvolumeclaim,namespace) (kubelet_volume_stats_used_bytes{namespace=~".+"} )) / (max by (persistentvolumeclaim,namespace) (kubelet_volume_stats_capacity_bytes{namespace=~".+"} )) >= (80 / 100)) or vector (0)"#,
4239            r#"count (max by (persistentvolumeclaim,namespace) (kubelet_volume_stats_used_bytes{namespace=~".+"} ) unless (max by (persistentvolumeclaim,namespace) (kubelet_volume_stats_used_bytes{namespace=~".+"} )) / (max by (persistentvolumeclaim,namespace) (kubelet_volume_stats_capacity_bytes{namespace=~".+"} )) >= (80 / 100)) or vector (0)"#,
4240        ];
4241
4242        for case in cases {
4243            let prom_expr = parser::parse(case).unwrap();
4244            eval_stmt.expr = prom_expr;
4245            let table_provider = build_test_table_provider_with_fields(
4246                &[
4247                    (
4248                        DEFAULT_SCHEMA_NAME.to_string(),
4249                        "kubelet_volume_stats_used_bytes".to_string(),
4250                    ),
4251                    (
4252                        DEFAULT_SCHEMA_NAME.to_string(),
4253                        "kubelet_volume_stats_capacity_bytes".to_string(),
4254                    ),
4255                ],
4256                &["namespace", "persistentvolumeclaim"],
4257            )
4258            .await;
4259            // Should be ok
4260            let _ =
4261                PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
4262                    .await
4263                    .unwrap();
4264        }
4265    }
4266
4267    #[tokio::test]
4268    async fn test_nested_binary_op() {
4269        let mut eval_stmt = EvalStmt {
4270            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
4271            start: UNIX_EPOCH,
4272            end: UNIX_EPOCH
4273                .checked_add(Duration::from_secs(100_000))
4274                .unwrap(),
4275            interval: Duration::from_secs(5),
4276            lookback_delta: Duration::from_secs(1),
4277        };
4278
4279        let case = r#"sum(rate(nginx_ingress_controller_requests{job=~".*"}[2m])) -
4280        (
4281            sum(rate(nginx_ingress_controller_requests{namespace=~".*"}[2m]))
4282            or
4283            vector(0)
4284        )"#;
4285
4286        let prom_expr = parser::parse(case).unwrap();
4287        eval_stmt.expr = prom_expr;
4288        let table_provider = build_test_table_provider_with_fields(
4289            &[(
4290                DEFAULT_SCHEMA_NAME.to_string(),
4291                "nginx_ingress_controller_requests".to_string(),
4292            )],
4293            &["namespace", "job"],
4294        )
4295        .await;
4296        // Should be ok
4297        let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
4298            .await
4299            .unwrap();
4300    }
4301
4302    #[tokio::test]
4303    async fn test_parse_or_operator() {
4304        let mut eval_stmt = EvalStmt {
4305            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
4306            start: UNIX_EPOCH,
4307            end: UNIX_EPOCH
4308                .checked_add(Duration::from_secs(100_000))
4309                .unwrap(),
4310            interval: Duration::from_secs(5),
4311            lookback_delta: Duration::from_secs(1),
4312        };
4313
4314        let case = r#"
4315        sum(rate(sysstat{tenant_name=~"tenant1",cluster_name=~"cluster1"}[120s])) by (cluster_name,tenant_name) /
4316        (sum(sysstat{tenant_name=~"tenant1",cluster_name=~"cluster1"}) by (cluster_name,tenant_name) * 100)
4317            or
4318        200 * sum(sysstat{tenant_name=~"tenant1",cluster_name=~"cluster1"}) by (cluster_name,tenant_name) /
4319        sum(sysstat{tenant_name=~"tenant1",cluster_name=~"cluster1"}) by (cluster_name,tenant_name)"#;
4320
4321        let table_provider = build_test_table_provider_with_fields(
4322            &[(DEFAULT_SCHEMA_NAME.to_string(), "sysstat".to_string())],
4323            &["tenant_name", "cluster_name"],
4324        )
4325        .await;
4326        eval_stmt.expr = parser::parse(case).unwrap();
4327        let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
4328            .await
4329            .unwrap();
4330
4331        let case = r#"sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) /
4332            (sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) *1000) +
4333            sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) /
4334            (sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) *1000) >= 0
4335            or
4336            sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) /
4337            (sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) *1000) >= 0
4338            or
4339            sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) /
4340            (sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) *1000) >= 0"#;
4341        let table_provider = build_test_table_provider_with_fields(
4342            &[(DEFAULT_SCHEMA_NAME.to_string(), "sysstat".to_string())],
4343            &["tenant_name", "cluster_name"],
4344        )
4345        .await;
4346        eval_stmt.expr = parser::parse(case).unwrap();
4347        let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
4348            .await
4349            .unwrap();
4350
4351        let case = r#"(sum(background_waitevent_cnt{tenant_name=~"sys",cluster_name=~"cluster1"}) by (cluster_name,tenant_name) +
4352            sum(foreground_waitevent_cnt{tenant_name=~"sys",cluster_name=~"cluster1"}) by (cluster_name,tenant_name)) or
4353            (sum(background_waitevent_cnt{tenant_name=~"sys",cluster_name=~"cluster1"}) by (cluster_name,tenant_name)) or
4354            (sum(foreground_waitevent_cnt{tenant_name=~"sys",cluster_name=~"cluster1"}) by (cluster_name,tenant_name))"#;
4355        let table_provider = build_test_table_provider_with_fields(
4356            &[
4357                (
4358                    DEFAULT_SCHEMA_NAME.to_string(),
4359                    "background_waitevent_cnt".to_string(),
4360                ),
4361                (
4362                    DEFAULT_SCHEMA_NAME.to_string(),
4363                    "foreground_waitevent_cnt".to_string(),
4364                ),
4365            ],
4366            &["tenant_name", "cluster_name"],
4367        )
4368        .await;
4369        eval_stmt.expr = parser::parse(case).unwrap();
4370        let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
4371            .await
4372            .unwrap();
4373
4374        let case = r#"avg(node_load1{cluster_name=~"cluster1"}) by (cluster_name,host_name) or max(container_cpu_load_average_10s{cluster_name=~"cluster1"}) by (cluster_name,host_name) * 100 / max(container_spec_cpu_quota{cluster_name=~"cluster1"}) by (cluster_name,host_name)"#;
4375        let table_provider = build_test_table_provider_with_fields(
4376            &[
4377                (DEFAULT_SCHEMA_NAME.to_string(), "node_load1".to_string()),
4378                (
4379                    DEFAULT_SCHEMA_NAME.to_string(),
4380                    "container_cpu_load_average_10s".to_string(),
4381                ),
4382                (
4383                    DEFAULT_SCHEMA_NAME.to_string(),
4384                    "container_spec_cpu_quota".to_string(),
4385                ),
4386            ],
4387            &["cluster_name", "host_name"],
4388        )
4389        .await;
4390        eval_stmt.expr = parser::parse(case).unwrap();
4391        let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
4392            .await
4393            .unwrap();
4394    }
4395
4396    #[tokio::test]
4397    async fn value_matcher() {
4398        // template
4399        let mut eval_stmt = EvalStmt {
4400            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
4401            start: UNIX_EPOCH,
4402            end: UNIX_EPOCH
4403                .checked_add(Duration::from_secs(100_000))
4404                .unwrap(),
4405            interval: Duration::from_secs(5),
4406            lookback_delta: Duration::from_secs(1),
4407        };
4408
4409        let cases = [
4410            // single equal matcher
4411            (
4412                r#"some_metric{__field__="field_1"}"#,
4413                vec![
4414                    "some_metric.field_1",
4415                    "some_metric.tag_0",
4416                    "some_metric.tag_1",
4417                    "some_metric.tag_2",
4418                    "some_metric.timestamp",
4419                ],
4420            ),
4421            // two equal matchers
4422            (
4423                r#"some_metric{__field__="field_1", __field__="field_0"}"#,
4424                vec![
4425                    "some_metric.field_0",
4426                    "some_metric.field_1",
4427                    "some_metric.tag_0",
4428                    "some_metric.tag_1",
4429                    "some_metric.tag_2",
4430                    "some_metric.timestamp",
4431                ],
4432            ),
4433            // single not_eq matcher
4434            (
4435                r#"some_metric{__field__!="field_1"}"#,
4436                vec![
4437                    "some_metric.field_0",
4438                    "some_metric.field_2",
4439                    "some_metric.tag_0",
4440                    "some_metric.tag_1",
4441                    "some_metric.tag_2",
4442                    "some_metric.timestamp",
4443                ],
4444            ),
4445            // two not_eq matchers
4446            (
4447                r#"some_metric{__field__!="field_1", __field__!="field_2"}"#,
4448                vec![
4449                    "some_metric.field_0",
4450                    "some_metric.tag_0",
4451                    "some_metric.tag_1",
4452                    "some_metric.tag_2",
4453                    "some_metric.timestamp",
4454                ],
4455            ),
4456            // equal and not_eq matchers (no conflict)
4457            (
4458                r#"some_metric{__field__="field_1", __field__!="field_0"}"#,
4459                vec![
4460                    "some_metric.field_1",
4461                    "some_metric.tag_0",
4462                    "some_metric.tag_1",
4463                    "some_metric.tag_2",
4464                    "some_metric.timestamp",
4465                ],
4466            ),
4467            // equal and not_eq matchers (conflict)
4468            (
4469                r#"some_metric{__field__="field_2", __field__!="field_2"}"#,
4470                vec![
4471                    "some_metric.tag_0",
4472                    "some_metric.tag_1",
4473                    "some_metric.tag_2",
4474                    "some_metric.timestamp",
4475                ],
4476            ),
4477            // single regex eq matcher
4478            (
4479                r#"some_metric{__field__=~"field_1|field_2"}"#,
4480                vec![
4481                    "some_metric.field_1",
4482                    "some_metric.field_2",
4483                    "some_metric.tag_0",
4484                    "some_metric.tag_1",
4485                    "some_metric.tag_2",
4486                    "some_metric.timestamp",
4487                ],
4488            ),
4489            // single regex not_eq matcher
4490            (
4491                r#"some_metric{__field__!~"field_1|field_2"}"#,
4492                vec![
4493                    "some_metric.field_0",
4494                    "some_metric.tag_0",
4495                    "some_metric.tag_1",
4496                    "some_metric.tag_2",
4497                    "some_metric.timestamp",
4498                ],
4499            ),
4500        ];
4501
4502        for case in cases {
4503            let prom_expr = parser::parse(case.0).unwrap();
4504            eval_stmt.expr = prom_expr;
4505            let table_provider = build_test_table_provider(
4506                &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
4507                3,
4508                3,
4509            )
4510            .await;
4511            let plan =
4512                PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
4513                    .await
4514                    .unwrap();
4515            let mut fields = plan.schema().field_names();
4516            let mut expected = case.1.into_iter().map(String::from).collect::<Vec<_>>();
4517            fields.sort();
4518            expected.sort();
4519            assert_eq!(fields, expected, "case: {:?}", case.0);
4520        }
4521
4522        let bad_cases = [
4523            r#"some_metric{__field__="nonexistent"}"#,
4524            r#"some_metric{__field__!="nonexistent"}"#,
4525        ];
4526
4527        for case in bad_cases {
4528            let prom_expr = parser::parse(case).unwrap();
4529            eval_stmt.expr = prom_expr;
4530            let table_provider = build_test_table_provider(
4531                &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
4532                3,
4533                3,
4534            )
4535            .await;
4536            let plan =
4537                PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
4538                    .await;
4539            assert!(plan.is_err(), "case: {:?}", case);
4540        }
4541    }
4542
4543    #[tokio::test]
4544    async fn custom_schema() {
4545        let query = "some_alt_metric{__schema__=\"greptime_private\"}";
4546        let expected = String::from(
4547            "PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4548            \n  PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4549            \n    Sort: greptime_private.some_alt_metric.tag_0 ASC NULLS FIRST, greptime_private.some_alt_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4550            \n      Filter: greptime_private.some_alt_metric.timestamp >= TimestampMillisecond(-1000, None) AND greptime_private.some_alt_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4551            \n        TableScan: greptime_private.some_alt_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]",
4552        );
4553
4554        indie_query_plan_compare(query, expected).await;
4555
4556        let query = "some_alt_metric{__database__=\"greptime_private\"}";
4557        let expected = String::from(
4558            "PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4559            \n  PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4560            \n    Sort: greptime_private.some_alt_metric.tag_0 ASC NULLS FIRST, greptime_private.some_alt_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4561            \n      Filter: greptime_private.some_alt_metric.timestamp >= TimestampMillisecond(-1000, None) AND greptime_private.some_alt_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4562            \n        TableScan: greptime_private.some_alt_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]",
4563        );
4564
4565        indie_query_plan_compare(query, expected).await;
4566
4567        let query = "some_alt_metric{__schema__=\"greptime_private\"} / some_metric";
4568        let expected = String::from(
4569            "Projection: some_metric.tag_0, some_metric.timestamp, greptime_private.some_alt_metric.field_0 / some_metric.field_0 AS greptime_private.some_alt_metric.field_0 / some_metric.field_0 [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), greptime_private.some_alt_metric.field_0 / some_metric.field_0:Float64;N]\
4570        \n  Inner Join: greptime_private.some_alt_metric.tag_0 = some_metric.tag_0, greptime_private.some_alt_metric.timestamp = some_metric.timestamp [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4571        \n    SubqueryAlias: greptime_private.some_alt_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4572        \n      PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4573        \n        PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4574        \n          Sort: greptime_private.some_alt_metric.tag_0 ASC NULLS FIRST, greptime_private.some_alt_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4575        \n            Filter: greptime_private.some_alt_metric.timestamp >= TimestampMillisecond(-1000, None) AND greptime_private.some_alt_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4576        \n              TableScan: greptime_private.some_alt_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4577        \n    SubqueryAlias: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4578        \n      PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4579        \n        PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4580        \n          Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4581        \n            Filter: some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4582        \n              TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]",
4583        );
4584
4585        indie_query_plan_compare(query, expected).await;
4586    }
4587
4588    #[tokio::test]
4589    async fn only_equals_is_supported_for_special_matcher() {
4590        let queries = &[
4591            "some_alt_metric{__schema__!=\"greptime_private\"}",
4592            "some_alt_metric{__schema__=~\"lalala\"}",
4593            "some_alt_metric{__database__!=\"greptime_private\"}",
4594            "some_alt_metric{__database__=~\"lalala\"}",
4595        ];
4596
4597        for query in queries {
4598            let prom_expr = parser::parse(query).unwrap();
4599            let eval_stmt = EvalStmt {
4600                expr: prom_expr,
4601                start: UNIX_EPOCH,
4602                end: UNIX_EPOCH
4603                    .checked_add(Duration::from_secs(100_000))
4604                    .unwrap(),
4605                interval: Duration::from_secs(5),
4606                lookback_delta: Duration::from_secs(1),
4607            };
4608
4609            let table_provider = build_test_table_provider(
4610                &[
4611                    (DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string()),
4612                    (
4613                        "greptime_private".to_string(),
4614                        "some_alt_metric".to_string(),
4615                    ),
4616                ],
4617                1,
4618                1,
4619            )
4620            .await;
4621
4622            let plan =
4623                PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
4624                    .await;
4625            assert!(plan.is_err(), "query: {:?}", query);
4626        }
4627    }
4628
4629    #[tokio::test]
4630    async fn test_non_ms_precision() {
4631        let catalog_list = MemoryCatalogManager::with_default_setup();
4632        let columns = vec![
4633            ColumnSchema::new(
4634                "tag".to_string(),
4635                ConcreteDataType::string_datatype(),
4636                false,
4637            ),
4638            ColumnSchema::new(
4639                "timestamp".to_string(),
4640                ConcreteDataType::timestamp_nanosecond_datatype(),
4641                false,
4642            )
4643            .with_time_index(true),
4644            ColumnSchema::new(
4645                "field".to_string(),
4646                ConcreteDataType::float64_datatype(),
4647                true,
4648            ),
4649        ];
4650        let schema = Arc::new(Schema::new(columns));
4651        let table_meta = TableMetaBuilder::empty()
4652            .schema(schema)
4653            .primary_key_indices(vec![0])
4654            .value_indices(vec![2])
4655            .next_column_id(1024)
4656            .build()
4657            .unwrap();
4658        let table_info = TableInfoBuilder::default()
4659            .name("metrics".to_string())
4660            .meta(table_meta)
4661            .build()
4662            .unwrap();
4663        let table = EmptyTable::from_table_info(&table_info);
4664        assert!(
4665            catalog_list
4666                .register_table_sync(RegisterTableRequest {
4667                    catalog: DEFAULT_CATALOG_NAME.to_string(),
4668                    schema: DEFAULT_SCHEMA_NAME.to_string(),
4669                    table_name: "metrics".to_string(),
4670                    table_id: 1024,
4671                    table,
4672                })
4673                .is_ok()
4674        );
4675
4676        let plan = PromPlanner::stmt_to_plan(
4677            DfTableSourceProvider::new(
4678                catalog_list.clone(),
4679                false,
4680                QueryContext::arc(),
4681                DummyDecoder::arc(),
4682                true,
4683            ),
4684            &EvalStmt {
4685                expr: parser::parse("metrics{tag = \"1\"}").unwrap(),
4686                start: UNIX_EPOCH,
4687                end: UNIX_EPOCH
4688                    .checked_add(Duration::from_secs(100_000))
4689                    .unwrap(),
4690                interval: Duration::from_secs(5),
4691                lookback_delta: Duration::from_secs(1),
4692            },
4693            &build_query_engine_state(),
4694        )
4695        .await
4696        .unwrap();
4697        assert_eq!(
4698            plan.display_indent_schema().to_string(),
4699            "PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
4700        \n  PromSeriesDivide: tags=[\"tag\"] [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
4701        \n    Sort: metrics.tag ASC NULLS FIRST, metrics.timestamp ASC NULLS FIRST [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
4702        \n      Filter: metrics.tag = Utf8(\"1\") AND metrics.timestamp >= TimestampMillisecond(-1000, None) AND metrics.timestamp <= TimestampMillisecond(100001000, None) [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
4703        \n        Projection: metrics.field, metrics.tag, CAST(metrics.timestamp AS Timestamp(Millisecond, None)) AS timestamp [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
4704        \n          TableScan: metrics [tag:Utf8, timestamp:Timestamp(Nanosecond, None), field:Float64;N]"
4705        );
4706        let plan = PromPlanner::stmt_to_plan(
4707            DfTableSourceProvider::new(
4708                catalog_list.clone(),
4709                false,
4710                QueryContext::arc(),
4711                DummyDecoder::arc(),
4712                true,
4713            ),
4714            &EvalStmt {
4715                expr: parser::parse("avg_over_time(metrics{tag = \"1\"}[5s])").unwrap(),
4716                start: UNIX_EPOCH,
4717                end: UNIX_EPOCH
4718                    .checked_add(Duration::from_secs(100_000))
4719                    .unwrap(),
4720                interval: Duration::from_secs(5),
4721                lookback_delta: Duration::from_secs(1),
4722            },
4723            &build_query_engine_state(),
4724        )
4725        .await
4726        .unwrap();
4727        assert_eq!(
4728            plan.display_indent_schema().to_string(),
4729            "Filter: prom_avg_over_time(timestamp_range,field) IS NOT NULL [timestamp:Timestamp(Millisecond, None), prom_avg_over_time(timestamp_range,field):Float64;N, tag:Utf8]\
4730        \n  Projection: metrics.timestamp, prom_avg_over_time(timestamp_range, field) AS prom_avg_over_time(timestamp_range,field), metrics.tag [timestamp:Timestamp(Millisecond, None), prom_avg_over_time(timestamp_range,field):Float64;N, tag:Utf8]\
4731        \n    PromRangeManipulate: req range=[0..100000000], interval=[5000], eval range=[5000], time index=[timestamp], values=[\"field\"] [field:Dictionary(Int64, Float64);N, tag:Utf8, timestamp:Timestamp(Millisecond, None), timestamp_range:Dictionary(Int64, Timestamp(Millisecond, None))]\
4732        \n      PromSeriesNormalize: offset=[0], time index=[timestamp], filter NaN: [true] [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
4733        \n        PromSeriesDivide: tags=[\"tag\"] [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
4734        \n          Sort: metrics.tag ASC NULLS FIRST, metrics.timestamp ASC NULLS FIRST [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
4735        \n            Filter: metrics.tag = Utf8(\"1\") AND metrics.timestamp >= TimestampMillisecond(-6000, None) AND metrics.timestamp <= TimestampMillisecond(100001000, None) [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
4736        \n              Projection: metrics.field, metrics.tag, CAST(metrics.timestamp AS Timestamp(Millisecond, None)) AS timestamp [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
4737        \n                TableScan: metrics [tag:Utf8, timestamp:Timestamp(Nanosecond, None), field:Float64;N]"
4738        );
4739    }
4740
4741    #[tokio::test]
4742    async fn test_nonexistent_label() {
4743        // template
4744        let mut eval_stmt = EvalStmt {
4745            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
4746            start: UNIX_EPOCH,
4747            end: UNIX_EPOCH
4748                .checked_add(Duration::from_secs(100_000))
4749                .unwrap(),
4750            interval: Duration::from_secs(5),
4751            lookback_delta: Duration::from_secs(1),
4752        };
4753
4754        let case = r#"some_metric{nonexistent="hi"}"#;
4755        let prom_expr = parser::parse(case).unwrap();
4756        eval_stmt.expr = prom_expr;
4757        let table_provider = build_test_table_provider(
4758            &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
4759            3,
4760            3,
4761        )
4762        .await;
4763        // Should be ok
4764        let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
4765            .await
4766            .unwrap();
4767    }
4768
4769    #[tokio::test]
4770    async fn test_label_join() {
4771        let prom_expr = parser::parse(
4772            "label_join(up{tag_0='api-server'}, 'foo', ',', 'tag_1', 'tag_2', 'tag_3')",
4773        )
4774        .unwrap();
4775        let eval_stmt = EvalStmt {
4776            expr: prom_expr,
4777            start: UNIX_EPOCH,
4778            end: UNIX_EPOCH
4779                .checked_add(Duration::from_secs(100_000))
4780                .unwrap(),
4781            interval: Duration::from_secs(5),
4782            lookback_delta: Duration::from_secs(1),
4783        };
4784
4785        let table_provider =
4786            build_test_table_provider(&[(DEFAULT_SCHEMA_NAME.to_string(), "up".to_string())], 4, 1)
4787                .await;
4788        let plan =
4789            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
4790                .await
4791                .unwrap();
4792
4793        let expected = r#"
4794Filter: up.field_0 IS NOT NULL [timestamp:Timestamp(Millisecond, None), field_0:Float64;N, foo:Utf8;N, tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8]
4795  Projection: up.timestamp, up.field_0, concat_ws(Utf8(","), up.tag_1, up.tag_2, up.tag_3) AS foo, up.tag_0, up.tag_1, up.tag_2, up.tag_3 [timestamp:Timestamp(Millisecond, None), field_0:Float64;N, foo:Utf8;N, tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8]
4796    PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
4797      PromSeriesDivide: tags=["tag_0", "tag_1", "tag_2", "tag_3"] [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
4798        Sort: up.tag_0 ASC NULLS FIRST, up.tag_1 ASC NULLS FIRST, up.tag_2 ASC NULLS FIRST, up.tag_3 ASC NULLS FIRST, up.timestamp ASC NULLS FIRST [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
4799          Filter: up.tag_0 = Utf8("api-server") AND up.timestamp >= TimestampMillisecond(-1000, None) AND up.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
4800            TableScan: up [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"#;
4801
4802        let ret = plan.display_indent_schema().to_string();
4803        assert_eq!(format!("\n{ret}"), expected, "\n{}", ret);
4804    }
4805
4806    #[tokio::test]
4807    async fn test_label_replace() {
4808        let prom_expr = parser::parse(
4809            "label_replace(up{tag_0=\"a:c\"}, \"foo\", \"$1\", \"tag_0\", \"(.*):.*\")",
4810        )
4811        .unwrap();
4812        let eval_stmt = EvalStmt {
4813            expr: prom_expr,
4814            start: UNIX_EPOCH,
4815            end: UNIX_EPOCH
4816                .checked_add(Duration::from_secs(100_000))
4817                .unwrap(),
4818            interval: Duration::from_secs(5),
4819            lookback_delta: Duration::from_secs(1),
4820        };
4821
4822        let table_provider =
4823            build_test_table_provider(&[(DEFAULT_SCHEMA_NAME.to_string(), "up".to_string())], 1, 1)
4824                .await;
4825        let plan =
4826            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
4827                .await
4828                .unwrap();
4829
4830        let expected = r#"
4831Filter: up.field_0 IS NOT NULL [timestamp:Timestamp(Millisecond, None), field_0:Float64;N, foo:Utf8;N, tag_0:Utf8]
4832  Projection: up.timestamp, up.field_0, regexp_replace(up.tag_0, Utf8("^(?s:(.*):.*)$"), Utf8("$1")) AS foo, up.tag_0 [timestamp:Timestamp(Millisecond, None), field_0:Float64;N, foo:Utf8;N, tag_0:Utf8]
4833    PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
4834      PromSeriesDivide: tags=["tag_0"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
4835        Sort: up.tag_0 ASC NULLS FIRST, up.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
4836          Filter: up.tag_0 = Utf8("a:c") AND up.timestamp >= TimestampMillisecond(-1000, None) AND up.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
4837            TableScan: up [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"#;
4838
4839        let ret = plan.display_indent_schema().to_string();
4840        assert_eq!(format!("\n{ret}"), expected, "\n{}", ret);
4841    }
4842
4843    #[tokio::test]
4844    async fn test_matchers_to_expr() {
4845        let mut eval_stmt = EvalStmt {
4846            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
4847            start: UNIX_EPOCH,
4848            end: UNIX_EPOCH
4849                .checked_add(Duration::from_secs(100_000))
4850                .unwrap(),
4851            interval: Duration::from_secs(5),
4852            lookback_delta: Duration::from_secs(1),
4853        };
4854        let case =
4855            r#"sum(prometheus_tsdb_head_series{tag_1=~"(10.0.160.237:8080|10.0.160.237:9090)"})"#;
4856
4857        let prom_expr = parser::parse(case).unwrap();
4858        eval_stmt.expr = prom_expr;
4859        let table_provider = build_test_table_provider(
4860            &[(
4861                DEFAULT_SCHEMA_NAME.to_string(),
4862                "prometheus_tsdb_head_series".to_string(),
4863            )],
4864            3,
4865            3,
4866        )
4867        .await;
4868        let plan =
4869            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
4870                .await
4871                .unwrap();
4872        let expected = "Sort: prometheus_tsdb_head_series.timestamp ASC NULLS LAST [timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.field_0):Float64;N, sum(prometheus_tsdb_head_series.field_1):Float64;N, sum(prometheus_tsdb_head_series.field_2):Float64;N]\
4873        \n  Aggregate: groupBy=[[prometheus_tsdb_head_series.timestamp]], aggr=[[sum(prometheus_tsdb_head_series.field_0), sum(prometheus_tsdb_head_series.field_1), sum(prometheus_tsdb_head_series.field_2)]] [timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.field_0):Float64;N, sum(prometheus_tsdb_head_series.field_1):Float64;N, sum(prometheus_tsdb_head_series.field_2):Float64;N]\
4874        \n    PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N]\
4875        \n      PromSeriesDivide: tags=[\"tag_0\", \"tag_1\", \"tag_2\"] [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N]\
4876        \n        Sort: prometheus_tsdb_head_series.tag_0 ASC NULLS FIRST, prometheus_tsdb_head_series.tag_1 ASC NULLS FIRST, prometheus_tsdb_head_series.tag_2 ASC NULLS FIRST, prometheus_tsdb_head_series.timestamp ASC NULLS FIRST [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N]\
4877        \n          Filter: prometheus_tsdb_head_series.tag_1 ~ Utf8(\"^(?:(10.0.160.237:8080|10.0.160.237:9090))$\") AND prometheus_tsdb_head_series.timestamp >= TimestampMillisecond(-1000, None) AND prometheus_tsdb_head_series.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N]\
4878        \n            TableScan: prometheus_tsdb_head_series [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N]";
4879        assert_eq!(plan.display_indent_schema().to_string(), expected);
4880    }
4881
4882    #[tokio::test]
4883    async fn test_topk_expr() {
4884        let mut eval_stmt = EvalStmt {
4885            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
4886            start: UNIX_EPOCH,
4887            end: UNIX_EPOCH
4888                .checked_add(Duration::from_secs(100_000))
4889                .unwrap(),
4890            interval: Duration::from_secs(5),
4891            lookback_delta: Duration::from_secs(1),
4892        };
4893        let case = r#"topk(10, sum(prometheus_tsdb_head_series{ip=~"(10.0.160.237:8080|10.0.160.237:9090)"}) by (ip))"#;
4894
4895        let prom_expr = parser::parse(case).unwrap();
4896        eval_stmt.expr = prom_expr;
4897        let table_provider = build_test_table_provider_with_fields(
4898            &[
4899                (
4900                    DEFAULT_SCHEMA_NAME.to_string(),
4901                    "prometheus_tsdb_head_series".to_string(),
4902                ),
4903                (
4904                    DEFAULT_SCHEMA_NAME.to_string(),
4905                    "http_server_requests_seconds_count".to_string(),
4906                ),
4907            ],
4908            &["ip"],
4909        )
4910        .await;
4911
4912        let plan =
4913            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
4914                .await
4915                .unwrap();
4916        let expected = "Projection: sum(prometheus_tsdb_head_series.greptime_value), prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp [sum(prometheus_tsdb_head_series.greptime_value):Float64;N, ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None)]\
4917        \n  Sort: prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST, row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS FIRST, prometheus_tsdb_head_series.ip DESC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW ASC NULLS LAST [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N, row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS FIRST, prometheus_tsdb_head_series.ip DESC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:UInt64]\
4918        \n    Filter: row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS FIRST, prometheus_tsdb_head_series.ip DESC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW <= Float64(10) [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N, row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS FIRST, prometheus_tsdb_head_series.ip DESC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:UInt64]\
4919        \n      WindowAggr: windowExpr=[[row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS FIRST, prometheus_tsdb_head_series.ip DESC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N, row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS FIRST, prometheus_tsdb_head_series.ip DESC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:UInt64]\
4920        \n        Sort: prometheus_tsdb_head_series.ip ASC NULLS LAST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N]\
4921        \n          Aggregate: groupBy=[[prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp]], aggr=[[sum(prometheus_tsdb_head_series.greptime_value)]] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N]\
4922        \n            PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
4923        \n              PromSeriesDivide: tags=[\"ip\"] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
4924        \n                Sort: prometheus_tsdb_head_series.ip ASC NULLS FIRST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS FIRST [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
4925        \n                  Filter: prometheus_tsdb_head_series.ip ~ Utf8(\"^(?:(10.0.160.237:8080|10.0.160.237:9090))$\") AND prometheus_tsdb_head_series.greptime_timestamp >= TimestampMillisecond(-1000, None) AND prometheus_tsdb_head_series.greptime_timestamp <= TimestampMillisecond(100001000, None) [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
4926        \n                    TableScan: prometheus_tsdb_head_series [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]";
4927
4928        assert_eq!(plan.display_indent_schema().to_string(), expected);
4929    }
4930
4931    #[tokio::test]
4932    async fn test_count_values_expr() {
4933        let mut eval_stmt = EvalStmt {
4934            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
4935            start: UNIX_EPOCH,
4936            end: UNIX_EPOCH
4937                .checked_add(Duration::from_secs(100_000))
4938                .unwrap(),
4939            interval: Duration::from_secs(5),
4940            lookback_delta: Duration::from_secs(1),
4941        };
4942        let case = r#"count_values('series', prometheus_tsdb_head_series{ip=~"(10.0.160.237:8080|10.0.160.237:9090)"}) by (ip)"#;
4943
4944        let prom_expr = parser::parse(case).unwrap();
4945        eval_stmt.expr = prom_expr;
4946        let table_provider = build_test_table_provider_with_fields(
4947            &[
4948                (
4949                    DEFAULT_SCHEMA_NAME.to_string(),
4950                    "prometheus_tsdb_head_series".to_string(),
4951                ),
4952                (
4953                    DEFAULT_SCHEMA_NAME.to_string(),
4954                    "http_server_requests_seconds_count".to_string(),
4955                ),
4956            ],
4957            &["ip"],
4958        )
4959        .await;
4960
4961        let plan =
4962            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
4963                .await
4964                .unwrap();
4965        let expected = "Projection: count(prometheus_tsdb_head_series.greptime_value), prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp, series [count(prometheus_tsdb_head_series.greptime_value):Int64, ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), series:Float64;N]\
4966        \n  Sort: prometheus_tsdb_head_series.ip ASC NULLS LAST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST, prometheus_tsdb_head_series.greptime_value ASC NULLS LAST [count(prometheus_tsdb_head_series.greptime_value):Int64, ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), series:Float64;N, greptime_value:Float64;N]\
4967        \n    Projection: count(prometheus_tsdb_head_series.greptime_value), prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp, prometheus_tsdb_head_series.greptime_value AS series, prometheus_tsdb_head_series.greptime_value [count(prometheus_tsdb_head_series.greptime_value):Int64, ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), series:Float64;N, greptime_value:Float64;N]\
4968        \n      Aggregate: groupBy=[[prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp, prometheus_tsdb_head_series.greptime_value]], aggr=[[count(prometheus_tsdb_head_series.greptime_value)]] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N, count(prometheus_tsdb_head_series.greptime_value):Int64]\
4969        \n        PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
4970        \n          PromSeriesDivide: tags=[\"ip\"] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
4971        \n            Sort: prometheus_tsdb_head_series.ip ASC NULLS FIRST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS FIRST [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
4972        \n              Filter: prometheus_tsdb_head_series.ip ~ Utf8(\"^(?:(10.0.160.237:8080|10.0.160.237:9090))$\") AND prometheus_tsdb_head_series.greptime_timestamp >= TimestampMillisecond(-1000, None) AND prometheus_tsdb_head_series.greptime_timestamp <= TimestampMillisecond(100001000, None) [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
4973        \n                TableScan: prometheus_tsdb_head_series [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]";
4974
4975        assert_eq!(plan.display_indent_schema().to_string(), expected);
4976    }
4977
4978    #[tokio::test]
4979    async fn test_value_alias() {
4980        let mut eval_stmt = EvalStmt {
4981            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
4982            start: UNIX_EPOCH,
4983            end: UNIX_EPOCH
4984                .checked_add(Duration::from_secs(100_000))
4985                .unwrap(),
4986            interval: Duration::from_secs(5),
4987            lookback_delta: Duration::from_secs(1),
4988        };
4989        let case = r#"count_values('series', prometheus_tsdb_head_series{ip=~"(10.0.160.237:8080|10.0.160.237:9090)"}) by (ip)"#;
4990
4991        let prom_expr = parser::parse(case).unwrap();
4992        eval_stmt.expr = prom_expr;
4993        let table_provider = build_test_table_provider_with_fields(
4994            &[
4995                (
4996                    DEFAULT_SCHEMA_NAME.to_string(),
4997                    "prometheus_tsdb_head_series".to_string(),
4998                ),
4999                (
5000                    DEFAULT_SCHEMA_NAME.to_string(),
5001                    "http_server_requests_seconds_count".to_string(),
5002                ),
5003            ],
5004            &["ip"],
5005        )
5006        .await;
5007
5008        let alias = Some("my_series".to_string());
5009        let plan = PromPlanner::stmt_to_plan_with_alias(
5010            table_provider,
5011            &eval_stmt,
5012            alias,
5013            &build_query_engine_state(),
5014        )
5015        .await
5016        .unwrap();
5017        let expected = r#"
5018Projection: count(prometheus_tsdb_head_series.greptime_value) AS my_series, prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp [my_series:Int64, ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None)]
5019  Projection: count(prometheus_tsdb_head_series.greptime_value), prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp, series [count(prometheus_tsdb_head_series.greptime_value):Int64, ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), series:Float64;N]
5020    Sort: prometheus_tsdb_head_series.ip ASC NULLS LAST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST, prometheus_tsdb_head_series.greptime_value ASC NULLS LAST [count(prometheus_tsdb_head_series.greptime_value):Int64, ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), series:Float64;N, greptime_value:Float64;N]
5021      Projection: count(prometheus_tsdb_head_series.greptime_value), prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp, prometheus_tsdb_head_series.greptime_value AS series, prometheus_tsdb_head_series.greptime_value [count(prometheus_tsdb_head_series.greptime_value):Int64, ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), series:Float64;N, greptime_value:Float64;N]
5022        Aggregate: groupBy=[[prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp, prometheus_tsdb_head_series.greptime_value]], aggr=[[count(prometheus_tsdb_head_series.greptime_value)]] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N, count(prometheus_tsdb_head_series.greptime_value):Int64]
5023          PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]
5024            PromSeriesDivide: tags=["ip"] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]
5025              Sort: prometheus_tsdb_head_series.ip ASC NULLS FIRST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS FIRST [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]
5026                Filter: prometheus_tsdb_head_series.ip ~ Utf8("^(?:(10.0.160.237:8080|10.0.160.237:9090))$") AND prometheus_tsdb_head_series.greptime_timestamp >= TimestampMillisecond(-1000, None) AND prometheus_tsdb_head_series.greptime_timestamp <= TimestampMillisecond(100001000, None) [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]
5027                  TableScan: prometheus_tsdb_head_series [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]"#;
5028        assert_eq!(format!("\n{}", plan.display_indent_schema()), expected);
5029    }
5030
5031    #[tokio::test]
5032    async fn test_quantile_expr() {
5033        let mut eval_stmt = EvalStmt {
5034            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
5035            start: UNIX_EPOCH,
5036            end: UNIX_EPOCH
5037                .checked_add(Duration::from_secs(100_000))
5038                .unwrap(),
5039            interval: Duration::from_secs(5),
5040            lookback_delta: Duration::from_secs(1),
5041        };
5042        let case = r#"quantile(0.3, sum(prometheus_tsdb_head_series{ip=~"(10.0.160.237:8080|10.0.160.237:9090)"}) by (ip))"#;
5043
5044        let prom_expr = parser::parse(case).unwrap();
5045        eval_stmt.expr = prom_expr;
5046        let table_provider = build_test_table_provider_with_fields(
5047            &[
5048                (
5049                    DEFAULT_SCHEMA_NAME.to_string(),
5050                    "prometheus_tsdb_head_series".to_string(),
5051                ),
5052                (
5053                    DEFAULT_SCHEMA_NAME.to_string(),
5054                    "http_server_requests_seconds_count".to_string(),
5055                ),
5056            ],
5057            &["ip"],
5058        )
5059        .await;
5060
5061        let plan =
5062            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
5063                .await
5064                .unwrap();
5065        let expected = "Sort: prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST [greptime_timestamp:Timestamp(Millisecond, None), quantile(Float64(0.3),sum(prometheus_tsdb_head_series.greptime_value)):Float64;N]\
5066        \n  Aggregate: groupBy=[[prometheus_tsdb_head_series.greptime_timestamp]], aggr=[[quantile(Float64(0.3), sum(prometheus_tsdb_head_series.greptime_value))]] [greptime_timestamp:Timestamp(Millisecond, None), quantile(Float64(0.3),sum(prometheus_tsdb_head_series.greptime_value)):Float64;N]\
5067        \n    Sort: prometheus_tsdb_head_series.ip ASC NULLS LAST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N]\
5068        \n      Aggregate: groupBy=[[prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp]], aggr=[[sum(prometheus_tsdb_head_series.greptime_value)]] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N]\
5069        \n        PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
5070        \n          PromSeriesDivide: tags=[\"ip\"] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
5071        \n            Sort: prometheus_tsdb_head_series.ip ASC NULLS FIRST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS FIRST [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
5072        \n              Filter: prometheus_tsdb_head_series.ip ~ Utf8(\"^(?:(10.0.160.237:8080|10.0.160.237:9090))$\") AND prometheus_tsdb_head_series.greptime_timestamp >= TimestampMillisecond(-1000, None) AND prometheus_tsdb_head_series.greptime_timestamp <= TimestampMillisecond(100001000, None) [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
5073        \n                TableScan: prometheus_tsdb_head_series [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]";
5074
5075        assert_eq!(plan.display_indent_schema().to_string(), expected);
5076    }
5077
5078    #[tokio::test]
5079    async fn test_or_not_exists_table_label() {
5080        let mut eval_stmt = EvalStmt {
5081            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
5082            start: UNIX_EPOCH,
5083            end: UNIX_EPOCH
5084                .checked_add(Duration::from_secs(100_000))
5085                .unwrap(),
5086            interval: Duration::from_secs(5),
5087            lookback_delta: Duration::from_secs(1),
5088        };
5089        let case = r#"sum by (job, tag0, tag2) (metric_exists) or sum by (job, tag0, tag2) (metric_not_exists)"#;
5090
5091        let prom_expr = parser::parse(case).unwrap();
5092        eval_stmt.expr = prom_expr;
5093        let table_provider = build_test_table_provider_with_fields(
5094            &[(DEFAULT_SCHEMA_NAME.to_string(), "metric_exists".to_string())],
5095            &["job"],
5096        )
5097        .await;
5098
5099        let plan =
5100            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
5101                .await
5102                .unwrap();
5103        let expected = r#"UnionDistinctOn: on col=[["job"]], ts_col=[greptime_timestamp] [greptime_timestamp:Timestamp(Millisecond, None), job:Utf8, sum(metric_exists.greptime_value):Float64;N]
5104  SubqueryAlias: metric_exists [greptime_timestamp:Timestamp(Millisecond, None), job:Utf8, sum(metric_exists.greptime_value):Float64;N]
5105    Projection: metric_exists.greptime_timestamp, metric_exists.job, sum(metric_exists.greptime_value) [greptime_timestamp:Timestamp(Millisecond, None), job:Utf8, sum(metric_exists.greptime_value):Float64;N]
5106      Sort: metric_exists.job ASC NULLS LAST, metric_exists.greptime_timestamp ASC NULLS LAST [job:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(metric_exists.greptime_value):Float64;N]
5107        Aggregate: groupBy=[[metric_exists.job, metric_exists.greptime_timestamp]], aggr=[[sum(metric_exists.greptime_value)]] [job:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(metric_exists.greptime_value):Float64;N]
5108          PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp] [job:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]
5109            PromSeriesDivide: tags=["job"] [job:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]
5110              Sort: metric_exists.job ASC NULLS FIRST, metric_exists.greptime_timestamp ASC NULLS FIRST [job:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]
5111                Filter: metric_exists.greptime_timestamp >= TimestampMillisecond(-1000, None) AND metric_exists.greptime_timestamp <= TimestampMillisecond(100001000, None) [job:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]
5112                  TableScan: metric_exists [job:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]
5113  SubqueryAlias:  [greptime_timestamp:Timestamp(Millisecond, None), job:Utf8;N, sum(.value):Float64;N]
5114    Projection: .time AS greptime_timestamp, Utf8(NULL) AS job, sum(.value) [greptime_timestamp:Timestamp(Millisecond, None), job:Utf8;N, sum(.value):Float64;N]
5115      Sort: .time ASC NULLS LAST [time:Timestamp(Millisecond, None), sum(.value):Float64;N]
5116        Aggregate: groupBy=[[.time]], aggr=[[sum(.value)]] [time:Timestamp(Millisecond, None), sum(.value):Float64;N]
5117          EmptyMetric: range=[0..-1], interval=[5000] [time:Timestamp(Millisecond, None), value:Float64;N]
5118            TableScan: dummy [time:Timestamp(Millisecond, None), value:Float64;N]"#;
5119
5120        assert_eq!(plan.display_indent_schema().to_string(), expected);
5121    }
5122
5123    #[tokio::test]
5124    async fn test_histogram_quantile_missing_le_column() {
5125        let mut eval_stmt = EvalStmt {
5126            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
5127            start: UNIX_EPOCH,
5128            end: UNIX_EPOCH
5129                .checked_add(Duration::from_secs(100_000))
5130                .unwrap(),
5131            interval: Duration::from_secs(5),
5132            lookback_delta: Duration::from_secs(1),
5133        };
5134
5135        // Test case: histogram_quantile with a table that doesn't have 'le' column
5136        let case = r#"histogram_quantile(0.99, sum by(pod,instance,le) (rate(non_existent_histogram_bucket{instance=~"xxx"}[1m])))"#;
5137
5138        let prom_expr = parser::parse(case).unwrap();
5139        eval_stmt.expr = prom_expr;
5140
5141        // Create a table provider with a table that doesn't have 'le' column
5142        let table_provider = build_test_table_provider_with_fields(
5143            &[(
5144                DEFAULT_SCHEMA_NAME.to_string(),
5145                "non_existent_histogram_bucket".to_string(),
5146            )],
5147            &["pod", "instance"], // Note: no 'le' column
5148        )
5149        .await;
5150
5151        // Should return empty result instead of error
5152        let result =
5153            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
5154                .await;
5155
5156        // This should succeed now (returning empty result) instead of failing with "Cannot find column le"
5157        assert!(
5158            result.is_ok(),
5159            "Expected successful plan creation with empty result, but got error: {:?}",
5160            result.err()
5161        );
5162
5163        // Verify that the result is an EmptyRelation
5164        let plan = result.unwrap();
5165        match plan {
5166            LogicalPlan::EmptyRelation(_) => {
5167                // This is what we expect
5168            }
5169            _ => panic!("Expected EmptyRelation, but got: {:?}", plan),
5170        }
5171    }
5172}