query/promql/planner.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeSet, HashSet, VecDeque};
use std::sync::Arc;
use std::time::UNIX_EPOCH;

use arrow::datatypes::IntervalDayTime;
use async_recursion::async_recursion;
use catalog::table_source::DfTableSourceProvider;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_function::function::FunctionContext;
use common_query::prelude::GREPTIME_VALUE;
use datafusion::common::DFSchemaRef;
use datafusion::datasource::DefaultTableSource;
use datafusion::functions_aggregate::average::avg_udaf;
use datafusion::functions_aggregate::count::count_udaf;
use datafusion::functions_aggregate::expr_fn::first_value;
use datafusion::functions_aggregate::grouping::grouping_udaf;
use datafusion::functions_aggregate::min_max::{max_udaf, min_udaf};
use datafusion::functions_aggregate::stddev::stddev_pop_udaf;
use datafusion::functions_aggregate::sum::sum_udaf;
use datafusion::functions_aggregate::variance::var_pop_udaf;
use datafusion::functions_window::row_number::RowNumber;
use datafusion::logical_expr::expr::{Alias, ScalarFunction, WindowFunction};
use datafusion::logical_expr::expr_rewriter::normalize_cols;
use datafusion::logical_expr::{
    BinaryExpr, Cast, Extension, LogicalPlan, LogicalPlanBuilder, Operator,
    ScalarUDF as ScalarUdfDef, WindowFrame, WindowFunctionDefinition,
};
use datafusion::prelude as df_prelude;
use datafusion::prelude::{Column, Expr as DfExpr, JoinType};
use datafusion::scalar::ScalarValue;
use datafusion::sql::TableReference;
use datafusion_common::{DFSchema, NullEquality};
use datafusion_expr::expr::WindowFunctionParams;
use datafusion_expr::utils::conjunction;
use datafusion_expr::{col, lit, ExprSchemable, Literal, SortExpr};
use datatypes::arrow::datatypes::{DataType as ArrowDataType, TimeUnit as ArrowTimeUnit};
use datatypes::data_type::ConcreteDataType;
use itertools::Itertools;
use once_cell::sync::Lazy;
use promql::extension_plan::{
    build_special_time_expr, Absent, EmptyMetric, HistogramFold, InstantManipulate, Millisecond,
    RangeManipulate, ScalarCalculate, SeriesDivide, SeriesNormalize, UnionDistinctOn,
};
use promql::functions::{
    quantile_udaf, AbsentOverTime, AvgOverTime, Changes, CountOverTime, Delta, Deriv, HoltWinters,
    IDelta, Increase, LastOverTime, MaxOverTime, MinOverTime, PredictLinear, PresentOverTime,
    QuantileOverTime, Rate, Resets, Round, StddevOverTime, StdvarOverTime, SumOverTime,
};
use promql_parser::label::{MatchOp, Matcher, Matchers, METRIC_NAME};
use promql_parser::parser::token::TokenType;
use promql_parser::parser::{
    token, AggregateExpr, BinModifier, BinaryExpr as PromBinaryExpr, Call, EvalStmt,
    Expr as PromExpr, Function, FunctionArgs as PromFunctionArgs, LabelModifier, MatrixSelector,
    NumberLiteral, Offset, ParenExpr, StringLiteral, SubqueryExpr, UnaryExpr,
    VectorMatchCardinality, VectorSelector,
};
use regex::{self, Regex};
use snafu::{ensure, OptionExt, ResultExt};
use store_api::metric_engine_consts::{
    DATA_SCHEMA_TABLE_ID_COLUMN_NAME, DATA_SCHEMA_TSID_COLUMN_NAME,
};
use table::table::adapter::DfTableProviderAdapter;

use crate::promql::error::{
    CatalogSnafu, ColumnNotFoundSnafu, CombineTableColumnMismatchSnafu, DataFusionPlanningSnafu,
    ExpectRangeSelectorSnafu, FunctionInvalidArgumentSnafu, InvalidDestinationLabelNameSnafu,
    InvalidRegularExpressionSnafu, InvalidTimeRangeSnafu, MultiFieldsNotSupportedSnafu,
    MultipleMetricMatchersSnafu, MultipleVectorSnafu, NoMetricMatcherSnafu, PromqlPlanNodeSnafu,
    Result, SameLabelSetSnafu, TableNameNotFoundSnafu, TimeIndexNotFoundSnafu,
    UnexpectedPlanExprSnafu, UnexpectedTokenSnafu, UnknownTableSnafu, UnsupportedExprSnafu,
    UnsupportedMatcherOpSnafu, UnsupportedVectorMatchSnafu, ValueNotFoundSnafu,
    ZeroRangeSelectorSnafu,
};
use crate::query_engine::QueryEngineState;

/// `time()` function in PromQL.
const SPECIAL_TIME_FUNCTION: &str = "time";
/// `scalar()` function in PromQL.
const SCALAR_FUNCTION: &str = "scalar";
/// `absent()` function in PromQL.
const SPECIAL_ABSENT_FUNCTION: &str = "absent";
/// `histogram_quantile` function in PromQL.
const SPECIAL_HISTOGRAM_QUANTILE: &str = "histogram_quantile";
/// `vector` function in PromQL.
const SPECIAL_VECTOR_FUNCTION: &str = "vector";
/// `le` column for conventional histogram.
const LE_COLUMN_NAME: &str = "le";

/// Static regex for validating label names according to the Prometheus specification.
/// Label names must match the regex: [a-zA-Z_][a-zA-Z0-9_]*
static LABEL_NAME_REGEX: Lazy<Regex> =
    Lazy::new(|| Regex::new(r"^[a-zA-Z_][a-zA-Z0-9_]*$").unwrap());

const DEFAULT_TIME_INDEX_COLUMN: &str = "time";

/// Default value column name for empty metrics.
const DEFAULT_FIELD_COLUMN: &str = "value";

/// Special matcher to project field columns under multi-field mode.
const FIELD_COLUMN_MATCHER: &str = "__field__";

/// Special matchers for cross-schema queries.
const SCHEMA_COLUMN_MATCHER: &str = "__schema__";
const DB_COLUMN_MATCHER: &str = "__database__";

/// Threshold for the scatter scan mode.
const MAX_SCATTER_POINTS: i64 = 400;

/// One hour interval, in milliseconds.
const INTERVAL_1H: i64 = 60 * 60 * 1000;

#[derive(Default, Debug, Clone)]
struct PromPlannerContext {
    // query parameters
    start: Millisecond,
    end: Millisecond,
    interval: Millisecond,
    lookback_delta: Millisecond,

    // planner states
    table_name: Option<String>,
    time_index_column: Option<String>,
    field_columns: Vec<String>,
    tag_columns: Vec<String>,
    /// The matcher for field columns `__field__`.
    field_column_matcher: Option<Vec<Matcher>>,
    /// The matchers for selectors (normal matchers).
    selector_matcher: Vec<Matcher>,
    schema_name: Option<String>,
    /// The range in milliseconds of the range selector. None if there is no range selector.
    range: Option<Millisecond>,
}

impl PromPlannerContext {
    fn from_eval_stmt(stmt: &EvalStmt) -> Self {
        Self {
            start: stmt.start.duration_since(UNIX_EPOCH).unwrap().as_millis() as _,
            end: stmt.end.duration_since(UNIX_EPOCH).unwrap().as_millis() as _,
            interval: stmt.interval.as_millis() as _,
            lookback_delta: stmt.lookback_delta.as_millis() as _,
            ..Default::default()
        }
    }

    /// Reset all planner states.
    fn reset(&mut self) {
        self.table_name = None;
        self.time_index_column = None;
        self.field_columns = vec![];
        self.tag_columns = vec![];
        self.field_column_matcher = None;
        self.selector_matcher.clear();
        self.schema_name = None;
        self.range = None;
    }

    /// Reset the table name and schema to empty.
    fn reset_table_name_and_schema(&mut self) {
        self.table_name = Some(String::new());
        self.schema_name = None;
    }

    /// Check if `le` is present in the tag columns.
    fn has_le_tag(&self) -> bool {
        self.tag_columns.iter().any(|c| c.eq(&LE_COLUMN_NAME))
    }
}

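/// Planner that translates PromQL [EvalStmt]s into DataFusion [LogicalPlan]s,
/// resolving table references through the wrapped [DfTableSourceProvider].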
pub struct PromPlanner {
    table_provider: DfTableSourceProvider,
    ctx: PromPlannerContext,
}

/// Unescapes the value of the matcher.
pub fn normalize_matcher(mut matcher: Matcher) -> Matcher {
    if let Ok(unescaped_value) = unescaper::unescape(&matcher.value) {
        matcher.value = unescaped_value;
    }
    matcher
}

impl PromPlanner {
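    /// Plan a whole [EvalStmt]: initialize the planner context from the statement's
    /// time range and parameters, then convert its expression into a [LogicalPlan].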
    pub async fn stmt_to_plan(
        table_provider: DfTableSourceProvider,
        stmt: &EvalStmt,
        query_engine_state: &QueryEngineState,
    ) -> Result<LogicalPlan> {
        let mut planner = Self {
            table_provider,
            ctx: PromPlannerContext::from_eval_stmt(stmt),
        };

        planner
            .prom_expr_to_plan(&stmt.expr, query_engine_state)
            .await
    }

    pub async fn prom_expr_to_plan(
        &mut self,
        prom_expr: &PromExpr,
        query_engine_state: &QueryEngineState,
    ) -> Result<LogicalPlan> {
        self.prom_expr_to_plan_inner(prom_expr, false, query_engine_state)
            .await
    }

    /**
    Converts a PromQL expression to a logical plan.

    NOTE:
        The `timestamp_fn` flag indicates whether the PromQL `timestamp()` function is being evaluated in the current context.
        If `true`, the planner generates a logical plan that projects the timestamp (time index) column
        as the value column for each input row, implementing the PromQL `timestamp()` function semantics.
        If `false`, the planner generates the standard logical plan for the given PromQL expression.
    */
    #[async_recursion]
    async fn prom_expr_to_plan_inner(
        &mut self,
        prom_expr: &PromExpr,
        timestamp_fn: bool,
        query_engine_state: &QueryEngineState,
    ) -> Result<LogicalPlan> {
        let res = match prom_expr {
            PromExpr::Aggregate(expr) => {
                self.prom_aggr_expr_to_plan(query_engine_state, expr)
                    .await?
            }
            PromExpr::Unary(expr) => {
                self.prom_unary_expr_to_plan(query_engine_state, expr)
                    .await?
            }
            PromExpr::Binary(expr) => {
                self.prom_binary_expr_to_plan(query_engine_state, expr)
                    .await?
            }
            PromExpr::Paren(ParenExpr { expr }) => {
                self.prom_expr_to_plan_inner(expr, timestamp_fn, query_engine_state)
                    .await?
            }
            PromExpr::Subquery(expr) => {
                self.prom_subquery_expr_to_plan(query_engine_state, expr)
                    .await?
            }
            PromExpr::NumberLiteral(lit) => self.prom_number_lit_to_plan(lit)?,
            PromExpr::StringLiteral(lit) => self.prom_string_lit_to_plan(lit)?,
            PromExpr::VectorSelector(selector) => {
                self.prom_vector_selector_to_plan(selector, timestamp_fn)
                    .await?
            }
            PromExpr::MatrixSelector(selector) => {
                self.prom_matrix_selector_to_plan(selector).await?
            }
            PromExpr::Call(expr) => {
                self.prom_call_expr_to_plan(query_engine_state, expr)
                    .await?
            }
            PromExpr::Extension(expr) => {
                self.prom_ext_expr_to_plan(query_engine_state, expr).await?
            }
        };

        Ok(res)
    }

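    /// Plan a PromQL subquery: evaluate the inner expression with the subquery's
    /// step as the interval and an extended start time, then wrap the result in a
    /// [RangeManipulate] node covering the subquery range.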
    async fn prom_subquery_expr_to_plan(
        &mut self,
        query_engine_state: &QueryEngineState,
        subquery_expr: &SubqueryExpr,
    ) -> Result<LogicalPlan> {
        let SubqueryExpr {
            expr, range, step, ..
        } = subquery_expr;

        let current_interval = self.ctx.interval;
        if let Some(step) = step {
            self.ctx.interval = step.as_millis() as _;
        }
        let current_start = self.ctx.start;
        self.ctx.start -= range.as_millis() as i64 - self.ctx.interval;
        let input = self.prom_expr_to_plan(expr, query_engine_state).await?;
        self.ctx.interval = current_interval;
        self.ctx.start = current_start;

        ensure!(!range.is_zero(), ZeroRangeSelectorSnafu);
        let range_ms = range.as_millis() as _;
        self.ctx.range = Some(range_ms);

        let manipulate = RangeManipulate::new(
            self.ctx.start,
            self.ctx.end,
            self.ctx.interval,
            range_ms,
            self.ctx
                .time_index_column
                .clone()
                .expect("time index should be set in `setup_context`"),
            self.ctx.field_columns.clone(),
            input,
        )
        .context(DataFusionPlanningSnafu)?;

        Ok(LogicalPlan::Extension(Extension {
            node: Arc::new(manipulate),
        }))
    }

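    /// Plan a PromQL aggregate expression (e.g. `sum by (...)`). `topk` and
    /// `bottomk` are dispatched to [`Self::prom_topk_bottomk_to_plan`]; other
    /// operators become a DataFusion aggregate followed by a sort on the
    /// group-by columns.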
    async fn prom_aggr_expr_to_plan(
        &mut self,
        query_engine_state: &QueryEngineState,
        aggr_expr: &AggregateExpr,
    ) -> Result<LogicalPlan> {
        let AggregateExpr {
            op,
            expr,
            modifier,
            param,
        } = aggr_expr;

        let input = self.prom_expr_to_plan(expr, query_engine_state).await?;

        match (*op).id() {
            token::T_TOPK | token::T_BOTTOMK => {
                self.prom_topk_bottomk_to_plan(aggr_expr, input).await
            }
            _ => {
                // Calculate columns to group by.
                // The time index column needs to be appended to the group-by columns.
                let mut group_exprs = self.agg_modifier_to_col(input.schema(), modifier, true)?;
                // convert op and value columns to aggregate exprs
                let (aggr_exprs, prev_field_exprs) =
                    self.create_aggregate_exprs(*op, param, &input)?;

                // create plan
                let builder = LogicalPlanBuilder::from(input);
                let builder = if op.id() == token::T_COUNT_VALUES {
                    let label = Self::get_param_value_as_str(*op, param)?;
                    // `count_values` must be grouped by fields,
                    // and project the fields to the new label.
                    group_exprs.extend(prev_field_exprs.clone());
                    let project_fields = self
                        .create_field_column_exprs()?
                        .into_iter()
                        .chain(self.create_tag_column_exprs()?)
                        .chain(Some(self.create_time_index_column_expr()?))
                        .chain(prev_field_exprs.into_iter().map(|expr| expr.alias(label)));

                    builder
                        .aggregate(group_exprs.clone(), aggr_exprs)
                        .context(DataFusionPlanningSnafu)?
                        .project(project_fields)
                        .context(DataFusionPlanningSnafu)?
                } else {
                    builder
                        .aggregate(group_exprs.clone(), aggr_exprs)
                        .context(DataFusionPlanningSnafu)?
                };

                let sort_expr = group_exprs.into_iter().map(|expr| expr.sort(true, false));

                builder
                    .sort(sort_expr)
                    .context(DataFusionPlanningSnafu)?
                    .build()
                    .context(DataFusionPlanningSnafu)
            }
        }
    }

    /// Create logical plan for PromQL topk and bottomk expr.
    async fn prom_topk_bottomk_to_plan(
        &mut self,
        aggr_expr: &AggregateExpr,
        input: LogicalPlan,
    ) -> Result<LogicalPlan> {
        let AggregateExpr {
            op,
            param,
            modifier,
            ..
        } = aggr_expr;

        let group_exprs = self.agg_modifier_to_col(input.schema(), modifier, false)?;

        let val = Self::get_param_as_literal_expr(param, Some(*op), Some(ArrowDataType::Float64))?;

        // convert op and value columns to window exprs.
        let window_exprs = self.create_window_exprs(*op, group_exprs.clone(), &input)?;

        let rank_columns: Vec<_> = window_exprs
            .iter()
            .map(|expr| expr.schema_name().to_string())
            .collect();

        // Create ranks filter with `Operator::Or`.
        // Safety: at least one rank column
        let filter: DfExpr = rank_columns
            .iter()
            .fold(None, |expr, rank| {
                let predicate = DfExpr::BinaryExpr(BinaryExpr {
                    left: Box::new(col(rank)),
                    op: Operator::LtEq,
                    right: Box::new(val.clone()),
                });

                match expr {
                    None => Some(predicate),
                    Some(expr) => Some(DfExpr::BinaryExpr(BinaryExpr {
                        left: Box::new(expr),
                        op: Operator::Or,
                        right: Box::new(predicate),
                    })),
                }
            })
            .unwrap();

        let rank_columns: Vec<_> = rank_columns.into_iter().map(col).collect();

        let mut new_group_exprs = group_exprs.clone();
        // Order by ranks
        new_group_exprs.extend(rank_columns);

        let group_sort_expr = new_group_exprs
            .into_iter()
            .map(|expr| expr.sort(true, false));

        let project_fields = self
            .create_field_column_exprs()?
            .into_iter()
            .chain(self.create_tag_column_exprs()?)
            .chain(Some(self.create_time_index_column_expr()?));

        LogicalPlanBuilder::from(input)
            .window(window_exprs)
            .context(DataFusionPlanningSnafu)?
            .filter(filter)
            .context(DataFusionPlanningSnafu)?
            .sort(group_sort_expr)
            .context(DataFusionPlanningSnafu)?
            .project(project_fields)
            .context(DataFusionPlanningSnafu)?
            .build()
            .context(DataFusionPlanningSnafu)
    }

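    /// Plan a PromQL unary expression by negating each field column of the input.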
    async fn prom_unary_expr_to_plan(
        &mut self,
        query_engine_state: &QueryEngineState,
        unary_expr: &UnaryExpr,
    ) -> Result<LogicalPlan> {
        let UnaryExpr { expr } = unary_expr;
        // A unary expr in PromQL implies the `-` operator
        let input = self.prom_expr_to_plan(expr, query_engine_state).await?;
        self.projection_for_each_field_column(input, |col| {
            Ok(DfExpr::Negative(Box::new(DfExpr::Column(col.into()))))
        })
    }

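    /// Plan a PromQL binary expression. Literal-vs-literal operands produce an
    /// [EmptyMetric] plan; a literal against a vector becomes a projection (or a
    /// filter for comparison ops); two vectors are joined on non-field columns.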
    async fn prom_binary_expr_to_plan(
        &mut self,
        query_engine_state: &QueryEngineState,
        binary_expr: &PromBinaryExpr,
    ) -> Result<LogicalPlan> {
        let PromBinaryExpr {
            lhs,
            rhs,
            op,
            modifier,
        } = binary_expr;

        // If set to true, comparison operators will return 1/0 (for true/false)
        // instead of filtering on the result column.
        let should_return_bool = if let Some(m) = modifier {
            m.return_bool
        } else {
            false
        };
        let is_comparison_op = Self::is_token_a_comparison_op(*op);

        // We should build a filter plan here if the op is a comparison op and doesn't
        // need to return 0/1. Otherwise, we should build a projection plan.
        match (
            Self::try_build_literal_expr(lhs),
            Self::try_build_literal_expr(rhs),
        ) {
            (Some(lhs), Some(rhs)) => {
                self.ctx.time_index_column = Some(DEFAULT_TIME_INDEX_COLUMN.to_string());
                self.ctx.field_columns = vec![DEFAULT_FIELD_COLUMN.to_string()];
                self.ctx.reset_table_name_and_schema();
                let field_expr_builder = Self::prom_token_to_binary_expr_builder(*op)?;
                let mut field_expr = field_expr_builder(lhs, rhs)?;

                if is_comparison_op && should_return_bool {
                    field_expr = DfExpr::Cast(Cast {
                        expr: Box::new(field_expr),
                        data_type: ArrowDataType::Float64,
                    });
                }

                Ok(LogicalPlan::Extension(Extension {
                    node: Arc::new(
                        EmptyMetric::new(
                            self.ctx.start,
                            self.ctx.end,
                            self.ctx.interval,
                            SPECIAL_TIME_FUNCTION.to_string(),
                            DEFAULT_FIELD_COLUMN.to_string(),
                            Some(field_expr),
                        )
                        .context(DataFusionPlanningSnafu)?,
                    ),
                }))
            }
            // lhs is a literal, rhs is a column
            (Some(mut expr), None) => {
                let input = self.prom_expr_to_plan(rhs, query_engine_state).await?;
                // check if the literal is a special time expr
                if let Some(time_expr) = self.try_build_special_time_expr_with_context(lhs) {
                    expr = time_expr
                }
                let bin_expr_builder = |col: &String| {
                    let binary_expr_builder = Self::prom_token_to_binary_expr_builder(*op)?;
                    let mut binary_expr =
                        binary_expr_builder(expr.clone(), DfExpr::Column(col.into()))?;

                    if is_comparison_op && should_return_bool {
                        binary_expr = DfExpr::Cast(Cast {
                            expr: Box::new(binary_expr),
                            data_type: ArrowDataType::Float64,
                        });
                    }
                    Ok(binary_expr)
                };
                if is_comparison_op && !should_return_bool {
                    self.filter_on_field_column(input, bin_expr_builder)
                } else {
                    self.projection_for_each_field_column(input, bin_expr_builder)
                }
            }
            // lhs is a column, rhs is a literal
            (None, Some(mut expr)) => {
                let input = self.prom_expr_to_plan(lhs, query_engine_state).await?;
                // check if the literal is a special time expr
                if let Some(time_expr) = self.try_build_special_time_expr_with_context(rhs) {
                    expr = time_expr
                }
                let bin_expr_builder = |col: &String| {
                    let binary_expr_builder = Self::prom_token_to_binary_expr_builder(*op)?;
                    let mut binary_expr =
                        binary_expr_builder(DfExpr::Column(col.into()), expr.clone())?;

                    if is_comparison_op && should_return_bool {
                        binary_expr = DfExpr::Cast(Cast {
                            expr: Box::new(binary_expr),
                            data_type: ArrowDataType::Float64,
                        });
                    }
                    Ok(binary_expr)
                };
                if is_comparison_op && !should_return_bool {
                    self.filter_on_field_column(input, bin_expr_builder)
                } else {
                    self.projection_for_each_field_column(input, bin_expr_builder)
                }
            }
            // Both are columns; join them on non-field columns.
            (None, None) => {
                let left_input = self.prom_expr_to_plan(lhs, query_engine_state).await?;
                let left_field_columns = self.ctx.field_columns.clone();
                let left_time_index_column = self.ctx.time_index_column.clone();
                let mut left_table_ref = self
                    .table_ref()
                    .unwrap_or_else(|_| TableReference::bare(""));
                let left_context = self.ctx.clone();

                let right_input = self.prom_expr_to_plan(rhs, query_engine_state).await?;
                let right_field_columns = self.ctx.field_columns.clone();
                let right_time_index_column = self.ctx.time_index_column.clone();
                let mut right_table_ref = self
                    .table_ref()
                    .unwrap_or_else(|_| TableReference::bare(""));
                let right_context = self.ctx.clone();

                // TODO(ruihang): avoid join if left and right are the same table

                // set ops have "special" join semantics
                if Self::is_token_a_set_op(*op) {
                    return self.set_op_on_non_field_columns(
                        left_input,
                        right_input,
                        left_context,
                        right_context,
                        *op,
                        modifier,
                    );
                }

                // normal join
                if left_table_ref == right_table_ref {
                    // rename table references to avoid ambiguity
                    left_table_ref = TableReference::bare("lhs");
                    right_table_ref = TableReference::bare("rhs");
                    // `self.ctx` holds the context of the right plan. If the right plan has
                    // no tag columns, use the left plan's context for subsequent calculations
                    // to handle cases like `host + scalar(...)`: the tag columns of the `host`
                    // table must be preserved in the subsequent projection, and they only
                    // appear in the left plan's context.
                    if self.ctx.tag_columns.is_empty() {
                        self.ctx = left_context.clone();
                        self.ctx.table_name = Some("lhs".to_string());
                    } else {
                        self.ctx.table_name = Some("rhs".to_string());
                    }
                }
                let mut field_columns = left_field_columns.iter().zip(right_field_columns.iter());

                let join_plan = self.join_on_non_field_columns(
                    left_input,
                    right_input,
                    left_table_ref.clone(),
                    right_table_ref.clone(),
                    left_time_index_column,
                    right_time_index_column,
                    // If either plan has no tag columns (cases like `scalar(...) + host` or
                    // `host + scalar(...)`), join on the time index only.
                    left_context.tag_columns.is_empty() || right_context.tag_columns.is_empty(),
                    modifier,
                )?;
                let join_plan_schema = join_plan.schema().clone();

                let bin_expr_builder = |_: &String| {
                    let (left_col_name, right_col_name) = field_columns.next().unwrap();
                    let left_col = join_plan_schema
                        .qualified_field_with_name(Some(&left_table_ref), left_col_name)
                        .context(DataFusionPlanningSnafu)?
                        .into();
                    let right_col = join_plan_schema
                        .qualified_field_with_name(Some(&right_table_ref), right_col_name)
                        .context(DataFusionPlanningSnafu)?
                        .into();

                    let binary_expr_builder = Self::prom_token_to_binary_expr_builder(*op)?;
                    let mut binary_expr =
                        binary_expr_builder(DfExpr::Column(left_col), DfExpr::Column(right_col))?;
                    if is_comparison_op && should_return_bool {
                        binary_expr = DfExpr::Cast(Cast {
                            expr: Box::new(binary_expr),
                            data_type: ArrowDataType::Float64,
                        });
                    }
                    Ok(binary_expr)
                };
                if is_comparison_op && !should_return_bool {
                    self.filter_on_field_column(join_plan, bin_expr_builder)
                } else {
                    self.projection_for_each_field_column(join_plan, bin_expr_builder)
                }
            }
        }
    }

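    /// Plan a PromQL number literal as an [EmptyMetric] plan that emits the
    /// literal value at every time step.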
    fn prom_number_lit_to_plan(&mut self, number_literal: &NumberLiteral) -> Result<LogicalPlan> {
        let NumberLiteral { val } = number_literal;
        self.ctx.time_index_column = Some(DEFAULT_TIME_INDEX_COLUMN.to_string());
        self.ctx.field_columns = vec![DEFAULT_FIELD_COLUMN.to_string()];
        self.ctx.reset_table_name_and_schema();
        let literal_expr = df_prelude::lit(*val);

        let plan = LogicalPlan::Extension(Extension {
            node: Arc::new(
                EmptyMetric::new(
                    self.ctx.start,
                    self.ctx.end,
                    self.ctx.interval,
                    SPECIAL_TIME_FUNCTION.to_string(),
                    DEFAULT_FIELD_COLUMN.to_string(),
                    Some(literal_expr),
                )
                .context(DataFusionPlanningSnafu)?,
            ),
        });
        Ok(plan)
    }

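    /// Plan a PromQL string literal as an [EmptyMetric] plan that emits the
    /// literal string at every time step.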
    fn prom_string_lit_to_plan(&mut self, string_literal: &StringLiteral) -> Result<LogicalPlan> {
        let StringLiteral { val } = string_literal;
        self.ctx.time_index_column = Some(DEFAULT_TIME_INDEX_COLUMN.to_string());
        self.ctx.field_columns = vec![DEFAULT_FIELD_COLUMN.to_string()];
        self.ctx.reset_table_name_and_schema();
        let literal_expr = df_prelude::lit(val.to_string());

        let plan = LogicalPlan::Extension(Extension {
            node: Arc::new(
                EmptyMetric::new(
                    self.ctx.start,
                    self.ctx.end,
                    self.ctx.interval,
                    SPECIAL_TIME_FUNCTION.to_string(),
                    DEFAULT_FIELD_COLUMN.to_string(),
                    Some(literal_expr),
                )
                .context(DataFusionPlanningSnafu)?,
            ),
        });
        Ok(plan)
    }

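    /// Plan an instant vector selector: scan and filter the table, then wrap the
    /// series in an [InstantManipulate] node that aligns samples to the query
    /// step, honoring the lookback delta.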
    async fn prom_vector_selector_to_plan(
        &mut self,
        vector_selector: &VectorSelector,
        timestamp_fn: bool,
    ) -> Result<LogicalPlan> {
        let VectorSelector {
            name,
            offset,
            matchers,
            at: _,
        } = vector_selector;
        let matchers = self.preprocess_label_matchers(matchers, name)?;
        if let Some(empty_plan) = self.setup_context().await? {
            return Ok(empty_plan);
        }
        let normalize = self
            .selector_to_series_normalize_plan(offset, matchers, false)
            .await?;

        let normalize = if timestamp_fn {
            // If evaluating the PromQL `timestamp()` function, project the time index column as the value column
            // before wrapping with [`InstantManipulate`], so the output matches PromQL's `timestamp()` semantics.
            self.create_timestamp_func_plan(normalize)?
        } else {
            normalize
        };

        let manipulate = InstantManipulate::new(
            self.ctx.start,
            self.ctx.end,
            self.ctx.lookback_delta,
            self.ctx.interval,
            self.ctx
                .time_index_column
                .clone()
                .expect("time index should be set in `setup_context`"),
            self.ctx.field_columns.first().cloned(),
            normalize,
        );
        Ok(LogicalPlan::Extension(Extension {
            node: Arc::new(manipulate),
        }))
    }

    /// Builds a projection plan for the PromQL `timestamp()` function.
    /// Projects the time index column as the value column for each row.
    ///
    /// # Arguments
    /// * `normalize` - Input [`LogicalPlan`] for the normalized series.
    ///
    /// # Returns
    /// Returns a [`Result<LogicalPlan>`] where the resulting logical plan projects the timestamp
    /// column as the value column, along with the original tag and time index columns.
    ///
    /// # Timestamp vs. Time Function
    ///
    /// - **Timestamp Function (`timestamp()`)**: In PromQL, the `timestamp()` function returns the
    ///   timestamp (time index) of each sample as the value column.
    ///
    /// - **Time Function (`time()`)**: The `time()` function returns the evaluation time of the query
    ///   as a scalar value.
    ///
    /// # Side Effects
    /// Updates the planner context's field columns to the timestamp column name.
    fn create_timestamp_func_plan(&mut self, normalize: LogicalPlan) -> Result<LogicalPlan> {
        let time_expr = build_special_time_expr(self.ctx.time_index_column.as_ref().unwrap())
            .alias(DEFAULT_FIELD_COLUMN);
        self.ctx.field_columns = vec![time_expr.schema_name().to_string()];
        let mut project_exprs = Vec::with_capacity(self.ctx.tag_columns.len() + 2);
        project_exprs.push(self.create_time_index_column_expr()?);
        project_exprs.push(time_expr);
        project_exprs.extend(self.create_tag_column_exprs()?);

        LogicalPlanBuilder::from(normalize)
            .project(project_exprs)
            .context(DataFusionPlanningSnafu)?
            .build()
            .context(DataFusionPlanningSnafu)
    }

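    /// Plan a range vector selector (e.g. `metric[5m]`): like an instant selector,
    /// but the series are wrapped in a [RangeManipulate] node carrying the range.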
    async fn prom_matrix_selector_to_plan(
        &mut self,
        matrix_selector: &MatrixSelector,
    ) -> Result<LogicalPlan> {
        let MatrixSelector { vs, range } = matrix_selector;
        let VectorSelector {
            name,
            offset,
            matchers,
            ..
        } = vs;
        let matchers = self.preprocess_label_matchers(matchers, name)?;
        ensure!(!range.is_zero(), ZeroRangeSelectorSnafu);
        let range_ms = range.as_millis() as _;
        self.ctx.range = Some(range_ms);

        // Some functions like rate may require special fields in the RangeManipulate plan,
        // so we can't skip RangeManipulate.
        let normalize = match self.setup_context().await? {
            Some(empty_plan) => empty_plan,
            None => {
                self.selector_to_series_normalize_plan(offset, matchers, true)
                    .await?
            }
        };
        let manipulate = RangeManipulate::new(
            self.ctx.start,
            self.ctx.end,
            self.ctx.interval,
            // TODO(ruihang): convert via Timestamp datatypes to support different time units
            range_ms,
            self.ctx
                .time_index_column
                .clone()
                .expect("time index should be set in `setup_context`"),
            self.ctx.field_columns.clone(),
            normalize,
        )
        .context(DataFusionPlanningSnafu)?;

        Ok(LogicalPlan::Extension(Extension {
            node: Arc::new(manipulate),
        }))
    }

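    /// Plan a PromQL function call. A few functions (`histogram_quantile`,
    /// `vector`, `scalar`, `absent`) are special-cased into dedicated plans;
    /// the rest are planned as projections over their input.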
    async fn prom_call_expr_to_plan(
        &mut self,
        query_engine_state: &QueryEngineState,
        call_expr: &Call,
    ) -> Result<LogicalPlan> {
        let Call { func, args } = call_expr;
        // Some special functions are not planned as expressions but as whole plans.
        match func.name {
            SPECIAL_HISTOGRAM_QUANTILE => {
                return self.create_histogram_plan(args, query_engine_state).await
            }
            SPECIAL_VECTOR_FUNCTION => return self.create_vector_plan(args).await,
            SCALAR_FUNCTION => return self.create_scalar_plan(args, query_engine_state).await,
            SPECIAL_ABSENT_FUNCTION => {
                return self.create_absent_plan(args, query_engine_state).await
            }
            _ => {}
        }

        // transform function arguments
        let args = self.create_function_args(&args.args)?;
        let input = if let Some(prom_expr) = &args.input {
            self.prom_expr_to_plan_inner(prom_expr, func.name == "timestamp", query_engine_state)
                .await?
        } else {
            self.ctx.time_index_column = Some(SPECIAL_TIME_FUNCTION.to_string());
            self.ctx.reset_table_name_and_schema();
            self.ctx.tag_columns = vec![];
            self.ctx.field_columns = vec![DEFAULT_FIELD_COLUMN.to_string()];
            LogicalPlan::Extension(Extension {
                node: Arc::new(
                    EmptyMetric::new(
                        self.ctx.start,
                        self.ctx.end,
                        self.ctx.interval,
                        SPECIAL_TIME_FUNCTION.to_string(),
                        DEFAULT_FIELD_COLUMN.to_string(),
                        None,
                    )
                    .context(DataFusionPlanningSnafu)?,
                ),
            })
        };
        let (mut func_exprs, new_tags) =
            self.create_function_expr(func, args.literals.clone(), query_engine_state)?;
        func_exprs.insert(0, self.create_time_index_column_expr()?);
        func_exprs.extend_from_slice(&self.create_tag_column_exprs()?);

        let builder = LogicalPlanBuilder::from(input)
            .project(func_exprs)
            .context(DataFusionPlanningSnafu)?
            .filter(self.create_empty_values_filter_expr()?)
            .context(DataFusionPlanningSnafu)?;

        let builder = match func.name {
            "sort" => builder
                .sort(self.create_field_columns_sort_exprs(true))
                .context(DataFusionPlanningSnafu)?,
            "sort_desc" => builder
                .sort(self.create_field_columns_sort_exprs(false))
                .context(DataFusionPlanningSnafu)?,
            "sort_by_label" => builder
                .sort(Self::create_sort_exprs_by_tags(
                    func.name,
                    args.literals,
                    true,
                )?)
                .context(DataFusionPlanningSnafu)?,
            "sort_by_label_desc" => builder
                .sort(Self::create_sort_exprs_by_tags(
                    func.name,
                    args.literals,
                    false,
                )?)
                .context(DataFusionPlanningSnafu)?,

            _ => builder,
        };

        // Update context tags after building the plan.
        // We can't push them before planning, because they won't exist until the projection.
        for tag in new_tags {
            self.ctx.tag_columns.push(tag);
        }

        let plan = builder.build().context(DataFusionPlanningSnafu)?;
        common_telemetry::debug!("Created PromQL function plan: {plan:?} for {call_expr:?}");

        Ok(plan)
    }

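    /// Plan a parser extension node. Currently this wraps the child plan in an
    /// `EXPLAIN` / `ANALYZE` (optionally `VERBOSE`) plan, or an empty plan for
    /// unknown extensions.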
    async fn prom_ext_expr_to_plan(
        &mut self,
        query_engine_state: &QueryEngineState,
        ext_expr: &promql_parser::parser::ast::Extension,
    ) -> Result<LogicalPlan> {
        // let promql_parser::parser::ast::Extension { expr } = ext_expr;
        let expr = &ext_expr.expr;
        let children = expr.children();
        let plan = self
            .prom_expr_to_plan(&children[0], query_engine_state)
            .await?;
        // Wrap the existing plan for explanation/analysis:
        // https://docs.rs/datafusion-expr/latest/datafusion_expr/logical_plan/builder/struct.LogicalPlanBuilder.html#method.explain
        // If `analyze` is true, the actual plan is run and metrics about the run are produced.
        // If `verbose` is true, additional details are printed when the VERBOSE keyword is specified.
        match expr.name() {
            "ANALYZE" => LogicalPlanBuilder::from(plan)
                .explain(false, true)
                .unwrap()
                .build()
                .context(DataFusionPlanningSnafu),
            "ANALYZE VERBOSE" => LogicalPlanBuilder::from(plan)
                .explain(true, true)
                .unwrap()
                .build()
                .context(DataFusionPlanningSnafu),
            "EXPLAIN" => LogicalPlanBuilder::from(plan)
                .explain(false, false)
                .unwrap()
                .build()
                .context(DataFusionPlanningSnafu),
            "EXPLAIN VERBOSE" => LogicalPlanBuilder::from(plan)
                .explain(true, false)
                .unwrap()
                .build()
                .context(DataFusionPlanningSnafu),
            _ => LogicalPlanBuilder::empty(true)
                .build()
                .context(DataFusionPlanningSnafu),
        }
    }

    /// Extract the metric name from the `__name__` matcher and set it into [PromPlannerContext].
    /// Returns a new [Matchers] that doesn't contain the metric name matcher.
    ///
    /// Each call to this function means a new selector is started. Thus, the context will be
    /// reset first.
    ///
    /// Name rule:
    /// - if `name` is some, then the matchers MUST NOT contain a `__name__` matcher.
    /// - if `name` is none, then the matchers MUST contain exactly ONE `__name__` matcher,
    ///   and its op MUST be `=` (enforced by the `ensure!` checks below).
    #[allow(clippy::mutable_key_type)]
    fn preprocess_label_matchers(
        &mut self,
        label_matchers: &Matchers,
        name: &Option<String>,
    ) -> Result<Matchers> {
        self.ctx.reset();

        let metric_name;
        if let Some(name) = name.clone() {
            metric_name = Some(name);
            ensure!(
                label_matchers.find_matchers(METRIC_NAME).is_empty(),
                MultipleMetricMatchersSnafu
            );
        } else {
            let mut matches = label_matchers.find_matchers(METRIC_NAME);
            ensure!(!matches.is_empty(), NoMetricMatcherSnafu);
            ensure!(matches.len() == 1, MultipleMetricMatchersSnafu);
            ensure!(
                matches[0].op == MatchOp::Equal,
                UnsupportedMatcherOpSnafu {
                    matcher_op: matches[0].op.to_string(),
                    matcher: METRIC_NAME
                }
            );
            metric_name = matches.pop().map(|m| m.value);
        }

        self.ctx.table_name = metric_name;

        let mut matchers = HashSet::new();
        for matcher in &label_matchers.matchers {
            // TODO(ruihang): support other metric match ops
            if matcher.name == FIELD_COLUMN_MATCHER {
                self.ctx
                    .field_column_matcher
                    .get_or_insert_default()
                    .push(matcher.clone());
            } else if matcher.name == SCHEMA_COLUMN_MATCHER || matcher.name == DB_COLUMN_MATCHER {
                ensure!(
                    matcher.op == MatchOp::Equal,
                    UnsupportedMatcherOpSnafu {
                        matcher: matcher.name.to_string(),
                        matcher_op: matcher.op.to_string(),
                    }
                );
                self.ctx.schema_name = Some(matcher.value.clone());
            } else if matcher.name != METRIC_NAME {
                self.ctx.selector_matcher.push(matcher.clone());
                let _ = matchers.insert(matcher.clone());
            }
        }

        Ok(Matchers::new(
            matchers.into_iter().map(normalize_matcher).collect(),
        ))
    }

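    /// Build the scan-side plan for a selector: a filtered table scan (matchers
    /// plus time-range filter), an optional `__field__` projection, a sort on
    /// tags and time index, a [SeriesDivide] node, and, when an offset or range
    /// selector requires it, a [SeriesNormalize] node.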
    async fn selector_to_series_normalize_plan(
        &mut self,
        offset: &Option<Offset>,
        label_matchers: Matchers,
        is_range_selector: bool,
    ) -> Result<LogicalPlan> {
        // make table scan plan
        let table_ref = self.table_ref()?;
        let mut table_scan = self.create_table_scan_plan(table_ref.clone()).await?;
        let table_schema = table_scan.schema();

        // make filter exprs
        let offset_duration = match offset {
            Some(Offset::Pos(duration)) => duration.as_millis() as Millisecond,
            Some(Offset::Neg(duration)) => -(duration.as_millis() as Millisecond),
            None => 0,
        };
        let mut scan_filters = Self::matchers_to_expr(label_matchers.clone(), table_schema)?;
        if let Some(time_index_filter) = self.build_time_index_filter(offset_duration)? {
            scan_filters.push(time_index_filter);
        }
        table_scan = LogicalPlanBuilder::from(table_scan)
            .filter(conjunction(scan_filters).unwrap()) // Safety: `scan_filters` is not empty.
            .context(DataFusionPlanningSnafu)?
            .build()
            .context(DataFusionPlanningSnafu)?;

        // make a projection plan if there is any `__field__` matcher
        if let Some(field_matchers) = &self.ctx.field_column_matcher {
            let col_set = self.ctx.field_columns.iter().collect::<HashSet<_>>();
            // opt-in set
            let mut result_set = HashSet::new();
            // opt-out set
            let mut reverse_set = HashSet::new();
            for matcher in field_matchers {
                match &matcher.op {
                    MatchOp::Equal => {
                        if col_set.contains(&matcher.value) {
                            let _ = result_set.insert(matcher.value.clone());
                        } else {
                            return Err(ColumnNotFoundSnafu {
                                col: matcher.value.clone(),
                            }
                            .build());
                        }
                    }
                    MatchOp::NotEqual => {
                        if col_set.contains(&matcher.value) {
                            let _ = reverse_set.insert(matcher.value.clone());
                        } else {
                            return Err(ColumnNotFoundSnafu {
                                col: matcher.value.clone(),
                            }
                            .build());
                        }
                    }
                    MatchOp::Re(regex) => {
                        for col in &self.ctx.field_columns {
                            if regex.is_match(col) {
                                let _ = result_set.insert(col.clone());
                            }
                        }
                    }
                    MatchOp::NotRe(regex) => {
                        for col in &self.ctx.field_columns {
                            if regex.is_match(col) {
                                let _ = reverse_set.insert(col.clone());
                            }
                        }
                    }
                }
            }
            // merge the two sets
            if result_set.is_empty() {
                result_set = col_set.into_iter().cloned().collect();
            }
            for col in reverse_set {
                let _ = result_set.remove(&col);
            }

            // mask the field columns in context using the computed result set
            self.ctx.field_columns = self
                .ctx
                .field_columns
                .drain(..)
                .filter(|col| result_set.contains(col))
                .collect();

            let exprs = result_set
                .into_iter()
                .map(|col| DfExpr::Column(Column::new_unqualified(col)))
                .chain(self.create_tag_column_exprs()?)
                .chain(Some(self.create_time_index_column_expr()?))
                .collect::<Vec<_>>();

            // reuse this variable for simplicity
            table_scan = LogicalPlanBuilder::from(table_scan)
                .project(exprs)
                .context(DataFusionPlanningSnafu)?
                .build()
                .context(DataFusionPlanningSnafu)?;
        }

        // make sort plan
        let sort_plan = LogicalPlanBuilder::from(table_scan)
            .sort(self.create_tag_and_time_index_column_sort_exprs()?)
            .context(DataFusionPlanningSnafu)?
            .build()
            .context(DataFusionPlanningSnafu)?;

        // make divide plan
        let time_index_column =
            self.ctx
                .time_index_column
                .clone()
                .with_context(|| TimeIndexNotFoundSnafu {
                    table: table_ref.to_string(),
                })?;
        let divide_plan = LogicalPlan::Extension(Extension {
            node: Arc::new(SeriesDivide::new(
                self.ctx.tag_columns.clone(),
                time_index_column,
                sort_plan,
            )),
        });

        // make series_normalize plan
        if !is_range_selector && offset_duration == 0 {
            return Ok(divide_plan);
        }
        let series_normalize = SeriesNormalize::new(
            offset_duration,
            self.ctx
                .time_index_column
                .clone()
                .with_context(|| TimeIndexNotFoundSnafu {
                    table: table_ref.to_quoted_string(),
                })?,
            is_range_selector,
            self.ctx.tag_columns.clone(),
            divide_plan,
        );
        let logical_plan = LogicalPlan::Extension(Extension {
            node: Arc::new(series_normalize),
        });

        Ok(logical_plan)
    }

    /// Convert [LabelModifier] to [Column] exprs for aggregation.
    /// Timestamp column and tag columns will be included.
    ///
    /// # Side effect
    ///
    /// This method will also change the tag columns in ctx if `update_ctx` is true.
    fn agg_modifier_to_col(
        &mut self,
        input_schema: &DFSchemaRef,
        modifier: &Option<LabelModifier>,
        update_ctx: bool,
    ) -> Result<Vec<DfExpr>> {
        match modifier {
            None => {
                if update_ctx {
                    self.ctx.tag_columns.clear();
                }
                Ok(vec![self.create_time_index_column_expr()?])
            }
            Some(LabelModifier::Include(labels)) => {
                if update_ctx {
                    self.ctx.tag_columns.clear();
                }
                let mut exprs = Vec::with_capacity(labels.labels.len());
                for label in &labels.labels {
                    // nonexistent labels will be ignored
                    if let Ok(field) = input_schema.field_with_unqualified_name(label) {
                        exprs.push(DfExpr::Column(Column::from(field.name())));

                        if update_ctx {
                            // update the tag columns in context
                            self.ctx.tag_columns.push(label.clone());
                        }
                    }
                }
                // add timestamp column
                exprs.push(self.create_time_index_column_expr()?);

                Ok(exprs)
            }
            Some(LabelModifier::Exclude(labels)) => {
                let mut all_fields = input_schema
                    .fields()
                    .iter()
                    .map(|f| f.name())
                    .collect::<BTreeSet<_>>();

                // remove "without"-ed fields
                // nonexistent labels will be ignored
                for label in &labels.labels {
                    let _ = all_fields.remove(label);
                }

                // remove time index and value fields
                if let Some(time_index) = &self.ctx.time_index_column {
                    let _ = all_fields.remove(time_index);
                }
                for value in &self.ctx.field_columns {
                    let _ = all_fields.remove(value);
                }

                if update_ctx {
                    // change the tag columns in context
                    self.ctx.tag_columns = all_fields.iter().map(|col| (*col).clone()).collect();
                }

                // collect remaining fields and convert to col expr
                let mut exprs = all_fields
                    .into_iter()
                    .map(|c| DfExpr::Column(Column::from(c)))
                    .collect::<Vec<_>>();

                // add timestamp column
                exprs.push(self.create_time_index_column_expr()?);

                Ok(exprs)
            }
        }
    }

    // TODO(ruihang): ignore `MetricNameLabel` (`__name__`) matcher
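    /// Convert label [Matchers] into DataFusion filter expressions against the
    /// given table schema. Matchers on columns absent from the schema compare
    /// against an empty string; `.*` and `.+` regexes are special-cased.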
    pub fn matchers_to_expr(
        label_matchers: Matchers,
        table_schema: &DFSchemaRef,
    ) -> Result<Vec<DfExpr>> {
        let mut exprs = Vec::with_capacity(label_matchers.matchers.len());
        for matcher in label_matchers.matchers {
            if matcher.name == SCHEMA_COLUMN_MATCHER
                || matcher.name == DB_COLUMN_MATCHER
                || matcher.name == FIELD_COLUMN_MATCHER
            {
                continue;
            }

            let col = if table_schema
                .field_with_unqualified_name(&matcher.name)
                .is_err()
            {
                DfExpr::Literal(ScalarValue::Utf8(Some(String::new())), None).alias(matcher.name)
            } else {
                DfExpr::Column(Column::from_name(matcher.name))
            };
            let lit = DfExpr::Literal(ScalarValue::Utf8(Some(matcher.value)), None);
            let expr = match matcher.op {
                MatchOp::Equal => col.eq(lit),
                MatchOp::NotEqual => col.not_eq(lit),
                MatchOp::Re(re) => {
                    // TODO(ruihang): a more programmatic way to handle this in datafusion

                    // This is a hack to handle `.+` and `.*`, and it is not strictly correct:
                    // `.` doesn't match newline (`\n`). Given this is in a PromQL context,
                    // most of the time it's fine.
                    if re.as_str() == "^(?:.*)$" {
                        continue;
                    }
                    if re.as_str() == "^(?:.+)$" {
                        col.not_eq(DfExpr::Literal(
                            ScalarValue::Utf8(Some(String::new())),
                            None,
                        ))
                    } else {
                        DfExpr::BinaryExpr(BinaryExpr {
                            left: Box::new(col),
                            op: Operator::RegexMatch,
                            right: Box::new(DfExpr::Literal(
                                ScalarValue::Utf8(Some(re.as_str().to_string())),
                                None,
                            )),
                        })
                    }
                }
                MatchOp::NotRe(re) => {
                    if re.as_str() == "^(?:.*)$" {
                        DfExpr::Literal(ScalarValue::Boolean(Some(false)), None)
                    } else if re.as_str() == "^(?:.+)$" {
                        col.eq(DfExpr::Literal(
                            ScalarValue::Utf8(Some(String::new())),
                            None,
                        ))
                    } else {
                        DfExpr::BinaryExpr(BinaryExpr {
                            left: Box::new(col),
                            op: Operator::RegexNotMatch,
                            right: Box::new(DfExpr::Literal(
                                ScalarValue::Utf8(Some(re.as_str().to_string())),
                                None,
                            )),
                        })
                    }
                }
            };
            exprs.push(expr);
        }

        Ok(exprs)
    }

1356    fn table_ref(&self) -> Result<TableReference> {
1357        let table_name = self
1358            .ctx
1359            .table_name
1360            .clone()
1361            .context(TableNameNotFoundSnafu)?;
1362
1363        // set schema name if `__schema__` is given
1364        let table_ref = if let Some(schema_name) = &self.ctx.schema_name {
1365            TableReference::partial(schema_name.as_str(), table_name.as_str())
1366        } else {
1367            TableReference::bare(table_name.as_str())
1368        };
1369
1370        Ok(table_ref)
1371    }
1372
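    /// Build the filter on the time index for the query range, shifted by
    /// `offset_duration`. A sketch of the two shapes produced (all bounds in
    /// milliseconds):
    ///
    /// ```text
    /// -- one continuous range, when there are many points or a small interval:
    /// ts >= start + offset - lookback - range AND ts <= end + offset + lookback
    ///
    /// -- otherwise, one scattered range per evaluation step, OR-ed together:
    /// (ts >= t0 + offset - lookback - range AND ts <= t0 + offset + lookback)
    ///   OR (ts >= t1 + offset - lookback - range AND ts <= t1 + offset + lookback) OR ...
    /// ```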
1373    fn build_time_index_filter(&self, offset_duration: i64) -> Result<Option<DfExpr>> {
1374        let start = self.ctx.start;
1375        let end = self.ctx.end;
1376        if end < start {
1377            return InvalidTimeRangeSnafu { start, end }.fail();
1378        }
1379        let lookback_delta = self.ctx.lookback_delta;
1380        let range = self.ctx.range.unwrap_or_default();
1381        let interval = self.ctx.interval;
1382        let time_index_expr = self.create_time_index_column_expr()?;
1383        let num_points = (end - start) / interval;
1384
1385        // Scan a continuous time range
1386        if num_points > MAX_SCATTER_POINTS || interval <= INTERVAL_1H {
1387            let single_time_range = time_index_expr
1388                .clone()
1389                .gt_eq(DfExpr::Literal(
1390                    ScalarValue::TimestampMillisecond(
1391                        Some(self.ctx.start + offset_duration - self.ctx.lookback_delta - range),
1392                        None,
1393                    ),
1394                    None,
1395                ))
1396                .and(time_index_expr.lt_eq(DfExpr::Literal(
1397                    ScalarValue::TimestampMillisecond(
1398                        Some(self.ctx.end + offset_duration + self.ctx.lookback_delta),
1399                        None,
1400                    ),
1401                    None,
1402                )));
1403            return Ok(Some(single_time_range));
1404        }
1405
1406        // Otherwise, scan the scattered ranges separately
1407        let mut filters = Vec::with_capacity(num_points as usize);
1408        for timestamp in (start..end).step_by(interval as usize) {
1409            filters.push(
1410                time_index_expr
1411                    .clone()
1412                    .gt_eq(DfExpr::Literal(
1413                        ScalarValue::TimestampMillisecond(
1414                            Some(timestamp + offset_duration - lookback_delta - range),
1415                            None,
1416                        ),
1417                        None,
1418                    ))
1419                    .and(time_index_expr.clone().lt_eq(DfExpr::Literal(
1420                        ScalarValue::TimestampMillisecond(
1421                            Some(timestamp + offset_duration + lookback_delta),
1422                            None,
1423                        ),
1424                        None,
1425                    ))),
1426            )
1427        }
1428
1429        Ok(filters.into_iter().reduce(DfExpr::or))
1430    }
1431
1432    /// Create a table scan plan for the given table reference.
1433    ///
1434    /// If the table's time index is not in millisecond precision, a projection
1435    /// is added to cast it to milliseconds.
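    ///
    /// A sketch of the resulting plan when a cast is needed (names are
    /// illustrative):
    ///
    /// ```text
    /// Projection: t.field, t.tag, CAST(t.ts AS Timestamp(Millisecond, None)) AS ts
    ///   TableScan: t
    /// ```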
1436    async fn create_table_scan_plan(&mut self, table_ref: TableReference) -> Result<LogicalPlan> {
1437        let provider = self
1438            .table_provider
1439            .resolve_table(table_ref.clone())
1440            .await
1441            .context(CatalogSnafu)?;
1442
1443        let is_time_index_ms = provider
1444            .as_any()
1445            .downcast_ref::<DefaultTableSource>()
1446            .context(UnknownTableSnafu)?
1447            .table_provider
1448            .as_any()
1449            .downcast_ref::<DfTableProviderAdapter>()
1450            .context(UnknownTableSnafu)?
1451            .table()
1452            .schema()
1453            .timestamp_column()
1454            .with_context(|| TimeIndexNotFoundSnafu {
1455                table: table_ref.to_quoted_string(),
1456            })?
1457            .data_type
1458            == ConcreteDataType::timestamp_millisecond_datatype();
1459
1460        let mut scan_plan = LogicalPlanBuilder::scan(table_ref.clone(), provider, None)
1461            .context(DataFusionPlanningSnafu)?
1462            .build()
1463            .context(DataFusionPlanningSnafu)?;
1464
1465        if !is_time_index_ms {
1466            // cast to ms if time_index not in Millisecond precision
1467            let expr: Vec<_> = self
1468                .ctx
1469                .field_columns
1470                .iter()
1471                .map(|col| DfExpr::Column(Column::new(Some(table_ref.clone()), col.clone())))
1472                .chain(self.create_tag_column_exprs()?)
1473                .chain(Some(DfExpr::Alias(Alias {
1474                    expr: Box::new(DfExpr::Cast(Cast {
1475                        expr: Box::new(self.create_time_index_column_expr()?),
1476                        data_type: ArrowDataType::Timestamp(ArrowTimeUnit::Millisecond, None),
1477                    })),
1478                    relation: Some(table_ref.clone()),
1479                    name: self
1480                        .ctx
1481                        .time_index_column
1482                        .as_ref()
1483                        .with_context(|| TimeIndexNotFoundSnafu {
1484                            table: table_ref.to_quoted_string(),
1485                        })?
1486                        .clone(),
1487                    metadata: None,
1488                })))
1489                .collect::<Vec<_>>();
1490            scan_plan = LogicalPlanBuilder::from(scan_plan)
1491                .project(expr)
1492                .context(DataFusionPlanningSnafu)?
1493                .build()
1494                .context(DataFusionPlanningSnafu)?;
1495        }
1496
1497        Ok(scan_plan)
1501    }
1502
1503    /// Set up [PromPlannerContext]'s state fields.
1504    ///
1505    /// Returns a logical plan for an empty metric if the table is not found.
1506    async fn setup_context(&mut self) -> Result<Option<LogicalPlan>> {
1507        let table_ref = self.table_ref()?;
1508        let table = match self.table_provider.resolve_table(table_ref.clone()).await {
1509            Err(e) if e.status_code() == StatusCode::TableNotFound => {
1510                let plan = self.setup_context_for_empty_metric()?;
1511                return Ok(Some(plan));
1512            }
1513            res => res.context(CatalogSnafu)?,
1514        };
1515        let table = table
1516            .as_any()
1517            .downcast_ref::<DefaultTableSource>()
1518            .context(UnknownTableSnafu)?
1519            .table_provider
1520            .as_any()
1521            .downcast_ref::<DfTableProviderAdapter>()
1522            .context(UnknownTableSnafu)?
1523            .table();
1524
1525        // set time index column name
1526        let time_index = table
1527            .schema()
1528            .timestamp_column()
1529            .with_context(|| TimeIndexNotFoundSnafu {
1530                table: table_ref.to_quoted_string(),
1531            })?
1532            .name
1533            .clone();
1534        self.ctx.time_index_column = Some(time_index);
1535
1536        // set value columns
1537        let values = table
1538            .table_info()
1539            .meta
1540            .field_column_names()
1541            .cloned()
1542            .collect();
1543        self.ctx.field_columns = values;
1544
1545        // set primary key (tag) columns
1546        let tags = table
1547            .table_info()
1548            .meta
1549            .row_key_column_names()
1550            .filter(|col| {
1551                // remove metric engine's internal columns
1552                col != &DATA_SCHEMA_TABLE_ID_COLUMN_NAME && col != &DATA_SCHEMA_TSID_COLUMN_NAME
1553            })
1554            .cloned()
1555            .collect();
1556        self.ctx.tag_columns = tags;
1557
1558        Ok(None)
1559    }
1560
1561    /// Set up [PromPlannerContext]'s state fields for a non-existent table
1562    /// without any rows.
1563    fn setup_context_for_empty_metric(&mut self) -> Result<LogicalPlan> {
1564        self.ctx.time_index_column = Some(SPECIAL_TIME_FUNCTION.to_string());
1565        self.ctx.reset_table_name_and_schema();
1566        self.ctx.tag_columns = vec![];
1567        self.ctx.field_columns = vec![DEFAULT_FIELD_COLUMN.to_string()];
1568
1569        // The table doesn't have any data, so we set start to 0 and end to -1.
1570        let plan = LogicalPlan::Extension(Extension {
1571            node: Arc::new(
1572                EmptyMetric::new(
1573                    0,
1574                    -1,
1575                    self.ctx.interval,
1576                    SPECIAL_TIME_FUNCTION.to_string(),
1577                    DEFAULT_FIELD_COLUMN.to_string(),
1578                    Some(lit(0.0f64)),
1579                )
1580                .context(DataFusionPlanningSnafu)?,
1581            ),
1582        });
1583        Ok(plan)
1584    }
1585
1586    // TODO(ruihang): insert column expr
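    /// Split a PromQL call's arguments into at most one vector input and a
    /// list of literal arguments. For example (a sketch), `clamp(metric, 0, 100)`
    /// yields `input = metric` and `literals = [0.0, 100.0]`; a second vector
    /// argument would be rejected with `MultipleVectorSnafu`.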
1587    fn create_function_args(&self, args: &[Box<PromExpr>]) -> Result<FunctionArgs> {
1588        let mut result = FunctionArgs::default();
1589
1590        for arg in args {
1591            // First try to parse as literal expression (including binary expressions like 100.0 + 3.0)
1592            if let Some(expr) = Self::try_build_literal_expr(arg) {
1593                result.literals.push(expr);
1594            } else {
1595                // If not a literal, treat as vector input
1596                match arg.as_ref() {
1597                    PromExpr::Subquery(_)
1598                    | PromExpr::VectorSelector(_)
1599                    | PromExpr::MatrixSelector(_)
1600                    | PromExpr::Extension(_)
1601                    | PromExpr::Aggregate(_)
1602                    | PromExpr::Paren(_)
1603                    | PromExpr::Call(_)
1604                    | PromExpr::Binary(_)
1605                    | PromExpr::Unary(_) => {
1606                        if result.input.replace(*arg.clone()).is_some() {
1607                            MultipleVectorSnafu { expr: *arg.clone() }.fail()?;
1608                        }
1609                    }
1610
1611                    _ => {
1612                        let expr = Self::get_param_as_literal_expr(&Some(arg.clone()), None, None)?;
1613                        result.literals.push(expr);
1614                    }
1615                }
1616            }
1617        }
1618
1619        Ok(result)
1620    }
1621
1622    /// Creates function expressions for projection and returns the expressions and new tags.
1623    ///
1624    /// # Side Effects
1625    ///
1626    /// This method will update [PromPlannerContext]'s fields and tags if needed.
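    ///
    /// For example (a sketch), `round(metric)` applies the round UDF as
    /// `round(field, 0.0)` for each field column, while range functions such
    /// as `rate(metric[5m])` also receive the timestamp-range column, the
    /// time index, and the range length as extra arguments.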
1627    fn create_function_expr(
1628        &mut self,
1629        func: &Function,
1630        other_input_exprs: Vec<DfExpr>,
1631        query_engine_state: &QueryEngineState,
1632    ) -> Result<(Vec<DfExpr>, Vec<String>)> {
1633        // TODO(ruihang): check function args list
1634        let mut other_input_exprs: VecDeque<DfExpr> = other_input_exprs.into();
1635
1636        // TODO(ruihang): set this according to in-param list
1637        let field_column_pos = 0;
1638        let mut exprs = Vec::with_capacity(self.ctx.field_columns.len());
1639        // New labels after executing the function, e.g. `label_replace`.
1640        let mut new_tags = vec![];
1641        let scalar_func = match func.name {
1642            "increase" => ScalarFunc::ExtrapolateUdf(
1643                Arc::new(Increase::scalar_udf()),
1644                self.ctx.range.context(ExpectRangeSelectorSnafu)?,
1645            ),
1646            "rate" => ScalarFunc::ExtrapolateUdf(
1647                Arc::new(Rate::scalar_udf()),
1648                self.ctx.range.context(ExpectRangeSelectorSnafu)?,
1649            ),
1650            "delta" => ScalarFunc::ExtrapolateUdf(
1651                Arc::new(Delta::scalar_udf()),
1652                self.ctx.range.context(ExpectRangeSelectorSnafu)?,
1653            ),
1654            "idelta" => ScalarFunc::Udf(Arc::new(IDelta::<false>::scalar_udf())),
1655            "irate" => ScalarFunc::Udf(Arc::new(IDelta::<true>::scalar_udf())),
1656            "resets" => ScalarFunc::Udf(Arc::new(Resets::scalar_udf())),
1657            "changes" => ScalarFunc::Udf(Arc::new(Changes::scalar_udf())),
1658            "deriv" => ScalarFunc::Udf(Arc::new(Deriv::scalar_udf())),
1659            "avg_over_time" => ScalarFunc::Udf(Arc::new(AvgOverTime::scalar_udf())),
1660            "min_over_time" => ScalarFunc::Udf(Arc::new(MinOverTime::scalar_udf())),
1661            "max_over_time" => ScalarFunc::Udf(Arc::new(MaxOverTime::scalar_udf())),
1662            "sum_over_time" => ScalarFunc::Udf(Arc::new(SumOverTime::scalar_udf())),
1663            "count_over_time" => ScalarFunc::Udf(Arc::new(CountOverTime::scalar_udf())),
1664            "last_over_time" => ScalarFunc::Udf(Arc::new(LastOverTime::scalar_udf())),
1665            "absent_over_time" => ScalarFunc::Udf(Arc::new(AbsentOverTime::scalar_udf())),
1666            "present_over_time" => ScalarFunc::Udf(Arc::new(PresentOverTime::scalar_udf())),
1667            "stddev_over_time" => ScalarFunc::Udf(Arc::new(StddevOverTime::scalar_udf())),
1668            "stdvar_over_time" => ScalarFunc::Udf(Arc::new(StdvarOverTime::scalar_udf())),
1669            "quantile_over_time" => ScalarFunc::Udf(Arc::new(QuantileOverTime::scalar_udf())),
1670            "predict_linear" => {
1671                other_input_exprs[0] = DfExpr::Cast(Cast {
1672                    expr: Box::new(other_input_exprs[0].clone()),
1673                    data_type: ArrowDataType::Int64,
1674                });
1675                ScalarFunc::Udf(Arc::new(PredictLinear::scalar_udf()))
1676            }
1677            "holt_winters" => ScalarFunc::Udf(Arc::new(HoltWinters::scalar_udf())),
1678            "time" => {
1679                exprs.push(build_special_time_expr(
1680                    self.ctx.time_index_column.as_ref().unwrap(),
1681                ));
1682                ScalarFunc::GeneratedExpr
1683            }
1684            "minute" => {
1685                // date_part('minute', time_index)
1686                let expr = self.date_part_on_time_index("minute")?;
1687                exprs.push(expr);
1688                ScalarFunc::GeneratedExpr
1689            }
1690            "hour" => {
1691                // date_part('hour', time_index)
1692                let expr = self.date_part_on_time_index("hour")?;
1693                exprs.push(expr);
1694                ScalarFunc::GeneratedExpr
1695            }
1696            "month" => {
1697                // date_part('month', time_index)
1698                let expr = self.date_part_on_time_index("month")?;
1699                exprs.push(expr);
1700                ScalarFunc::GeneratedExpr
1701            }
1702            "year" => {
1703                // date_part('year', time_index)
1704                let expr = self.date_part_on_time_index("year")?;
1705                exprs.push(expr);
1706                ScalarFunc::GeneratedExpr
1707            }
1708            "day_of_month" => {
1709                // date_part('day', time_index)
1710                let expr = self.date_part_on_time_index("day")?;
1711                exprs.push(expr);
1712                ScalarFunc::GeneratedExpr
1713            }
1714            "day_of_week" => {
1715                // date_part('dow', time_index)
1716                let expr = self.date_part_on_time_index("dow")?;
1717                exprs.push(expr);
1718                ScalarFunc::GeneratedExpr
1719            }
1720            "day_of_year" => {
1721                // date_part('doy', time_index)
1722                let expr = self.date_part_on_time_index("doy")?;
1723                exprs.push(expr);
1724                ScalarFunc::GeneratedExpr
1725            }
1726            "days_in_month" => {
1727                // date_part(
1728                //     'days',
1729                //     (date_trunc('month', <TIME INDEX>::date) + interval '1 month - 1 day')
1730                // );
1731                let day_lit_expr = "day".lit();
1732                let month_lit_expr = "month".lit();
1733                let interval_1month_lit_expr =
1734                    DfExpr::Literal(ScalarValue::IntervalYearMonth(Some(1)), None);
1735                let interval_1day_lit_expr = DfExpr::Literal(
1736                    ScalarValue::IntervalDayTime(Some(IntervalDayTime::new(1, 0))),
1737                    None,
1738                );
1739                let the_1month_minus_1day_expr = DfExpr::BinaryExpr(BinaryExpr {
1740                    left: Box::new(interval_1month_lit_expr),
1741                    op: Operator::Minus,
1742                    right: Box::new(interval_1day_lit_expr),
1743                });
1744                let date_trunc_expr = DfExpr::ScalarFunction(ScalarFunction {
1745                    func: datafusion_functions::datetime::date_trunc(),
1746                    args: vec![month_lit_expr, self.create_time_index_column_expr()?],
1747                });
1748                let date_trunc_plus_interval_expr = DfExpr::BinaryExpr(BinaryExpr {
1749                    left: Box::new(date_trunc_expr),
1750                    op: Operator::Plus,
1751                    right: Box::new(the_1month_minus_1day_expr),
1752                });
1753                let date_part_expr = DfExpr::ScalarFunction(ScalarFunction {
1754                    func: datafusion_functions::datetime::date_part(),
1755                    args: vec![day_lit_expr, date_trunc_plus_interval_expr],
1756                });
1757
1758                exprs.push(date_part_expr);
1759                ScalarFunc::GeneratedExpr
1760            }
1761
1762            "label_join" => {
1763                let (concat_expr, dst_label) = Self::build_concat_labels_expr(
1764                    &mut other_input_exprs,
1765                    &self.ctx,
1766                    query_engine_state,
1767                )?;
1768
1769                // Preserve the current field columns except `dst_label`.
1770                for value in &self.ctx.field_columns {
1771                    if *value != dst_label {
1772                        let expr = DfExpr::Column(Column::from_name(value));
1773                        exprs.push(expr);
1774                    }
1775                }
1776
1777                // Remove it from the tag columns if present to avoid duplicated column names
1778                self.ctx.tag_columns.retain(|tag| *tag != dst_label);
1779                new_tags.push(dst_label);
1780                // Add the new label expr to evaluate
1781                exprs.push(concat_expr);
1782
1783                ScalarFunc::GeneratedExpr
1784            }
1785            "label_replace" => {
1786                if let Some((replace_expr, dst_label)) = self
1787                    .build_regexp_replace_label_expr(&mut other_input_exprs, query_engine_state)?
1788                {
1789                    // Preserve the current field columns except `dst_label`.
1790                    for value in &self.ctx.field_columns {
1791                        if *value != dst_label {
1792                            let expr = DfExpr::Column(Column::from_name(value));
1793                            exprs.push(expr);
1794                        }
1795                    }
1796
1797                    ensure!(
1798                        !self.ctx.tag_columns.contains(&dst_label),
1799                        SameLabelSetSnafu
1800                    );
1801                    new_tags.push(dst_label);
1802                    // Add the new label expr to evaluate
1803                    exprs.push(replace_expr);
1804                } else {
1805                    // Keep the current field columns
1806                    for value in &self.ctx.field_columns {
1807                        let expr = DfExpr::Column(Column::from_name(value));
1808                        exprs.push(expr);
1809                    }
1810                }
1811
1812                ScalarFunc::GeneratedExpr
1813            }
1814            "sort" | "sort_desc" | "sort_by_label" | "sort_by_label_desc" | "timestamp" => {
1815                // These functions are not expressions but parts of the plan;
1816                // they are processed by `prom_call_expr_to_plan`.
1817                for value in &self.ctx.field_columns {
1818                    let expr = DfExpr::Column(Column::from_name(value));
1819                    exprs.push(expr);
1820                }
1821
1822                ScalarFunc::GeneratedExpr
1823            }
1824            "round" => {
1825                if other_input_exprs.is_empty() {
1826                    other_input_exprs.push_front(0.0f64.lit());
1827                }
1828                ScalarFunc::DataFusionUdf(Arc::new(Round::scalar_udf()))
1829            }
1830            "rad" => ScalarFunc::DataFusionBuiltin(datafusion::functions::math::radians()),
1831            "deg" => ScalarFunc::DataFusionBuiltin(datafusion::functions::math::degrees()),
1832            "sgn" => ScalarFunc::DataFusionBuiltin(datafusion::functions::math::signum()),
1833            "pi" => {
1834                // the pi function doesn't accept any arguments and needs special processing
1835                let fn_expr = DfExpr::ScalarFunction(ScalarFunction {
1836                    func: datafusion::functions::math::pi(),
1837                    args: vec![],
1838                });
1839                exprs.push(fn_expr);
1840
1841                ScalarFunc::GeneratedExpr
1842            }
1843            _ => {
1844                if let Some(f) = query_engine_state
1845                    .session_state()
1846                    .scalar_functions()
1847                    .get(func.name)
1848                {
1849                    ScalarFunc::DataFusionBuiltin(f.clone())
1850                } else if let Some(factory) = query_engine_state.scalar_function(func.name) {
1851                    let func_state = query_engine_state.function_state();
1852                    let query_ctx = self.table_provider.query_ctx();
1853
1854                    ScalarFunc::DataFusionUdf(Arc::new(factory.provide(FunctionContext {
1855                        state: func_state,
1856                        query_ctx: query_ctx.clone(),
1857                    })))
1858                } else if let Some(f) = datafusion_functions::math::functions()
1859                    .iter()
1860                    .find(|f| f.name() == func.name)
1861                {
1862                    ScalarFunc::DataFusionUdf(f.clone())
1863                } else {
1864                    return UnsupportedExprSnafu {
1865                        name: func.name.to_string(),
1866                    }
1867                    .fail();
1868                }
1869            }
1870        };
1871
1872        for value in &self.ctx.field_columns {
1873            let col_expr = DfExpr::Column(Column::from_name(value));
1874
1875            match scalar_func.clone() {
1876                ScalarFunc::DataFusionBuiltin(func) => {
1877                    other_input_exprs.insert(field_column_pos, col_expr);
1878                    let fn_expr = DfExpr::ScalarFunction(ScalarFunction {
1879                        func,
1880                        args: other_input_exprs.clone().into(),
1881                    });
1882                    exprs.push(fn_expr);
1883                    let _ = other_input_exprs.remove(field_column_pos);
1884                }
1885                ScalarFunc::DataFusionUdf(func) => {
1886                    let args = itertools::chain!(
1887                        other_input_exprs.iter().take(field_column_pos).cloned(),
1888                        std::iter::once(col_expr),
1889                        other_input_exprs.iter().skip(field_column_pos).cloned()
1890                    )
1891                    .collect_vec();
1892                    exprs.push(DfExpr::ScalarFunction(ScalarFunction { func, args }))
1893                }
1894                ScalarFunc::Udf(func) => {
1895                    let ts_range_expr = DfExpr::Column(Column::from_name(
1896                        RangeManipulate::build_timestamp_range_name(
1897                            self.ctx.time_index_column.as_ref().unwrap(),
1898                        ),
1899                    ));
1900                    other_input_exprs.insert(field_column_pos, ts_range_expr);
1901                    other_input_exprs.insert(field_column_pos + 1, col_expr);
1902                    let fn_expr = DfExpr::ScalarFunction(ScalarFunction {
1903                        func,
1904                        args: other_input_exprs.clone().into(),
1905                    });
1906                    exprs.push(fn_expr);
1907                    let _ = other_input_exprs.remove(field_column_pos + 1);
1908                    let _ = other_input_exprs.remove(field_column_pos);
1909                }
1910                ScalarFunc::ExtrapolateUdf(func, range_length) => {
1911                    let ts_range_expr = DfExpr::Column(Column::from_name(
1912                        RangeManipulate::build_timestamp_range_name(
1913                            self.ctx.time_index_column.as_ref().unwrap(),
1914                        ),
1915                    ));
1916                    other_input_exprs.insert(field_column_pos, ts_range_expr);
1917                    other_input_exprs.insert(field_column_pos + 1, col_expr);
1918                    other_input_exprs
1919                        .insert(field_column_pos + 2, self.create_time_index_column_expr()?);
1920                    other_input_exprs.push_back(lit(range_length));
1921                    let fn_expr = DfExpr::ScalarFunction(ScalarFunction {
1922                        func,
1923                        args: other_input_exprs.clone().into(),
1924                    });
1925                    exprs.push(fn_expr);
1926                    let _ = other_input_exprs.pop_back();
1927                    let _ = other_input_exprs.remove(field_column_pos + 2);
1928                    let _ = other_input_exprs.remove(field_column_pos + 1);
1929                    let _ = other_input_exprs.remove(field_column_pos);
1930                }
1931                ScalarFunc::GeneratedExpr => {}
1932            }
1933        }
1934
1935        // Update value columns' names and alias them to remove qualifiers.
1936        // For label functions such as `label_join` and `label_replace`,
1937        // the fields are kept unchanged.
1938        if !matches!(func.name, "label_join" | "label_replace") {
1939            let mut new_field_columns = Vec::with_capacity(exprs.len());
1940
1941            exprs = exprs
1942                .into_iter()
1943                .map(|expr| {
1944                    let display_name = expr.schema_name().to_string();
1945                    new_field_columns.push(display_name.clone());
1946                    Ok(expr.alias(display_name))
1947                })
1948                .collect::<std::result::Result<Vec<_>, _>>()
1949                .context(DataFusionPlanningSnafu)?;
1950
1951            self.ctx.field_columns = new_field_columns;
1952        }
1953
1954        Ok((exprs, new_tags))
1955    }
1956
1957    /// Validate label name according to Prometheus specification.
1958    /// Label names must match the regex: [a-zA-Z_][a-zA-Z0-9_]*
1959    /// Additionally, label names starting with double underscores are reserved for internal use.
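    ///
    /// For example, `job` and `instance_1` are accepted, while `__name`,
    /// `0abc`, and `a-b` are rejected.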
1960    fn validate_label_name(label_name: &str) -> Result<()> {
1961        // Check if label name starts with double underscores (reserved)
1962        if label_name.starts_with("__") {
1963            return InvalidDestinationLabelNameSnafu { label_name }.fail();
1964        }
1965        // Check if label name matches the required pattern
1966        if !LABEL_NAME_REGEX.is_match(label_name) {
1967            return InvalidDestinationLabelNameSnafu { label_name }.fail();
1968        }
1969
1970        Ok(())
1971    }
1972
1973    /// Build expr for `label_replace` function
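    ///
    /// A sketch of the rewrite (label names are illustrative):
    ///
    /// ```text
    /// label_replace(up, "host", "$1", "instance", "(.*):.*")
    ///   => regexp_replace(instance, '^(?s:(.*):.*)$', '$1') AS host
    /// ```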
1974    fn build_regexp_replace_label_expr(
1975        &self,
1976        other_input_exprs: &mut VecDeque<DfExpr>,
1977        query_engine_state: &QueryEngineState,
1978    ) -> Result<Option<(DfExpr, String)>> {
1979        // label_replace(vector, dst_label, replacement, src_label, regex)
1980        let dst_label = match other_input_exprs.pop_front() {
1981            Some(DfExpr::Literal(ScalarValue::Utf8(Some(d)), _)) => d,
1982            other => UnexpectedPlanExprSnafu {
1983                desc: format!("expected dst_label string literal, but found {:?}", other),
1984            }
1985            .fail()?,
1986        };
1987
1988        // Validate the destination label name
1989        Self::validate_label_name(&dst_label)?;
1990        let replacement = match other_input_exprs.pop_front() {
1991            Some(DfExpr::Literal(ScalarValue::Utf8(Some(r)), _)) => r,
1992            other => UnexpectedPlanExprSnafu {
1993                desc: format!("expected replacement string literal, but found {:?}", other),
1994            }
1995            .fail()?,
1996        };
1997        let src_label = match other_input_exprs.pop_front() {
1998            Some(DfExpr::Literal(ScalarValue::Utf8(Some(s)), None)) => s,
1999            other => UnexpectedPlanExprSnafu {
2000                desc: format!("expected src_label string literal, but found {:?}", other),
2001            }
2002            .fail()?,
2003        };
2004
2005        let regex = match other_input_exprs.pop_front() {
2006            Some(DfExpr::Literal(ScalarValue::Utf8(Some(r)), None)) => r,
2007            other => UnexpectedPlanExprSnafu {
2008                desc: format!("expected regex string literal, but found {:?}", other),
2009            }
2010            .fail()?,
2011        };
2012
2013        // Validate the regex before using it
2014        // doc: https://prometheus.io/docs/prometheus/latest/querying/functions/#label_replace
2015        regex::Regex::new(&regex).map_err(|_| {
2016            InvalidRegularExpressionSnafu {
2017                regex: regex.clone(),
2018            }
2019            .build()
2020        })?;
2021
2022        // If the src_label exists and regex is empty, keep everything unchanged.
2023        if self.ctx.tag_columns.contains(&src_label) && regex.is_empty() {
2024            return Ok(None);
2025        }
2026
2027        // If the src_label doesn't exist:
2028        if !self.ctx.tag_columns.contains(&src_label) {
2029            if replacement.is_empty() {
2030                // the replacement is empty, keep everything unchanged.
2031                return Ok(None);
2032            } else {
2033                // the replacement is not empty, so always add dst_label with the replacement value.
2034                return Ok(Some((
2035                    // alias literal `replacement` as dst_label
2036                    lit(replacement).alias(&dst_label),
2037                    dst_label,
2038                )));
2039            }
2040        }
2041
2042        // Preprocess the regex:
2043        // https://github.com/prometheus/prometheus/blob/d902abc50d6652ba8fe9a81ff8e5cce936114eba/promql/functions.go#L1575C32-L1575C37
2044        let regex = format!("^(?s:{regex})$");
2045
2046        let session_state = query_engine_state.session_state();
2047        let func = session_state
2048            .scalar_functions()
2049            .get("regexp_replace")
2050            .context(UnsupportedExprSnafu {
2051                name: "regexp_replace",
2052            })?;
2053
2054        // regexp_replace(src_label, regex, replacement)
2055        let args = vec![
2056            if src_label.is_empty() {
2057                DfExpr::Literal(ScalarValue::Utf8(Some(String::new())), None)
2058            } else {
2059                DfExpr::Column(Column::from_name(src_label))
2060            },
2061            DfExpr::Literal(ScalarValue::Utf8(Some(regex)), None),
2062            DfExpr::Literal(ScalarValue::Utf8(Some(replacement)), None),
2063        ];
2064
2065        Ok(Some((
2066            DfExpr::ScalarFunction(ScalarFunction {
2067                func: func.clone(),
2068                args,
2069            })
2070            .alias(&dst_label),
2071            dst_label,
2072        )))
2073    }
2074
2075    /// Build expr for `label_join` function
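    ///
    /// A sketch of the rewrite, assuming both source labels exist in the
    /// table schema (label names are illustrative):
    ///
    /// ```text
    /// label_join(up, "endpoint", "/", "job", "instance")
    ///   => concat_ws('/', job, instance) AS endpoint
    /// ```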
2076    fn build_concat_labels_expr(
2077        other_input_exprs: &mut VecDeque<DfExpr>,
2078        ctx: &PromPlannerContext,
2079        query_engine_state: &QueryEngineState,
2080    ) -> Result<(DfExpr, String)> {
2081        // label_join(vector, dst_label, separator, src_label_1, src_label_2, ...)
2082
2083        let dst_label = match other_input_exprs.pop_front() {
2084            Some(DfExpr::Literal(ScalarValue::Utf8(Some(d)), _)) => d,
2085            other => UnexpectedPlanExprSnafu {
2086                desc: format!("expected dst_label string literal, but found {:?}", other),
2087            }
2088            .fail()?,
2089        };
2090        let separator = match other_input_exprs.pop_front() {
2091            Some(DfExpr::Literal(ScalarValue::Utf8(Some(d)), _)) => d,
2092            other => UnexpectedPlanExprSnafu {
2093                desc: format!("expected separator string literal, but found {:?}", other),
2094            }
2095            .fail()?,
2096        };
2097
2098        // Create a set of available columns (tag columns + field columns + time index column)
2099        let available_columns: HashSet<&str> = ctx
2100            .tag_columns
2101            .iter()
2102            .chain(ctx.field_columns.iter())
2103            .chain(ctx.time_index_column.as_ref())
2104            .map(|s| s.as_str())
2105            .collect();
2106
2107        let src_labels = other_input_exprs
2108            .iter()
2109            .map(|expr| {
2110                // Cast source label into column or null literal
2111                match expr {
2112                    DfExpr::Literal(ScalarValue::Utf8(Some(label)), None) => {
2113                        if label.is_empty() {
2114                            Ok(DfExpr::Literal(ScalarValue::Null, None))
2115                        } else if available_columns.contains(label.as_str()) {
2116                            // Label exists in the table schema
2117                            Ok(DfExpr::Column(Column::from_name(label)))
2118                        } else {
2119                            // Label doesn't exist; treat it as a null value
2120                            Ok(DfExpr::Literal(ScalarValue::Null, None))
2121                        }
2122                    }
2123                    other => UnexpectedPlanExprSnafu {
2124                        desc: format!(
2125                            "expected source label string literal, but found {:?}",
2126                            other
2127                        ),
2128                    }
2129                    .fail(),
2130                }
2131            })
2132            .collect::<Result<Vec<_>>>()?;
2133        ensure!(
2134            !src_labels.is_empty(),
2135            FunctionInvalidArgumentSnafu {
2136                fn_name: "label_join"
2137            }
2138        );
2139
2140        let session_state = query_engine_state.session_state();
2141        let func = session_state
2142            .scalar_functions()
2143            .get("concat_ws")
2144            .context(UnsupportedExprSnafu { name: "concat_ws" })?;
2145
2146        // concat_ws(separator, src_label_1, src_label_2, ...) as dst_label
2147        let mut args = Vec::with_capacity(1 + src_labels.len());
2148        args.push(DfExpr::Literal(ScalarValue::Utf8(Some(separator)), None));
2149        args.extend(src_labels);
2150
2151        Ok((
2152            DfExpr::ScalarFunction(ScalarFunction {
2153                func: func.clone(),
2154                args,
2155            })
2156            .alias(&dst_label),
2157            dst_label,
2158        ))
2159    }
2160
2161    fn create_time_index_column_expr(&self) -> Result<DfExpr> {
2162        Ok(DfExpr::Column(Column::from_name(
2163            self.ctx
2164                .time_index_column
2165                .clone()
2166                .with_context(|| TimeIndexNotFoundSnafu { table: "unknown" })?,
2167        )))
2168    }
2169
2170    fn create_tag_column_exprs(&self) -> Result<Vec<DfExpr>> {
2171        let mut result = Vec::with_capacity(self.ctx.tag_columns.len());
2172        for tag in &self.ctx.tag_columns {
2173            let expr = DfExpr::Column(Column::from_name(tag));
2174            result.push(expr);
2175        }
2176        Ok(result)
2177    }
2178
2179    fn create_field_column_exprs(&self) -> Result<Vec<DfExpr>> {
2180        let mut result = Vec::with_capacity(self.ctx.field_columns.len());
2181        for field in &self.ctx.field_columns {
2182            let expr = DfExpr::Column(Column::from_name(field));
2183            result.push(expr);
2184        }
2185        Ok(result)
2186    }
2187
2188    fn create_tag_and_time_index_column_sort_exprs(&self) -> Result<Vec<SortExpr>> {
2189        let mut result = self
2190            .ctx
2191            .tag_columns
2192            .iter()
2193            .map(|col| DfExpr::Column(Column::from_name(col)).sort(true, true))
2194            .collect::<Vec<_>>();
2195        result.push(self.create_time_index_column_expr()?.sort(true, true));
2196        Ok(result)
2197    }
2198
2199    fn create_field_columns_sort_exprs(&self, asc: bool) -> Vec<SortExpr> {
2200        self.ctx
2201            .field_columns
2202            .iter()
2203            .map(|col| DfExpr::Column(Column::from_name(col)).sort(asc, true))
2204            .collect::<Vec<_>>()
2205    }
2206
2207    fn create_sort_exprs_by_tags(
2208        func: &str,
2209        tags: Vec<DfExpr>,
2210        asc: bool,
2211    ) -> Result<Vec<SortExpr>> {
2212        ensure!(
2213            !tags.is_empty(),
2214            FunctionInvalidArgumentSnafu { fn_name: func }
2215        );
2216
2217        tags.iter()
2218            .map(|col| match col {
2219                DfExpr::Literal(ScalarValue::Utf8(Some(label)), _) => {
2220                    Ok(DfExpr::Column(Column::from_name(label)).sort(asc, false))
2221                }
2222                other => UnexpectedPlanExprSnafu {
2223                    desc: format!("expected label string literal, but found {:?}", other),
2224                }
2225                .fail(),
2226            })
2227            .collect::<Result<Vec<_>>>()
2228    }
2229
2230    fn create_empty_values_filter_expr(&self) -> Result<DfExpr> {
2231        let mut exprs = Vec::with_capacity(self.ctx.field_columns.len());
2232        for value in &self.ctx.field_columns {
2233            let expr = DfExpr::Column(Column::from_name(value)).is_not_null();
2234            exprs.push(expr);
2235        }
2236
2237        conjunction(exprs).context(ValueNotFoundSnafu {
2238            table: self.table_ref()?.to_quoted_string(),
2239        })
2240    }
2241
2242    /// Creates a set of DataFusion `DfExpr::AggregateFunction` expressions for each value column using the specified aggregate function.
2243    ///
2244    /// # Side Effects
2245    ///
2246    /// This method modifies the value columns in the context by replacing them with the new columns
2247    /// created by the aggregate function application.
2248    ///
2249    /// # Returns
2250    ///
2251    /// Returns a tuple of `(aggregate_expressions, previous_field_expressions)` where:
2252    /// - `aggregate_expressions`: Expressions that apply the aggregate function to the original fields
2253    /// - `previous_field_expressions`: Original field expressions before aggregation. This is non-empty
2254    ///   only when the operation is `count_values`, as this operation requires preserving the original
2255    ///   values for grouping.
2256    ///
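    /// For example (a sketch), `sum by (job) (metric)` with a single field
    /// column produces one `sum(<field>)` expression, and the context's field
    /// column is renamed to that expression's schema name.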
2257    fn create_aggregate_exprs(
2258        &mut self,
2259        op: TokenType,
2260        param: &Option<Box<PromExpr>>,
2261        input_plan: &LogicalPlan,
2262    ) -> Result<(Vec<DfExpr>, Vec<DfExpr>)> {
2263        let mut non_col_args = Vec::new();
2264        let aggr = match op.id() {
2265            token::T_SUM => sum_udaf(),
2266            token::T_QUANTILE => {
2267                let q =
2268                    Self::get_param_as_literal_expr(param, Some(op), Some(ArrowDataType::Float64))?;
2269                non_col_args.push(q);
2270                quantile_udaf()
2271            }
2272            token::T_AVG => avg_udaf(),
2273            token::T_COUNT_VALUES | token::T_COUNT => count_udaf(),
2274            token::T_MIN => min_udaf(),
2275            token::T_MAX => max_udaf(),
2276            token::T_GROUP => grouping_udaf(),
2277            token::T_STDDEV => stddev_pop_udaf(),
2278            token::T_STDVAR => var_pop_udaf(),
2279            token::T_TOPK | token::T_BOTTOMK => UnsupportedExprSnafu {
2280                name: format!("{op:?}"),
2281            }
2282            .fail()?,
2283            _ => UnexpectedTokenSnafu { token: op }.fail()?,
2284        };
2285
2286        // perform the aggregate operation on each value column
2287        let exprs: Vec<DfExpr> = self
2288            .ctx
2289            .field_columns
2290            .iter()
2291            .map(|col| {
2292                non_col_args.push(DfExpr::Column(Column::from_name(col)));
2293                let expr = aggr.call(non_col_args.clone());
2294                non_col_args.pop();
2295                expr
2296            })
2297            .collect::<Vec<_>>();
2298
2299        // if the aggregator is `count_values`, it must also be grouped by the current field values.
2300        let prev_field_exprs = if op.id() == token::T_COUNT_VALUES {
2301            let prev_field_exprs: Vec<_> = self
2302                .ctx
2303                .field_columns
2304                .iter()
2305                .map(|col| DfExpr::Column(Column::from_name(col)))
2306                .collect();
2307
2308            ensure!(
2309                self.ctx.field_columns.len() == 1,
2310                UnsupportedExprSnafu {
2311                    name: "count_values on multi-value input"
2312                }
2313            );
2314
2315            prev_field_exprs
2316        } else {
2317            vec![]
2318        };
2319
2320        // update value column names according to the aggregators
2321        let mut new_field_columns = Vec::with_capacity(self.ctx.field_columns.len());
2322
2323        let normalized_exprs =
2324            normalize_cols(exprs.iter().cloned(), input_plan).context(DataFusionPlanningSnafu)?;
2325        for expr in normalized_exprs {
2326            new_field_columns.push(expr.schema_name().to_string());
2327        }
2328        self.ctx.field_columns = new_field_columns;
2329
2330        Ok((exprs, prev_field_exprs))
2331    }
2332
2333    fn get_param_value_as_str(op: TokenType, param: &Option<Box<PromExpr>>) -> Result<&str> {
2334        let param = param
2335            .as_deref()
2336            .with_context(|| FunctionInvalidArgumentSnafu {
2337                fn_name: op.to_string(),
2338            })?;
2339        let PromExpr::StringLiteral(StringLiteral { val }) = param else {
2340            return FunctionInvalidArgumentSnafu {
2341                fn_name: op.to_string(),
2342            }
2343            .fail();
2344        };
2345
2346        Ok(val)
2347    }
2348
2349    fn get_param_as_literal_expr(
2350        param: &Option<Box<PromExpr>>,
2351        op: Option<TokenType>,
2352        expected_type: Option<ArrowDataType>,
2353    ) -> Result<DfExpr> {
2354        let prom_param = param.as_deref().with_context(|| {
2355            if let Some(op) = op {
2356                FunctionInvalidArgumentSnafu {
2357                    fn_name: op.to_string(),
2358                }
2359            } else {
2360                FunctionInvalidArgumentSnafu {
2361                    fn_name: "unknown".to_string(),
2362                }
2363            }
2364        })?;
2365
2366        let expr = Self::try_build_literal_expr(prom_param).with_context(|| {
2367            if let Some(op) = op {
2368                FunctionInvalidArgumentSnafu {
2369                    fn_name: op.to_string(),
2370                }
2371            } else {
2372                FunctionInvalidArgumentSnafu {
2373                    fn_name: "unknown".to_string(),
2374                }
2375            }
2376        })?;
2377
2378        // check if the type is expected
2379        if let Some(expected_type) = expected_type {
2380            // literal should not have reference to column
2381            let expr_type = expr
2382                .get_type(&DFSchema::empty())
2383                .context(DataFusionPlanningSnafu)?;
2384            if expected_type != expr_type {
2385                return FunctionInvalidArgumentSnafu {
2386                    fn_name: format!("expected {expected_type:?}, but found {expr_type:?}"),
2387                }
2388                .fail();
2389            }
2390        }
2391
2392        Ok(expr)
2393    }
2394
2395    /// Create a [DfExpr::WindowFunction] expr for each value column with the given window function.
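    ///
    /// For `topk(k, metric)` (a sketch), each field column gets roughly:
    ///
    /// ```text
    /// row_number() OVER (
    ///     PARTITION BY <group exprs>
    ///     ORDER BY <field> DESC, <tags> DESC
    /// )
    /// ```
    ///
    /// `bottomk` orders ascending instead.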
2397    fn create_window_exprs(
2398        &mut self,
2399        op: TokenType,
2400        group_exprs: Vec<DfExpr>,
2401        input_plan: &LogicalPlan,
2402    ) -> Result<Vec<DfExpr>> {
2403        ensure!(
2404            self.ctx.field_columns.len() == 1,
2405            UnsupportedExprSnafu {
2406                name: "topk or bottomk on multi-value input"
2407            }
2408        );
2409
2410        assert!(matches!(op.id(), token::T_TOPK | token::T_BOTTOMK));
2411
2412        let asc = matches!(op.id(), token::T_BOTTOMK);
2413
2414        let tag_sort_exprs = self
2415            .create_tag_column_exprs()?
2416            .into_iter()
2417            .map(|expr| expr.sort(asc, true));
2418
2419        // perform the window operation on each value column
2420        let exprs: Vec<DfExpr> = self
2421            .ctx
2422            .field_columns
2423            .iter()
2424            .map(|col| {
2425                let mut sort_exprs = Vec::with_capacity(self.ctx.tag_columns.len() + 1);
2426                // Order by value first,
2427                sort_exprs.push(DfExpr::Column(Column::from(col)).sort(asc, true));
2428                // then by tags when values are equal,
2429                // to keep the output relatively stable.
2430                sort_exprs.extend(tag_sort_exprs.clone());
2431
2432                DfExpr::WindowFunction(Box::new(WindowFunction {
2433                    fun: WindowFunctionDefinition::WindowUDF(Arc::new(RowNumber::new().into())),
2434                    params: WindowFunctionParams {
2435                        args: vec![],
2436                        partition_by: group_exprs.clone(),
2437                        order_by: sort_exprs,
2438                        window_frame: WindowFrame::new(Some(true)),
2439                        null_treatment: None,
2440                        distinct: false,
2441                    },
2442                }))
2443            })
2444            .collect();
2445
2446        let normalized_exprs =
2447            normalize_cols(exprs.iter().cloned(), input_plan).context(DataFusionPlanningSnafu)?;
2448        Ok(normalized_exprs)
2449    }
2450
2451    /// Try to build a [f64] from [PromExpr].
2452    #[deprecated(
2453        note = "use `Self::get_param_as_literal_expr` instead. This is only for `create_histogram_plan`"
2454    )]
2455    fn try_build_float_literal(expr: &PromExpr) -> Option<f64> {
2456        match expr {
2457            PromExpr::NumberLiteral(NumberLiteral { val }) => Some(*val),
2458            PromExpr::Paren(ParenExpr { expr }) => Self::try_build_float_literal(expr),
2459            PromExpr::Unary(UnaryExpr { expr, .. }) => {
2460                Self::try_build_float_literal(expr).map(|f| -f)
2461            }
2462            PromExpr::StringLiteral(_)
2463            | PromExpr::Binary(_)
2464            | PromExpr::VectorSelector(_)
2465            | PromExpr::MatrixSelector(_)
2466            | PromExpr::Call(_)
2467            | PromExpr::Extension(_)
2468            | PromExpr::Aggregate(_)
2469            | PromExpr::Subquery(_) => None,
2470        }
2471    }
2472
2473    /// Create a [SPECIAL_HISTOGRAM_QUANTILE] plan.
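    ///
    /// For example (a sketch), `histogram_quantile(0.9, m_bucket)` plans the
    /// inner expression first, then adds a [HistogramFold] node that folds the
    /// `le` buckets of each series into one 0.9-quantile value per timestamp.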
2474    async fn create_histogram_plan(
2475        &mut self,
2476        args: &PromFunctionArgs,
2477        query_engine_state: &QueryEngineState,
2478    ) -> Result<LogicalPlan> {
2479        if args.args.len() != 2 {
2480            return FunctionInvalidArgumentSnafu {
2481                fn_name: SPECIAL_HISTOGRAM_QUANTILE.to_string(),
2482            }
2483            .fail();
2484        }
2485        #[allow(deprecated)]
2486        let phi = Self::try_build_float_literal(&args.args[0]).with_context(|| {
2487            FunctionInvalidArgumentSnafu {
2488                fn_name: SPECIAL_HISTOGRAM_QUANTILE.to_string(),
2489            }
2490        })?;
2491
2492        let input = args.args[1].as_ref().clone();
2493        let input_plan = self.prom_expr_to_plan(&input, query_engine_state).await?;
2494
2495        if !self.ctx.has_le_tag() {
2496            // Return empty result instead of error when 'le' column is not found
2497            // This handles the case when histogram metrics don't exist
2498            return Ok(LogicalPlan::EmptyRelation(
2499                datafusion::logical_expr::EmptyRelation {
2500                    produce_one_row: false,
2501                    schema: Arc::new(DFSchema::empty()),
2502                },
2503            ));
2504        }
2505        let time_index_column =
2506            self.ctx
2507                .time_index_column
2508                .clone()
2509                .with_context(|| TimeIndexNotFoundSnafu {
2510                    table: self.ctx.table_name.clone().unwrap_or_default(),
2511                })?;
2512        // FIXME(ruihang): support multi fields
2513        let field_column = self
2514            .ctx
2515            .field_columns
2516            .first()
2517            .with_context(|| FunctionInvalidArgumentSnafu {
2518                fn_name: SPECIAL_HISTOGRAM_QUANTILE.to_string(),
2519            })?
2520            .clone();
2521        // remove le column from tag columns
2522        self.ctx.tag_columns.retain(|col| col != LE_COLUMN_NAME);
2523
2524        Ok(LogicalPlan::Extension(Extension {
2525            node: Arc::new(
2526                HistogramFold::new(
2527                    LE_COLUMN_NAME.to_string(),
2528                    field_column,
2529                    time_index_column,
2530                    phi,
2531                    input_plan,
2532                )
2533                .context(DataFusionPlanningSnafu)?,
2534            ),
2535        }))
2536    }
2537
2538    /// Create a [SPECIAL_VECTOR_FUNCTION] plan
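    ///
    /// For example (a sketch), `vector(1)` becomes an [EmptyMetric] plan that
    /// emits the literal value `1` at every evaluation step between start and end.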
2539    async fn create_vector_plan(&mut self, args: &PromFunctionArgs) -> Result<LogicalPlan> {
2540        if args.args.len() != 1 {
2541            return FunctionInvalidArgumentSnafu {
2542                fn_name: SPECIAL_VECTOR_FUNCTION.to_string(),
2543            }
2544            .fail();
2545        }
2546        let lit = Self::get_param_as_literal_expr(&Some(args.args[0].clone()), None, None)?;
2547
2548        // reuse `SPECIAL_TIME_FUNCTION` as name of time index column
2549        self.ctx.time_index_column = Some(SPECIAL_TIME_FUNCTION.to_string());
2550        self.ctx.reset_table_name_and_schema();
2551        self.ctx.tag_columns = vec![];
2552        self.ctx.field_columns = vec![GREPTIME_VALUE.to_string()];
2553        Ok(LogicalPlan::Extension(Extension {
2554            node: Arc::new(
2555                EmptyMetric::new(
2556                    self.ctx.start,
2557                    self.ctx.end,
2558                    self.ctx.interval,
2559                    SPECIAL_TIME_FUNCTION.to_string(),
2560                    GREPTIME_VALUE.to_string(),
2561                    Some(lit),
2562                )
2563                .context(DataFusionPlanningSnafu)?,
2564            ),
2565        }))
2566    }
2567
2568    /// Create a [SCALAR_FUNCTION] plan
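    ///
    /// For example (a sketch), `scalar(metric)` wraps the input plan in a
    /// [ScalarCalculate] node that reduces a single-series input to one value
    /// per evaluation step; the context's tag columns are cleared afterwards.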
2569    async fn create_scalar_plan(
2570        &mut self,
2571        args: &PromFunctionArgs,
2572        query_engine_state: &QueryEngineState,
2573    ) -> Result<LogicalPlan> {
2574        ensure!(
2575            args.len() == 1,
2576            FunctionInvalidArgumentSnafu {
2577                fn_name: SCALAR_FUNCTION
2578            }
2579        );
2580        let input = self
2581            .prom_expr_to_plan(&args.args[0], query_engine_state)
2582            .await?;
2583        ensure!(
2584            self.ctx.field_columns.len() == 1,
2585            MultiFieldsNotSupportedSnafu {
2586                operator: SCALAR_FUNCTION
2587            },
2588        );
2589        let scalar_plan = LogicalPlan::Extension(Extension {
2590            node: Arc::new(
2591                ScalarCalculate::new(
2592                    self.ctx.start,
2593                    self.ctx.end,
2594                    self.ctx.interval,
2595                    input,
2596                    self.ctx.time_index_column.as_ref().unwrap(),
2597                    &self.ctx.tag_columns,
2598                    &self.ctx.field_columns[0],
2599                    self.ctx.table_name.as_deref(),
2600                )
2601                .context(PromqlPlanNodeSnafu)?,
2602            ),
2603        });
2604        // scalar plans have no tag columns
2605        self.ctx.tag_columns.clear();
2606        self.ctx.field_columns.clear();
2607        self.ctx
2608            .field_columns
2609            .push(scalar_plan.schema().field(1).name().clone());
2610        Ok(scalar_plan)
2611    }
2612
2613    /// Create a [SPECIAL_ABSENT_FUNCTION] plan
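    ///
    /// For example (a sketch), `absent(up{job="api"})` first aggregates the
    /// input to one row per timestamp, then adds an [Absent] node that emits
    /// `1` for steps with no data, attaching `job="api"` as a fake label.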
2614    async fn create_absent_plan(
2615        &mut self,
2616        args: &PromFunctionArgs,
2617        query_engine_state: &QueryEngineState,
2618    ) -> Result<LogicalPlan> {
2619        if args.args.len() != 1 {
2620            return FunctionInvalidArgumentSnafu {
2621                fn_name: SPECIAL_ABSENT_FUNCTION.to_string(),
2622            }
2623            .fail();
2624        }
2625        let input = self
2626            .prom_expr_to_plan(&args.args[0], query_engine_state)
2627            .await?;
2628
2629        let time_index_expr = self.create_time_index_column_expr()?;
2630        let first_field_expr =
2631            self.create_field_column_exprs()?
2632                .pop()
2633                .with_context(|| ValueNotFoundSnafu {
2634                    table: self.ctx.table_name.clone().unwrap_or_default(),
2635                })?;
2636        let first_value_expr = first_value(first_field_expr, vec![]);
2637
2638        let ordered_aggregated_input = LogicalPlanBuilder::from(input)
2639            .aggregate(
2640                vec![time_index_expr.clone()],
2641                vec![first_value_expr.clone()],
2642            )
2643            .context(DataFusionPlanningSnafu)?
2644            .sort(vec![time_index_expr.sort(true, false)])
2645            .context(DataFusionPlanningSnafu)?
2646            .build()
2647            .context(DataFusionPlanningSnafu)?;
2648
2649        let fake_labels = self
2650            .ctx
2651            .selector_matcher
2652            .iter()
2653            .filter_map(|matcher| match matcher.op {
2654                MatchOp::Equal => Some((matcher.name.clone(), matcher.value.clone())),
2655                _ => None,
2656            })
2657            .collect::<Vec<_>>();
2658
2659        // Create the absent plan
2660        let absent_plan = LogicalPlan::Extension(Extension {
2661            node: Arc::new(
2662                Absent::try_new(
2663                    self.ctx.start,
2664                    self.ctx.end,
2665                    self.ctx.interval,
2666                    self.ctx.time_index_column.as_ref().unwrap().clone(),
2667                    self.ctx.field_columns[0].clone(),
2668                    fake_labels,
2669                    ordered_aggregated_input,
2670                )
2671                .context(DataFusionPlanningSnafu)?,
2672            ),
2673        });
2674
2675        Ok(absent_plan)
2676    }
2677
2678    /// Try to build a DataFusion literal expression from a PromQL expr. Returns
2679    /// `None` if the input is not a literal expression.
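    ///
    /// For example, `1 + 2 * 3` folds into a literal DataFusion expression, while
    /// `some_metric + 1` yields `None` because a vector selector is not a literal.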
2680    fn try_build_literal_expr(expr: &PromExpr) -> Option<DfExpr> {
2681        match expr {
2682            PromExpr::NumberLiteral(NumberLiteral { val }) => Some(val.lit()),
2683            PromExpr::StringLiteral(StringLiteral { val }) => Some(val.lit()),
2684            PromExpr::VectorSelector(_)
2685            | PromExpr::MatrixSelector(_)
2686            | PromExpr::Extension(_)
2687            | PromExpr::Aggregate(_)
2688            | PromExpr::Subquery(_) => None,
2689            PromExpr::Call(_) => {
2690                // Function calls are never folded into literals here; in
2691                // particular, time() must be handled as a regular function
2692                // call rather than as a literal.
2693                None
2694            }
2698            PromExpr::Paren(ParenExpr { expr }) => Self::try_build_literal_expr(expr),
2699            // TODO(ruihang): support Unary operator
2700            PromExpr::Unary(UnaryExpr { expr, .. }) => Self::try_build_literal_expr(expr),
2701            PromExpr::Binary(PromBinaryExpr {
2702                lhs,
2703                rhs,
2704                op,
2705                modifier,
2706            }) => {
2707                let lhs = Self::try_build_literal_expr(lhs)?;
2708                let rhs = Self::try_build_literal_expr(rhs)?;
2709                let is_comparison_op = Self::is_token_a_comparison_op(*op);
2710                let expr_builder = Self::prom_token_to_binary_expr_builder(*op).ok()?;
2711                let expr = expr_builder(lhs, rhs).ok()?;
2712
2713                let should_return_bool = if let Some(m) = modifier {
2714                    m.return_bool
2715                } else {
2716                    false
2717                };
2718                if is_comparison_op && should_return_bool {
2719                    Some(DfExpr::Cast(Cast {
2720                        expr: Box::new(expr),
2721                        data_type: ArrowDataType::Float64,
2722                    }))
2723                } else {
2724                    Some(expr)
2725                }
2726            }
2727        }
2728    }
2729
2730    fn try_build_special_time_expr_with_context(&self, expr: &PromExpr) -> Option<DfExpr> {
2731        match expr {
2732            PromExpr::Call(Call { func, .. }) => {
2733                if func.name == SPECIAL_TIME_FUNCTION
2734                    && let Some(time_index_col) = self.ctx.time_index_column.as_ref()
2735                {
2736                    Some(build_special_time_expr(time_index_col))
2737                } else {
2738                    None
2739                }
2740            }
2741            _ => None,
2742        }
2743    }
2744
2745    /// Return a lambda that builds a binary expression from a token.
2746    /// Needed because some binary operators are functions in DataFusion, like `atan2` or `^`.
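    ///
    /// For instance, `+` maps to plain expression arithmetic (`lhs + rhs`), while
    /// `^` and `atan2` have no operator form here and are lowered to the `power`
    /// and `atan2` math functions.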
2747    #[allow(clippy::type_complexity)]
2748    fn prom_token_to_binary_expr_builder(
2749        token: TokenType,
2750    ) -> Result<Box<dyn Fn(DfExpr, DfExpr) -> Result<DfExpr>>> {
2751        match token.id() {
2752            token::T_ADD => Ok(Box::new(|lhs, rhs| Ok(lhs + rhs))),
2753            token::T_SUB => Ok(Box::new(|lhs, rhs| Ok(lhs - rhs))),
2754            token::T_MUL => Ok(Box::new(|lhs, rhs| Ok(lhs * rhs))),
2755            token::T_DIV => Ok(Box::new(|lhs, rhs| Ok(lhs / rhs))),
2756            token::T_MOD => Ok(Box::new(|lhs: DfExpr, rhs| Ok(lhs % rhs))),
2757            token::T_EQLC => Ok(Box::new(|lhs, rhs| Ok(lhs.eq(rhs)))),
2758            token::T_NEQ => Ok(Box::new(|lhs, rhs| Ok(lhs.not_eq(rhs)))),
2759            token::T_GTR => Ok(Box::new(|lhs, rhs| Ok(lhs.gt(rhs)))),
2760            token::T_LSS => Ok(Box::new(|lhs, rhs| Ok(lhs.lt(rhs)))),
2761            token::T_GTE => Ok(Box::new(|lhs, rhs| Ok(lhs.gt_eq(rhs)))),
2762            token::T_LTE => Ok(Box::new(|lhs, rhs| Ok(lhs.lt_eq(rhs)))),
2763            token::T_POW => Ok(Box::new(|lhs, rhs| {
2764                Ok(DfExpr::ScalarFunction(ScalarFunction {
2765                    func: datafusion_functions::math::power(),
2766                    args: vec![lhs, rhs],
2767                }))
2768            })),
2769            token::T_ATAN2 => Ok(Box::new(|lhs, rhs| {
2770                Ok(DfExpr::ScalarFunction(ScalarFunction {
2771                    func: datafusion_functions::math::atan2(),
2772                    args: vec![lhs, rhs],
2773                }))
2774            })),
2775            _ => UnexpectedTokenSnafu { token }.fail(),
2776        }
2777    }
2778
2779    /// Check if the given op is a [comparison operator](https://prometheus.io/docs/prometheus/latest/querying/operators/#comparison-binary-operators).
2780    fn is_token_a_comparison_op(token: TokenType) -> bool {
2781        matches!(
2782            token.id(),
2783            token::T_EQLC
2784                | token::T_NEQ
2785                | token::T_GTR
2786                | token::T_LSS
2787                | token::T_GTE
2788                | token::T_LTE
2789        )
2790    }
2791
2792    /// Check if the given op is a set operator (UNION, INTERSECT and EXCEPT in SQL).
2793    fn is_token_a_set_op(token: TokenType) -> bool {
2794        matches!(
2795            token.id(),
2796            token::T_LAND // INTERSECT
2797                | token::T_LOR // UNION
2798                | token::T_LUNLESS // EXCEPT
2799        )
2800    }
2801
2802    /// Build an inner join on the time index column and tag columns to concatenate two logical plans.
2803    /// When `only_join_time_index == true`, only join on the time index, because the two plans may not share the same tag columns.
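    ///
    /// As a sketch: for `a + on(host) b` the join keys collapse to `{host, <time index>}`,
    /// while `a + ignoring(job) b` drops `job` from both sides' join keys.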
2804    #[allow(clippy::too_many_arguments)]
2805    fn join_on_non_field_columns(
2806        &self,
2807        left: LogicalPlan,
2808        right: LogicalPlan,
2809        left_table_ref: TableReference,
2810        right_table_ref: TableReference,
2811        left_time_index_column: Option<String>,
2812        right_time_index_column: Option<String>,
2813        only_join_time_index: bool,
2814        modifier: &Option<BinModifier>,
2815    ) -> Result<LogicalPlan> {
2816        let mut left_tag_columns = if only_join_time_index {
2817            BTreeSet::new()
2818        } else {
2819            self.ctx
2820                .tag_columns
2821                .iter()
2822                .cloned()
2823                .collect::<BTreeSet<_>>()
2824        };
2825        let mut right_tag_columns = left_tag_columns.clone();
2826
2827        // apply modifier
2828        if let Some(modifier) = modifier {
2829            // apply label modifier
2830            if let Some(matching) = &modifier.matching {
2831                match matching {
2832                    // keeps columns mentioned in `on`
2833                    LabelModifier::Include(on) => {
2834                        let mask = on.labels.iter().cloned().collect::<BTreeSet<_>>();
2835                        left_tag_columns = left_tag_columns.intersection(&mask).cloned().collect();
2836                        right_tag_columns =
2837                            right_tag_columns.intersection(&mask).cloned().collect();
2838                    }
2839                    // removes columns mentioned in `ignoring`
2840                    LabelModifier::Exclude(ignoring) => {
2841                        // the label's existence is not checked
2842                        for label in &ignoring.labels {
2843                            let _ = left_tag_columns.remove(label);
2844                            let _ = right_tag_columns.remove(label);
2845                        }
2846                    }
2847                }
2848            }
2849        }
2850
2851        // add the time index columns to the join keys when both sides have one
2852        if let (Some(left_time_index_column), Some(right_time_index_column)) =
2853            (left_time_index_column, right_time_index_column)
2854        {
2855            left_tag_columns.insert(left_time_index_column);
2856            right_tag_columns.insert(right_time_index_column);
2857        }
2858
2859        let right = LogicalPlanBuilder::from(right)
2860            .alias(right_table_ref)
2861            .context(DataFusionPlanningSnafu)?
2862            .build()
2863            .context(DataFusionPlanningSnafu)?;
2864
2865        // Inner join on the time index (and tag) columns to concatenate the two plans
2866        LogicalPlanBuilder::from(left)
2867            .alias(left_table_ref)
2868            .context(DataFusionPlanningSnafu)?
2869            .join_detailed(
2870                right,
2871                JoinType::Inner,
2872                (
2873                    left_tag_columns
2874                        .into_iter()
2875                        .map(Column::from_name)
2876                        .collect::<Vec<_>>(),
2877                    right_tag_columns
2878                        .into_iter()
2879                        .map(Column::from_name)
2880                        .collect::<Vec<_>>(),
2881                ),
2882                None,
2883                NullEquality::NullEqualsNull,
2884            )
2885            .context(DataFusionPlanningSnafu)?
2886            .build()
2887            .context(DataFusionPlanningSnafu)
2888    }
2889
2890    /// Build a set operator (AND/OR/UNLESS)
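    ///
    /// e.g. `a and b` is planned as a `LeftSemi` join on the matched tag columns plus
    /// the time index, `a unless b` as a `LeftAnti` join, and `a or b` is delegated
    /// to [`Self::or_operator`].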
2891    fn set_op_on_non_field_columns(
2892        &mut self,
2893        left: LogicalPlan,
2894        mut right: LogicalPlan,
2895        left_context: PromPlannerContext,
2896        right_context: PromPlannerContext,
2897        op: TokenType,
2898        modifier: &Option<BinModifier>,
2899    ) -> Result<LogicalPlan> {
2900        let mut left_tag_col_set = left_context
2901            .tag_columns
2902            .iter()
2903            .cloned()
2904            .collect::<HashSet<_>>();
2905        let mut right_tag_col_set = right_context
2906            .tag_columns
2907            .iter()
2908            .cloned()
2909            .collect::<HashSet<_>>();
2910
2911        if matches!(op.id(), token::T_LOR) {
2912            return self.or_operator(
2913                left,
2914                right,
2915                left_tag_col_set,
2916                right_tag_col_set,
2917                left_context,
2918                right_context,
2919                modifier,
2920            );
2921        }
2922
2923        // apply modifier
2924        if let Some(modifier) = modifier {
2925            // one-to-many and many-to-one are not supported
2926            ensure!(
2927                matches!(
2928                    modifier.card,
2929                    VectorMatchCardinality::OneToOne | VectorMatchCardinality::ManyToMany
2930                ),
2931                UnsupportedVectorMatchSnafu {
2932                    name: modifier.card.clone(),
2933                },
2934            );
2935            // apply label modifier
2936            if let Some(matching) = &modifier.matching {
2937                match matching {
2938                    // keeps columns mentioned in `on`
2939                    LabelModifier::Include(on) => {
2940                        let mask = on.labels.iter().cloned().collect::<HashSet<_>>();
2941                        left_tag_col_set = left_tag_col_set.intersection(&mask).cloned().collect();
2942                        right_tag_col_set =
2943                            right_tag_col_set.intersection(&mask).cloned().collect();
2944                    }
2945                    // removes columns mentioned in `ignoring`
2946                    LabelModifier::Exclude(ignoring) => {
2947                        // the label's existence is not checked
2948                        for label in &ignoring.labels {
2949                            let _ = left_tag_col_set.remove(label);
2950                            let _ = right_tag_col_set.remove(label);
2951                        }
2952                    }
2953                }
2954            }
2955        }
2956        // ensure two sides have the same tag columns
2957        if !matches!(op.id(), token::T_LOR) {
2958            ensure!(
2959                left_tag_col_set == right_tag_col_set,
2960                CombineTableColumnMismatchSnafu {
2961                    left: left_tag_col_set.into_iter().collect::<Vec<_>>(),
2962                    right: right_tag_col_set.into_iter().collect::<Vec<_>>(),
2963                }
2964            )
2965        };
2966        let left_time_index = left_context.time_index_column.clone().unwrap();
2967        let right_time_index = right_context.time_index_column.clone().unwrap();
2968        let join_keys = left_tag_col_set
2969            .iter()
2970            .cloned()
2971            .chain([left_time_index.clone()])
2972            .collect::<Vec<_>>();
2973        self.ctx.time_index_column = Some(left_time_index.clone());
2974
2975        // alias right time index column if necessary
2976        if left_context.time_index_column != right_context.time_index_column {
2977            let right_project_exprs = right
2978                .schema()
2979                .fields()
2980                .iter()
2981                .map(|field| {
2982                    if field.name() == &right_time_index {
2983                        DfExpr::Column(Column::from_name(&right_time_index)).alias(&left_time_index)
2984                    } else {
2985                        DfExpr::Column(Column::from_name(field.name()))
2986                    }
2987                })
2988                .collect::<Vec<_>>();
2989
2990            right = LogicalPlanBuilder::from(right)
2991                .project(right_project_exprs)
2992                .context(DataFusionPlanningSnafu)?
2993                .build()
2994                .context(DataFusionPlanningSnafu)?;
2995        }
2996
2997        ensure!(
2998            left_context.field_columns.len() == 1,
2999            MultiFieldsNotSupportedSnafu {
3000                operator: "AND operator"
3001            }
3002        );
3003        // Update the field column in the context.
3004        // The AND/UNLESS operators only keep the field column of the left input.
3005        let left_field_col = left_context.field_columns.first().unwrap();
3006        self.ctx.field_columns = vec![left_field_col.clone()];
3007
3008        // Generate join plan.
3009        // All set operations in PromQL are "distinct"
3010        match op.id() {
3011            token::T_LAND => LogicalPlanBuilder::from(left)
3012                .distinct()
3013                .context(DataFusionPlanningSnafu)?
3014                .join_detailed(
3015                    right,
3016                    JoinType::LeftSemi,
3017                    (join_keys.clone(), join_keys),
3018                    None,
3019                    NullEquality::NullEqualsNull,
3020                )
3021                .context(DataFusionPlanningSnafu)?
3022                .build()
3023                .context(DataFusionPlanningSnafu),
3024            token::T_LUNLESS => LogicalPlanBuilder::from(left)
3025                .distinct()
3026                .context(DataFusionPlanningSnafu)?
3027                .join_detailed(
3028                    right,
3029                    JoinType::LeftAnti,
3030                    (join_keys.clone(), join_keys),
3031                    None,
3032                    NullEquality::NullEqualsNull,
3033                )
3034                .context(DataFusionPlanningSnafu)?
3035                .build()
3036                .context(DataFusionPlanningSnafu),
3037            token::T_LOR => {
3038                // OR is handled at the beginning of this function, as it cannot
3039                // be expressed using JOIN like AND and UNLESS.
3040                unreachable!()
3041            }
3042            _ => UnexpectedTokenSnafu { token: op }.fail(),
3043        }
3044    }
3045
3046    // TODO(ruihang): change function name
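    /// Plan PromQL's `or` set operation. Outline of the approach below: align the
    /// two sides' schemas via projections (filling tags missing on one side with
    /// null), then combine them with the `UnionDistinctOn` extension node, which
    /// unions the inputs distinctly on the match columns and time index.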
3047    #[allow(clippy::too_many_arguments)]
3048    fn or_operator(
3049        &mut self,
3050        left: LogicalPlan,
3051        right: LogicalPlan,
3052        left_tag_cols_set: HashSet<String>,
3053        right_tag_cols_set: HashSet<String>,
3054        left_context: PromPlannerContext,
3055        right_context: PromPlannerContext,
3056        modifier: &Option<BinModifier>,
3057    ) -> Result<LogicalPlan> {
3058        // checks
3059        ensure!(
3060            left_context.field_columns.len() == right_context.field_columns.len(),
3061            CombineTableColumnMismatchSnafu {
3062                left: left_context.field_columns.clone(),
3063                right: right_context.field_columns.clone()
3064            }
3065        );
3066        ensure!(
3067            left_context.field_columns.len() == 1,
3068            MultiFieldsNotSupportedSnafu {
3069                operator: "OR operator"
3070            }
3071        );
3072
3073        // prepare hash sets
3074        let all_tags = left_tag_cols_set
3075            .union(&right_tag_cols_set)
3076            .cloned()
3077            .collect::<HashSet<_>>();
3078        let tags_not_in_left = all_tags
3079            .difference(&left_tag_cols_set)
3080            .cloned()
3081            .collect::<Vec<_>>();
3082        let tags_not_in_right = all_tags
3083            .difference(&right_tag_cols_set)
3084            .cloned()
3085            .collect::<Vec<_>>();
3086        let left_qualifier = left.schema().qualified_field(0).0.cloned();
3087        let right_qualifier = right.schema().qualified_field(0).0.cloned();
3088        let left_qualifier_string = left_qualifier
3089            .as_ref()
3090            .map(|l| l.to_string())
3091            .unwrap_or_default();
3092        let right_qualifier_string = right_qualifier
3093            .as_ref()
3094            .map(|r| r.to_string())
3095            .unwrap_or_default();
3096        let left_time_index_column =
3097            left_context
3098                .time_index_column
3099                .clone()
3100                .with_context(|| TimeIndexNotFoundSnafu {
3101                    table: left_qualifier_string.clone(),
3102                })?;
3103        let right_time_index_column =
3104            right_context
3105                .time_index_column
3106                .clone()
3107                .with_context(|| TimeIndexNotFoundSnafu {
3108                    table: right_qualifier_string.clone(),
3109                })?;
3110        // Take the name of the first field column. The length is checked above.
3111        let left_field_col = left_context.field_columns.first().unwrap();
3112        let right_field_col = right_context.field_columns.first().unwrap();
3113
3114        // step 0: collect all columns for the output schema
3115        let mut all_columns_set = left
3116            .schema()
3117            .fields()
3118            .iter()
3119            .chain(right.schema().fields().iter())
3120            .map(|field| field.name().clone())
3121            .collect::<HashSet<_>>();
3122        // remove time index column
3123        all_columns_set.remove(&left_time_index_column);
3124        all_columns_set.remove(&right_time_index_column);
3125        // remove the right-side field column (when it differs from the left)
3126        if left_field_col != right_field_col {
3127            all_columns_set.remove(right_field_col);
3128        }
3129        let mut all_columns = all_columns_set.into_iter().collect::<Vec<_>>();
3130        // sort to keep the generated schema deterministic
3131        all_columns.sort_unstable();
3132        // use left time index column name as the result time index column name
3133        all_columns.insert(0, left_time_index_column.clone());
3134
3135        // step 1: align schemas using projections, filling non-existent columns with null
3136        let left_proj_exprs = all_columns.iter().map(|col| {
3137            if tags_not_in_left.contains(col) {
3138                DfExpr::Literal(ScalarValue::Utf8(None), None).alias(col.to_string())
3139            } else {
3140                DfExpr::Column(Column::new(None::<String>, col))
3141            }
3142        });
3143        let right_time_index_expr = DfExpr::Column(Column::new(
3144            right_qualifier.clone(),
3145            right_time_index_column,
3146        ))
3147        .alias(left_time_index_column.clone());
3148        // The field column on the right side may not have a qualifier (it may have been
3149        // removed by a join operation), so we need to find it from the schema.
3150        let right_qualifier_for_field = right
3151            .schema()
3152            .iter()
3153            .find(|(_, f)| f.name() == right_field_col)
3154            .map(|(q, _)| q)
3155            .context(ColumnNotFoundSnafu {
3156                col: right_field_col.to_string(),
3157            })?
3158            .cloned();
3159
3160        // `skip(1)` to skip the time index column
3161        let right_proj_exprs_without_time_index = all_columns.iter().skip(1).map(|col| {
3163            if col == left_field_col && left_field_col != right_field_col {
3164                // qualify the right-side field if necessary, to handle a differing field name
3165                DfExpr::Column(Column::new(
3166                    right_qualifier_for_field.clone(),
3167                    right_field_col,
3168                ))
3169            } else if tags_not_in_right.contains(col) {
3170                DfExpr::Literal(ScalarValue::Utf8(None), None).alias(col.to_string())
3171            } else {
3172                DfExpr::Column(Column::new(None::<String>, col))
3173            }
3174        });
3175        let right_proj_exprs = [right_time_index_expr]
3176            .into_iter()
3177            .chain(right_proj_exprs_without_time_index);
3178
3179        let left_projected = LogicalPlanBuilder::from(left)
3180            .project(left_proj_exprs)
3181            .context(DataFusionPlanningSnafu)?
3182            .alias(left_qualifier_string.clone())
3183            .context(DataFusionPlanningSnafu)?
3184            .build()
3185            .context(DataFusionPlanningSnafu)?;
3186        let right_projected = LogicalPlanBuilder::from(right)
3187            .project(right_proj_exprs)
3188            .context(DataFusionPlanningSnafu)?
3189            .alias(right_qualifier_string.clone())
3190            .context(DataFusionPlanningSnafu)?
3191            .build()
3192            .context(DataFusionPlanningSnafu)?;
3193
3194        // step 2: compute match columns
3195        let mut match_columns = if let Some(modifier) = modifier
3196            && let Some(matching) = &modifier.matching
3197        {
3198            match matching {
3199                // keeps columns mentioned in `on`
3200                LabelModifier::Include(on) => on.labels.clone(),
3201                // removes columns mentioned in `ignoring`
3202                LabelModifier::Exclude(ignoring) => {
3203                    let ignoring = ignoring.labels.iter().cloned().collect::<HashSet<_>>();
3204                    all_tags.difference(&ignoring).cloned().collect()
3205                }
3206            }
3207        } else {
3208            all_tags.iter().cloned().collect()
3209        };
3210        // sort to keep the generated plan deterministic
3211        match_columns.sort_unstable();
3212        // step 3: build `UnionDistinctOn` plan
3213        let schema = left_projected.schema().clone();
3214        let union_distinct_on = UnionDistinctOn::new(
3215            left_projected,
3216            right_projected,
3217            match_columns,
3218            left_time_index_column.clone(),
3219            schema,
3220        );
3221        let result = LogicalPlan::Extension(Extension {
3222            node: Arc::new(union_distinct_on),
3223        });
3224
3225        // step 4: update context
3226        self.ctx.time_index_column = Some(left_time_index_column);
3227        self.ctx.tag_columns = all_tags.into_iter().collect();
3228        self.ctx.field_columns = vec![left_field_col.to_string()];
3229
3230        Ok(result)
3231    }
3232
3233    /// Build a projection that projects and applies the operation expr to every value column.
3234    /// Non-value columns (tag and timestamp) are preserved in the projection.
3235    ///
3236    /// # Side effect
3237    ///
3238    /// This function updates the value columns in the context. The new column names
3239    /// don't contain qualifiers.
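    ///
    /// For example, when planning `abs(some_metric)`, the `name_to_expr` closure
    /// wraps each field column in `abs(...)`, while the tag and time index columns
    /// pass through unchanged.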
3240    fn projection_for_each_field_column<F>(
3241        &mut self,
3242        input: LogicalPlan,
3243        name_to_expr: F,
3244    ) -> Result<LogicalPlan>
3245    where
3246        F: FnMut(&String) -> Result<DfExpr>,
3247    {
3248        let non_field_columns_iter = self
3249            .ctx
3250            .tag_columns
3251            .iter()
3252            .chain(self.ctx.time_index_column.iter())
3253            .map(|col| {
3254                Ok(DfExpr::Column(Column::new(
3255                    self.ctx.table_name.clone().map(TableReference::bare),
3256                    col,
3257                )))
3258            });
3259
3260        // build computation exprs
3261        let result_field_columns = self
3262            .ctx
3263            .field_columns
3264            .iter()
3265            .map(name_to_expr)
3266            .collect::<Result<Vec<_>>>()?;
3267
3268        // alias the computation exprs to remove qualifier
3269        self.ctx.field_columns = result_field_columns
3270            .iter()
3271            .map(|expr| expr.schema_name().to_string())
3272            .collect();
3273        let field_columns_iter = result_field_columns
3274            .into_iter()
3275            .zip(self.ctx.field_columns.iter())
3276            .map(|(expr, name)| Ok(DfExpr::Alias(Alias::new(expr, None::<String>, name))));
3277
3278        // chain non-field columns (unchanged) and field columns (applied computation then alias)
3279        let project_fields = non_field_columns_iter
3280            .chain(field_columns_iter)
3281            .collect::<Result<Vec<_>>>()?;
3282
3283        LogicalPlanBuilder::from(input)
3284            .project(project_fields)
3285            .context(DataFusionPlanningSnafu)?
3286            .build()
3287            .context(DataFusionPlanningSnafu)
3288    }
3289
3290    /// Build a filter plan that filters on the value column. Note that exactly one
3291    /// value column is expected.
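    ///
    /// e.g. a comparison without the `bool` modifier, such as `some_metric > 1`,
    /// is typically planned through this helper as a filter over the single value
    /// column.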
3292    fn filter_on_field_column<F>(
3293        &self,
3294        input: LogicalPlan,
3295        mut name_to_expr: F,
3296    ) -> Result<LogicalPlan>
3297    where
3298        F: FnMut(&String) -> Result<DfExpr>,
3299    {
3300        ensure!(
3301            self.ctx.field_columns.len() == 1,
3302            UnsupportedExprSnafu {
3303                name: "filter on multi-value input"
3304            }
3305        );
3306
3307        let field_column_filter = name_to_expr(&self.ctx.field_columns[0])?;
3308
3309        LogicalPlanBuilder::from(input)
3310            .filter(field_column_filter)
3311            .context(DataFusionPlanningSnafu)?
3312            .build()
3313            .context(DataFusionPlanningSnafu)
3314    }
3315
3316    /// Generate an expr like `date_part("hour", <TIME_INDEX>)`. The caller should ensure
3317    /// the time index column in the context is set.
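    ///
    /// e.g. PromQL's `hour()` becomes `date_part("hour", <TIME_INDEX>)`, evaluating
    /// to the hour of day of each timestamp.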
3318    fn date_part_on_time_index(&self, date_part: &str) -> Result<DfExpr> {
3319        let input_expr = datafusion::logical_expr::col(
3320            self.ctx
3321                .time_index_column
3322                .as_ref()
3323                // the table name doesn't matter here
3324                .with_context(|| TimeIndexNotFoundSnafu {
3325                    table: "<doesn't matter>",
3326                })?,
3327        );
3328        let fn_expr = DfExpr::ScalarFunction(ScalarFunction {
3329            func: datafusion_functions::datetime::date_part(),
3330            args: vec![date_part.lit(), input_expr],
3331        });
3332        Ok(fn_expr)
3333    }
3334}
3335
3336#[derive(Default, Debug)]
3337struct FunctionArgs {
3338    input: Option<PromExpr>,
3339    literals: Vec<DfExpr>,
3340}
3341
3342/// Represents different types of scalar functions supported in PromQL expressions.
3343/// Each variant defines how the function should be processed and what arguments it expects.
3344#[derive(Debug, Clone)]
3345enum ScalarFunc {
3346    /// DataFusion's registered (including built-in) scalar functions (e.g., abs, sqrt, round, clamp).
3347    /// These are passed through directly to DataFusion's execution engine.
3348    /// Processing: Simple argument insertion at the specified position.
3349    DataFusionBuiltin(Arc<ScalarUdfDef>),
3350    /// User-defined functions registered in DataFusion's function registry.
3351    /// Similar to DataFusionBuiltin but for custom functions not built into DataFusion.
3352    /// Processing: Direct pass-through with argument positioning.
3353    DataFusionUdf(Arc<ScalarUdfDef>),
3354    /// PromQL-specific functions that operate on time series data with temporal context.
3355    /// These functions require both timestamp ranges and values to perform calculations.
3356    /// Processing: Automatically injects timestamp_range and value columns as first arguments.
3357    /// Examples: idelta, irate, resets, changes, deriv, and the *_over_time functions
3358    Udf(Arc<ScalarUdfDef>),
3359    /// PromQL functions requiring extrapolation calculations with explicit range information.
3360    /// These functions need to know the time range length to perform rate calculations.
3361    /// The second field contains the range length in milliseconds.
3362    /// Processing: Injects timestamp_range, value, time_index columns and appends range_length.
3363    /// Examples: increase, rate, delta
3364    // TODO(ruihang): maybe merge with Udf later
3365    ExtrapolateUdf(Arc<ScalarUdfDef>, i64),
3366    /// Functions that generate expressions directly without external UDF calls.
3367    /// The expression is constructed during function matching and requires no additional processing.
3368    /// Examples: time(), minute(), hour(), month(), year() and other date/time extractors
3369    GeneratedExpr,
3370}
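// An illustrative (not exhaustive) mapping, based on the variant docs above:
// `abs` -> DataFusionBuiltin, `idelta` -> Udf, `rate` -> ExtrapolateUdf (carrying
// the range length in milliseconds), and `hour` -> GeneratedExpr.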
3371
3372#[cfg(test)]
3373mod test {
3374    use std::time::{Duration, UNIX_EPOCH};
3375
3376    use catalog::memory::{new_memory_catalog_manager, MemoryCatalogManager};
3377    use catalog::RegisterTableRequest;
3378    use common_base::Plugins;
3379    use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
3380    use common_query::test_util::DummyDecoder;
3381    use datatypes::prelude::ConcreteDataType;
3382    use datatypes::schema::{ColumnSchema, Schema};
3383    use promql_parser::label::Labels;
3384    use promql_parser::parser;
3385    use session::context::QueryContext;
3386    use table::metadata::{TableInfoBuilder, TableMetaBuilder};
3387    use table::test_util::EmptyTable;
3388
3389    use super::*;
3390    use crate::options::QueryOptions;
3391
3392    fn build_query_engine_state() -> QueryEngineState {
3393        QueryEngineState::new(
3394            new_memory_catalog_manager().unwrap(),
3395            None,
3396            None,
3397            None,
3398            None,
3399            None,
3400            false,
3401            Plugins::default(),
3402            QueryOptions::default(),
3403        )
3404    }
3405
3406    async fn build_test_table_provider(
3407        table_name_tuples: &[(String, String)],
3408        num_tag: usize,
3409        num_field: usize,
3410    ) -> DfTableSourceProvider {
3411        let catalog_list = MemoryCatalogManager::with_default_setup();
3412        for (schema_name, table_name) in table_name_tuples {
3413            let mut columns = vec![];
3414            for i in 0..num_tag {
3415                columns.push(ColumnSchema::new(
3416                    format!("tag_{i}"),
3417                    ConcreteDataType::string_datatype(),
3418                    false,
3419                ));
3420            }
3421            columns.push(
3422                ColumnSchema::new(
3423                    "timestamp".to_string(),
3424                    ConcreteDataType::timestamp_millisecond_datatype(),
3425                    false,
3426                )
3427                .with_time_index(true),
3428            );
3429            for i in 0..num_field {
3430                columns.push(ColumnSchema::new(
3431                    format!("field_{i}"),
3432                    ConcreteDataType::float64_datatype(),
3433                    true,
3434                ));
3435            }
3436            let schema = Arc::new(Schema::new(columns));
3437            let table_meta = TableMetaBuilder::empty()
3438                .schema(schema)
3439                .primary_key_indices((0..num_tag).collect())
3440                .value_indices((num_tag + 1..num_tag + 1 + num_field).collect())
3441                .next_column_id(1024)
3442                .build()
3443                .unwrap();
3444            let table_info = TableInfoBuilder::default()
3445                .name(table_name.to_string())
3446                .meta(table_meta)
3447                .build()
3448                .unwrap();
3449            let table = EmptyTable::from_table_info(&table_info);
3450
3451            assert!(catalog_list
3452                .register_table_sync(RegisterTableRequest {
3453                    catalog: DEFAULT_CATALOG_NAME.to_string(),
3454                    schema: schema_name.to_string(),
3455                    table_name: table_name.to_string(),
3456                    table_id: 1024,
3457                    table,
3458                })
3459                .is_ok());
3460        }
3461
3462        DfTableSourceProvider::new(
3463            catalog_list,
3464            false,
3465            QueryContext::arc(),
3466            DummyDecoder::arc(),
3467            false,
3468        )
3469    }
3470
3471    async fn build_test_table_provider_with_fields(
3472        table_name_tuples: &[(String, String)],
3473        tags: &[&str],
3474    ) -> DfTableSourceProvider {
3475        let catalog_list = MemoryCatalogManager::with_default_setup();
3476        for (schema_name, table_name) in table_name_tuples {
3477            let mut columns = vec![];
3478            let num_tag = tags.len();
3479            for tag in tags {
3480                columns.push(ColumnSchema::new(
3481                    tag.to_string(),
3482                    ConcreteDataType::string_datatype(),
3483                    false,
3484                ));
3485            }
3486            columns.push(
3487                ColumnSchema::new(
3488                    "greptime_timestamp".to_string(),
3489                    ConcreteDataType::timestamp_millisecond_datatype(),
3490                    false,
3491                )
3492                .with_time_index(true),
3493            );
3494            columns.push(ColumnSchema::new(
3495                "greptime_value".to_string(),
3496                ConcreteDataType::float64_datatype(),
3497                true,
3498            ));
3499            let schema = Arc::new(Schema::new(columns));
3500            let table_meta = TableMetaBuilder::empty()
3501                .schema(schema)
3502                .primary_key_indices((0..num_tag).collect())
3503                .next_column_id(1024)
3504                .build()
3505                .unwrap();
3506            let table_info = TableInfoBuilder::default()
3507                .name(table_name.to_string())
3508                .meta(table_meta)
3509                .build()
3510                .unwrap();
3511            let table = EmptyTable::from_table_info(&table_info);
3512
3513            assert!(catalog_list
3514                .register_table_sync(RegisterTableRequest {
3515                    catalog: DEFAULT_CATALOG_NAME.to_string(),
3516                    schema: schema_name.to_string(),
3517                    table_name: table_name.to_string(),
3518                    table_id: 1024,
3519                    table,
3520                })
3521                .is_ok());
3522        }
3523
3524        DfTableSourceProvider::new(
3525            catalog_list,
3526            false,
3527            QueryContext::arc(),
3528            DummyDecoder::arc(),
3529            false,
3530        )
3531    }
3532
3533    // {
3534    //     input: `abs(some_metric{foo!="bar"})`,
3535    //     expected: &Call{
3536    //         Func: MustGetFunction("abs"),
3537    //         Args: Expressions{
3538    //             &VectorSelector{
3539    //                 Name: "some_metric",
3540    //                 LabelMatchers: []*labels.Matcher{
3541    //                     MustLabelMatcher(labels.MatchNotEqual, "foo", "bar"),
3542    //                     MustLabelMatcher(labels.MatchEqual, model.MetricNameLabel, "some_metric"),
3543    //                 },
3544    //             },
3545    //         },
3546    //     },
3547    // },
3548    async fn do_single_instant_function_call(fn_name: &'static str, plan_name: &str) {
3549        let prom_expr =
3550            parser::parse(&format!("{fn_name}(some_metric{{tag_0!=\"bar\"}})")).unwrap();
3551        let eval_stmt = EvalStmt {
3552            expr: prom_expr,
3553            start: UNIX_EPOCH,
3554            end: UNIX_EPOCH
3555                .checked_add(Duration::from_secs(100_000))
3556                .unwrap(),
3557            interval: Duration::from_secs(5),
3558            lookback_delta: Duration::from_secs(1),
3559        };
3560
3561        let table_provider = build_test_table_provider(
3562            &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
3563            1,
3564            1,
3565        )
3566        .await;
3567        let plan =
3568            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
3569                .await
3570                .unwrap();
3571
3572        let expected = String::from(
3573            "Filter: TEMPLATE(field_0) IS NOT NULL [timestamp:Timestamp(Millisecond, None), TEMPLATE(field_0):Float64;N, tag_0:Utf8]\
3574            \n  Projection: some_metric.timestamp, TEMPLATE(some_metric.field_0) AS TEMPLATE(field_0), some_metric.tag_0 [timestamp:Timestamp(Millisecond, None), TEMPLATE(field_0):Float64;N, tag_0:Utf8]\
3575            \n    PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3576            \n      PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3577            \n        Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3578            \n          Filter: some_metric.tag_0 != Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3579            \n            TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
3580        ).replace("TEMPLATE", plan_name);
3581
3582        assert_eq!(plan.display_indent_schema().to_string(), expected);
3583    }
3584
3585    #[tokio::test]
3586    async fn single_abs() {
3587        do_single_instant_function_call("abs", "abs").await;
3588    }
3589
3590    #[tokio::test]
3591    #[should_panic]
3592    async fn single_absent() {
3593        do_single_instant_function_call("absent", "").await;
3594    }
3595
3596    #[tokio::test]
3597    async fn single_ceil() {
3598        do_single_instant_function_call("ceil", "ceil").await;
3599    }
3600
3601    #[tokio::test]
3602    async fn single_exp() {
3603        do_single_instant_function_call("exp", "exp").await;
3604    }
3605
3606    #[tokio::test]
3607    async fn single_ln() {
3608        do_single_instant_function_call("ln", "ln").await;
3609    }
3610
3611    #[tokio::test]
3612    async fn single_log2() {
3613        do_single_instant_function_call("log2", "log2").await;
3614    }
3615
3616    #[tokio::test]
3617    async fn single_log10() {
3618        do_single_instant_function_call("log10", "log10").await;
3619    }
3620
3621    #[tokio::test]
3622    #[should_panic]
3623    async fn single_scalar() {
3624        do_single_instant_function_call("scalar", "").await;
3625    }
3626
3627    #[tokio::test]
3628    #[should_panic]
3629    async fn single_sgn() {
3630        do_single_instant_function_call("sgn", "").await;
3631    }
3632
3633    #[tokio::test]
3634    #[should_panic]
3635    async fn single_sort() {
3636        do_single_instant_function_call("sort", "").await;
3637    }
3638
3639    #[tokio::test]
3640    #[should_panic]
3641    async fn single_sort_desc() {
3642        do_single_instant_function_call("sort_desc", "").await;
3643    }
3644
3645    #[tokio::test]
3646    async fn single_sqrt() {
3647        do_single_instant_function_call("sqrt", "sqrt").await;
3648    }
3649
3650    #[tokio::test]
3651    #[should_panic]
3652    async fn single_timestamp() {
3653        do_single_instant_function_call("timestamp", "").await;
3654    }
3655
3656    #[tokio::test]
3657    async fn single_acos() {
3658        do_single_instant_function_call("acos", "acos").await;
3659    }
3660
3661    #[tokio::test]
3662    #[should_panic]
3663    async fn single_acosh() {
3664        do_single_instant_function_call("acosh", "").await;
3665    }
3666
3667    #[tokio::test]
3668    async fn single_asin() {
3669        do_single_instant_function_call("asin", "asin").await;
3670    }
3671
3672    #[tokio::test]
3673    #[should_panic]
3674    async fn single_asinh() {
3675        do_single_instant_function_call("asinh", "").await;
3676    }
3677
3678    #[tokio::test]
3679    async fn single_atan() {
3680        do_single_instant_function_call("atan", "atan").await;
3681    }
3682
3683    #[tokio::test]
3684    #[should_panic]
3685    async fn single_atanh() {
3686        do_single_instant_function_call("atanh", "").await;
3687    }
3688
3689    #[tokio::test]
3690    async fn single_cos() {
3691        do_single_instant_function_call("cos", "cos").await;
3692    }
3693
3694    #[tokio::test]
3695    #[should_panic]
3696    async fn single_cosh() {
3697        do_single_instant_function_call("cosh", "").await;
3698    }
3699
3700    #[tokio::test]
3701    async fn single_sin() {
3702        do_single_instant_function_call("sin", "sin").await;
3703    }
3704
3705    #[tokio::test]
3706    #[should_panic]
3707    async fn single_sinh() {
3708        do_single_instant_function_call("sinh", "").await;
3709    }
3710
3711    #[tokio::test]
3712    async fn single_tan() {
3713        do_single_instant_function_call("tan", "tan").await;
3714    }
3715
3716    #[tokio::test]
3717    #[should_panic]
3718    async fn single_tanh() {
3719        do_single_instant_function_call("tanh", "").await;
3720    }
3721
3722    #[tokio::test]
3723    #[should_panic]
3724    async fn single_deg() {
3725        do_single_instant_function_call("deg", "").await;
3726    }
3727
3728    #[tokio::test]
3729    #[should_panic]
3730    async fn single_rad() {
3731        do_single_instant_function_call("rad", "").await;
3732    }
3733
3734    // {
3735    //     input: "avg by (foo)(some_metric)",
3736    //     expected: &AggregateExpr{
3737    //         Op: AVG,
3738    //         Expr: &VectorSelector{
3739    //             Name: "some_metric",
3740    //             LabelMatchers: []*labels.Matcher{
3741    //                 MustLabelMatcher(labels.MatchEqual, model.MetricNameLabel, "some_metric"),
3742    //             },
3743    //             PosRange: PositionRange{
3744    //                 Start: 13,
3745    //                 End:   24,
3746    //             },
3747    //         },
3748    //         Grouping: []string{"foo"},
3749    //         PosRange: PositionRange{
3750    //             Start: 0,
3751    //             End:   25,
3752    //         },
3753    //     },
3754    // },
3755    async fn do_aggregate_expr_plan(fn_name: &str, plan_name: &str) {
3756        let prom_expr = parser::parse(&format!(
3757            "{fn_name} by (tag_1)(some_metric{{tag_0!=\"bar\"}})",
3758        ))
3759        .unwrap();
3760        let mut eval_stmt = EvalStmt {
3761            expr: prom_expr,
3762            start: UNIX_EPOCH,
3763            end: UNIX_EPOCH
3764                .checked_add(Duration::from_secs(100_000))
3765                .unwrap(),
3766            interval: Duration::from_secs(5),
3767            lookback_delta: Duration::from_secs(1),
3768        };
3769
3770        // test group by
3771        let table_provider = build_test_table_provider(
3772            &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
3773            2,
3774            2,
3775        )
3776        .await;
3777        let plan =
3778            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
3779                .await
3780                .unwrap();
3781        let expected_no_without = String::from(
3782            "Sort: some_metric.tag_1 ASC NULLS LAST, some_metric.timestamp ASC NULLS LAST [tag_1:Utf8, timestamp:Timestamp(Millisecond, None), TEMPLATE(some_metric.field_0):Float64;N, TEMPLATE(some_metric.field_1):Float64;N]\
3783            \n  Aggregate: groupBy=[[some_metric.tag_1, some_metric.timestamp]], aggr=[[TEMPLATE(some_metric.field_0), TEMPLATE(some_metric.field_1)]] [tag_1:Utf8, timestamp:Timestamp(Millisecond, None), TEMPLATE(some_metric.field_0):Float64;N, TEMPLATE(some_metric.field_1):Float64;N]\
3784            \n    PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
3785            \n      PromSeriesDivide: tags=[\"tag_0\", \"tag_1\"] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
3786            \n        Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.tag_1 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
3787            \n          Filter: some_metric.tag_0 != Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
3788            \n            TableScan: some_metric [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]"
3789        ).replace("TEMPLATE", plan_name);
3790        assert_eq!(
3791            plan.display_indent_schema().to_string(),
3792            expected_no_without
3793        );
3794
3795        // test group without
3796        if let PromExpr::Aggregate(AggregateExpr { modifier, .. }) = &mut eval_stmt.expr {
3797            *modifier = Some(LabelModifier::Exclude(Labels {
3798                labels: vec![String::from("tag_1")].into_iter().collect(),
3799            }));
3800        }
3801        let table_provider = build_test_table_provider(
3802            &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
3803            2,
3804            2,
3805        )
3806        .await;
3807        let plan =
3808            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
3809                .await
3810                .unwrap();
3811        let expected_without = String::from(
3812            "Sort: some_metric.tag_0 ASC NULLS LAST, some_metric.timestamp ASC NULLS LAST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), TEMPLATE(some_metric.field_0):Float64;N, TEMPLATE(some_metric.field_1):Float64;N]\
3813            \n  Aggregate: groupBy=[[some_metric.tag_0, some_metric.timestamp]], aggr=[[TEMPLATE(some_metric.field_0), TEMPLATE(some_metric.field_1)]] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), TEMPLATE(some_metric.field_0):Float64;N, TEMPLATE(some_metric.field_1):Float64;N]\
3814            \n    PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
3815            \n      PromSeriesDivide: tags=[\"tag_0\", \"tag_1\"] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
3816            \n        Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.tag_1 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
3817            \n          Filter: some_metric.tag_0 != Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
3818            \n            TableScan: some_metric [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]"
3819        ).replace("TEMPLATE", plan_name);
3820        assert_eq!(plan.display_indent_schema().to_string(), expected_without);
3821    }
3822
3823    #[tokio::test]
3824    async fn aggregate_sum() {
3825        do_aggregate_expr_plan("sum", "sum").await;
3826    }
3827
3828    #[tokio::test]
3829    async fn aggregate_avg() {
3830        do_aggregate_expr_plan("avg", "avg").await;
3831    }
3832
3833    #[tokio::test]
3834    #[should_panic] // output type doesn't match
3835    async fn aggregate_count() {
3836        do_aggregate_expr_plan("count", "count").await;
3837    }
3838
3839    #[tokio::test]
3840    async fn aggregate_min() {
3841        do_aggregate_expr_plan("min", "min").await;
3842    }
3843
3844    #[tokio::test]
3845    async fn aggregate_max() {
3846        do_aggregate_expr_plan("max", "max").await;
3847    }
3848
3849    #[tokio::test]
3850    #[should_panic] // output type doesn't match
3851    async fn aggregate_group() {
3852        do_aggregate_expr_plan("grouping", "GROUPING").await;
3853    }
3854
3855    #[tokio::test]
3856    async fn aggregate_stddev() {
3857        do_aggregate_expr_plan("stddev", "stddev_pop").await;
3858    }
3859
3860    #[tokio::test]
3861    async fn aggregate_stdvar() {
3862        do_aggregate_expr_plan("stdvar", "var_pop").await;
3863    }
3864
3865    // TODO(ruihang): add range fn tests once exprs are ready.
3866
3867    // {
3868    //     input: "some_metric{tag_0="foo"} + some_metric{tag_0="bar"}",
3869    //     expected: &BinaryExpr{
3870    //         Op: ADD,
3871    //         LHS: &VectorSelector{
3872    //             Name: "a",
3873    //             LabelMatchers: []*labels.Matcher{
3874    //                     MustLabelMatcher(labels.MatchEqual, "tag_0", "foo"),
3875    //                     MustLabelMatcher(labels.MatchEqual, model.MetricNameLabel, "some_metric"),
3876    //             },
3877    //         },
3878    //         RHS: &VectorSelector{
3879    //             Name: "sum",
3880    //             LabelMatchers: []*labels.Matcher{
3881    //                     MustLabelMatcher(labels.MatchEqual, "tag_0", "bar"),
3882    //                     MustLabelMatcher(labels.MatchEqual, model.MetricNameLabel, "some_metric"),
3883    //             },
3884    //         },
3885    //         VectorMatching: &VectorMatching{},
3886    //     },
3887    // },
3888    #[tokio::test]
3889    async fn binary_op_column_column() {
3890        let prom_expr =
3891            parser::parse(r#"some_metric{tag_0="foo"} + some_metric{tag_0="bar"}"#).unwrap();
3892        let eval_stmt = EvalStmt {
3893            expr: prom_expr,
3894            start: UNIX_EPOCH,
3895            end: UNIX_EPOCH
3896                .checked_add(Duration::from_secs(100_000))
3897                .unwrap(),
3898            interval: Duration::from_secs(5),
3899            lookback_delta: Duration::from_secs(1),
3900        };
3901
3902        let table_provider = build_test_table_provider(
3903            &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
3904            1,
3905            1,
3906        )
3907        .await;
3908        let plan =
3909            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
3910                .await
3911                .unwrap();
3912
3913        let expected = String::from(
3914            "Projection: rhs.tag_0, rhs.timestamp, lhs.field_0 + rhs.field_0 AS lhs.field_0 + rhs.field_0 [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), lhs.field_0 + rhs.field_0:Float64;N]\
3915            \n  Inner Join: lhs.tag_0 = rhs.tag_0, lhs.timestamp = rhs.timestamp [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3916            \n    SubqueryAlias: lhs [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3917            \n      PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3918            \n        PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3919            \n          Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3920            \n            Filter: some_metric.tag_0 = Utf8(\"foo\") AND some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3921            \n              TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3922            \n    SubqueryAlias: rhs [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3923            \n      PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3924            \n        PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3925            \n          Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3926            \n            Filter: some_metric.tag_0 = Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3927            \n              TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
3928        );
3929
3930        assert_eq!(plan.display_indent_schema().to_string(), expected);
3931    }
3932
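    /// Helper: parses `query`, evaluates it with the default test EvalStmt
    /// (range 0..100_000s, 5s interval, 1s lookback) against a provider that
    /// exposes `some_metric` and `greptime_private.some_alt_metric`, and
    /// asserts the `display_indent_schema()` output equals `expected`.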
3933    async fn indie_query_plan_compare<T: AsRef<str>>(query: &str, expected: T) {
3934        let prom_expr = parser::parse(query).unwrap();
3935        let eval_stmt = EvalStmt {
3936            expr: prom_expr,
3937            start: UNIX_EPOCH,
3938            end: UNIX_EPOCH
3939                .checked_add(Duration::from_secs(100_000))
3940                .unwrap(),
3941            interval: Duration::from_secs(5),
3942            lookback_delta: Duration::from_secs(1),
3943        };
3944
3945        let table_provider = build_test_table_provider(
3946            &[
3947                (DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string()),
3948                (
3949                    "greptime_private".to_string(),
3950                    "some_alt_metric".to_string(),
3951                ),
3952            ],
3953            1,
3954            1,
3955        )
3956        .await;
3957        let plan =
3958            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
3959                .await
3960                .unwrap();
3961
3962        assert_eq!(plan.display_indent_schema().to_string(), expected.as_ref());
3963    }
3964
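    // Literal-on-one-side arithmetic keeps the single table's plan shape and
    // folds the literal into the projection expression.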
3965    #[tokio::test]
3966    async fn binary_op_literal_column() {
3967        let query = r#"1 + some_metric{tag_0="bar"}"#;
3968        let expected = String::from(
3969            "Projection: some_metric.tag_0, some_metric.timestamp, Float64(1) + some_metric.field_0 AS Float64(1) + field_0 [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), Float64(1) + field_0:Float64;N]\
3970            \n  PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3971            \n    PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3972            \n      Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3973            \n        Filter: some_metric.tag_0 = Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3974            \n          TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
3975        );
3976
3977        indie_query_plan_compare(query, expected).await;
3978    }
3979
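    // A literal-only expression has no table to scan; the planner synthesizes
    // values with an EmptyMetric node over a dummy input instead.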
3980    #[tokio::test]
3981    async fn binary_op_literal_literal() {
3982        let query = r#"1 + 1"#;
3983        let expected = r#"EmptyMetric: range=[0..100000000], interval=[5000] [time:Timestamp(Millisecond, None), value:Float64;N]
3984  TableScan: dummy [time:Timestamp(Millisecond, None), value:Float64;N]"#;
3985        indie_query_plan_compare(query, expected).await;
3986    }
3987
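    // The `bool` modifier maps a comparison to 0/1, which surfaces as a CAST
    // of the boolean result back to Float64 in the projection.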
3988    #[tokio::test]
3989    async fn simple_bool_grammar() {
3990        let query = "some_metric != bool 1.2345";
3991        let expected = String::from(
3992            "Projection: some_metric.tag_0, some_metric.timestamp, CAST(some_metric.field_0 != Float64(1.2345) AS Float64) AS field_0 != Float64(1.2345) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0 != Float64(1.2345):Float64;N]\
3993            \n  PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3994            \n    PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3995            \n      Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3996            \n        Filter: some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3997            \n          TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
3998        );
3999
4000        indie_query_plan_compare(query, expected).await;
4001    }
4002
4003    #[tokio::test]
4004    async fn bool_with_additional_arithmetic() {
4005        let query = "some_metric + (1 == bool 2)";
4006        let expected = String::from(
4007            "Projection: some_metric.tag_0, some_metric.timestamp, some_metric.field_0 + CAST(Float64(1) = Float64(2) AS Float64) AS field_0 + Float64(1) = Float64(2) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0 + Float64(1) = Float64(2):Float64;N]\
4008            \n  PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4009            \n    PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4010            \n      Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4011            \n        Filter: some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4012            \n          TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
4013        );
4014
4015        indie_query_plan_compare(query, expected).await;
4016    }
4017
4018    #[tokio::test]
4019    async fn simple_unary() {
4020        let query = "-some_metric";
4021        let expected = String::from(
4022            "Projection: some_metric.tag_0, some_metric.timestamp, (- some_metric.field_0) AS (- field_0) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), (- field_0):Float64;N]\
4023            \n  PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4024            \n    PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4025            \n      Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4026            \n        Filter: some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4027            \n          TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
4028        );
4029
4030        indie_query_plan_compare(query, expected).await;
4031    }
4032
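    // A 5m range selector widens the scan's lower time bound by range +
    // lookback (300s + 1s => -301000ms) and evaluates the window through
    // PromRangeManipulate.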
4033    #[tokio::test]
4034    async fn increase_aggr() {
4035        let query = "increase(some_metric[5m])";
4036        let expected = String::from(
4037            "Filter: prom_increase(timestamp_range,field_0,timestamp,Int64(300000)) IS NOT NULL [timestamp:Timestamp(Millisecond, None), prom_increase(timestamp_range,field_0,timestamp,Int64(300000)):Float64;N, tag_0:Utf8]\
4038            \n  Projection: some_metric.timestamp, prom_increase(timestamp_range, field_0, some_metric.timestamp, Int64(300000)) AS prom_increase(timestamp_range,field_0,timestamp,Int64(300000)), some_metric.tag_0 [timestamp:Timestamp(Millisecond, None), prom_increase(timestamp_range,field_0,timestamp,Int64(300000)):Float64;N, tag_0:Utf8]\
4039            \n    PromRangeManipulate: req range=[0..100000000], interval=[5000], eval range=[300000], time index=[timestamp], values=[\"field_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Dictionary(Int64, Float64);N, timestamp_range:Dictionary(Int64, Timestamp(Millisecond, None))]\
4040            \n      PromSeriesNormalize: offset=[0], time index=[timestamp], filter NaN: [true] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4041            \n        PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4042            \n          Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4043            \n            Filter: some_metric.timestamp >= TimestampMillisecond(-301000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4044            \n              TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
4045        );
4046
4047        indie_query_plan_compare(query, expected).await;
4048    }
4049
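    // Without the `bool` modifier a comparison filters rows instead of
    // producing 0/1 values, hence the plain Filter on field_0.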
4050    #[tokio::test]
4051    async fn less_filter_on_value() {
4052        let query = "some_metric < 1.2345";
4053        let expected = String::from(
4054            "Filter: some_metric.field_0 < Float64(1.2345) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4055            \n  PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4056            \n    PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4057            \n      Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4058            \n        Filter: some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4059            \n          TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
4060        );
4061
4062        indie_query_plan_compare(query, expected).await;
4063    }
4064
4065    #[tokio::test]
4066    async fn count_over_time() {
4067        let query = "count_over_time(some_metric[5m])";
4068        let expected = String::from(
4069            "Filter: prom_count_over_time(timestamp_range,field_0) IS NOT NULL [timestamp:Timestamp(Millisecond, None), prom_count_over_time(timestamp_range,field_0):Float64;N, tag_0:Utf8]\
4070            \n  Projection: some_metric.timestamp, prom_count_over_time(timestamp_range, field_0) AS prom_count_over_time(timestamp_range,field_0), some_metric.tag_0 [timestamp:Timestamp(Millisecond, None), prom_count_over_time(timestamp_range,field_0):Float64;N, tag_0:Utf8]\
4071            \n    PromRangeManipulate: req range=[0..100000000], interval=[5000], eval range=[300000], time index=[timestamp], values=[\"field_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Dictionary(Int64, Float64);N, timestamp_range:Dictionary(Int64, Timestamp(Millisecond, None))]\
4072            \n      PromSeriesNormalize: offset=[0], time index=[timestamp], filter NaN: [true] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4073            \n        PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4074            \n          Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4075            \n            Filter: some_metric.timestamp >= TimestampMillisecond(-301000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4076            \n              TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
4077        );
4078
4079        indie_query_plan_compare(query, expected).await;
4080    }
4081
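    // `ignoring(...)` removes the listed labels from the join key: the
    // expected Inner Join below matches only on greptime_timestamp and uri.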
4082    #[tokio::test]
4083    async fn test_hash_join() {
4084        let mut eval_stmt = EvalStmt {
4085            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
4086            start: UNIX_EPOCH,
4087            end: UNIX_EPOCH
4088                .checked_add(Duration::from_secs(100_000))
4089                .unwrap(),
4090            interval: Duration::from_secs(5),
4091            lookback_delta: Duration::from_secs(1),
4092        };
4093
4094        let case = r#"http_server_requests_seconds_sum{uri="/accounts/login"} / ignoring(kubernetes_pod_name,kubernetes_namespace) http_server_requests_seconds_count{uri="/accounts/login"}"#;
4095
4096        let prom_expr = parser::parse(case).unwrap();
4097        eval_stmt.expr = prom_expr;
4098        let table_provider = build_test_table_provider_with_fields(
4099            &[
4100                (
4101                    DEFAULT_SCHEMA_NAME.to_string(),
4102                    "http_server_requests_seconds_sum".to_string(),
4103                ),
4104                (
4105                    DEFAULT_SCHEMA_NAME.to_string(),
4106                    "http_server_requests_seconds_count".to_string(),
4107                ),
4108            ],
4109            &["uri", "kubernetes_namespace", "kubernetes_pod_name"],
4110        )
4111        .await;
4112        // Should be ok
4113        let plan =
4114            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
4115                .await
4116                .unwrap();
4117        let expected = "Projection: http_server_requests_seconds_count.uri, http_server_requests_seconds_count.kubernetes_namespace, http_server_requests_seconds_count.kubernetes_pod_name, http_server_requests_seconds_count.greptime_timestamp, http_server_requests_seconds_sum.greptime_value / http_server_requests_seconds_count.greptime_value AS http_server_requests_seconds_sum.greptime_value / http_server_requests_seconds_count.greptime_value\
4118            \n  Inner Join: http_server_requests_seconds_sum.greptime_timestamp = http_server_requests_seconds_count.greptime_timestamp, http_server_requests_seconds_sum.uri = http_server_requests_seconds_count.uri\
4119            \n    SubqueryAlias: http_server_requests_seconds_sum\
4120            \n      PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp]\
4121            \n        PromSeriesDivide: tags=[\"uri\", \"kubernetes_namespace\", \"kubernetes_pod_name\"]\
4122            \n          Sort: http_server_requests_seconds_sum.uri ASC NULLS FIRST, http_server_requests_seconds_sum.kubernetes_namespace ASC NULLS FIRST, http_server_requests_seconds_sum.kubernetes_pod_name ASC NULLS FIRST, http_server_requests_seconds_sum.greptime_timestamp ASC NULLS FIRST\
4123            \n            Filter: http_server_requests_seconds_sum.uri = Utf8(\"/accounts/login\") AND http_server_requests_seconds_sum.greptime_timestamp >= TimestampMillisecond(-1000, None) AND http_server_requests_seconds_sum.greptime_timestamp <= TimestampMillisecond(100001000, None)\
4124            \n              TableScan: http_server_requests_seconds_sum\
4125            \n    SubqueryAlias: http_server_requests_seconds_count\
4126            \n      PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp]\
4127            \n        PromSeriesDivide: tags=[\"uri\", \"kubernetes_namespace\", \"kubernetes_pod_name\"]\
4128            \n          Sort: http_server_requests_seconds_count.uri ASC NULLS FIRST, http_server_requests_seconds_count.kubernetes_namespace ASC NULLS FIRST, http_server_requests_seconds_count.kubernetes_pod_name ASC NULLS FIRST, http_server_requests_seconds_count.greptime_timestamp ASC NULLS FIRST\
4129            \n            Filter: http_server_requests_seconds_count.uri = Utf8(\"/accounts/login\") AND http_server_requests_seconds_count.greptime_timestamp >= TimestampMillisecond(-1000, None) AND http_server_requests_seconds_count.greptime_timestamp <= TimestampMillisecond(100001000, None)\
4130            \n              TableScan: http_server_requests_seconds_count";
4131        assert_eq!(plan.to_string(), expected);
4132    }
4133
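    // Smoke test: histogram_quantile nested inside label_replace over a
    // rate() should plan without error; no plan text is asserted.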
4134    #[tokio::test]
4135    async fn test_nested_histogram_quantile() {
4136        let mut eval_stmt = EvalStmt {
4137            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
4138            start: UNIX_EPOCH,
4139            end: UNIX_EPOCH
4140                .checked_add(Duration::from_secs(100_000))
4141                .unwrap(),
4142            interval: Duration::from_secs(5),
4143            lookback_delta: Duration::from_secs(1),
4144        };
4145
4146        let case = r#"label_replace(histogram_quantile(0.99, sum by(pod, le, path, code) (rate(greptime_servers_grpc_requests_elapsed_bucket{container="frontend"}[1m0s]))), "pod_new", "$1", "pod", "greptimedb-frontend-[0-9a-z]*-(.*)")"#;
4147
4148        let prom_expr = parser::parse(case).unwrap();
4149        eval_stmt.expr = prom_expr;
4150        let table_provider = build_test_table_provider_with_fields(
4151            &[(
4152                DEFAULT_SCHEMA_NAME.to_string(),
4153                "greptime_servers_grpc_requests_elapsed_bucket".to_string(),
4154            )],
4155            &["pod", "le", "path", "code", "container"],
4156        )
4157        .await;
4158        // Should be ok
4159        let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
4160            .await
4161            .unwrap();
4162    }
4163
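    // Smoke tests for `and`/`unless` between aggregations combined with
    // `or vector(0)`; only successful planning is asserted.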
4164    #[tokio::test]
4165    async fn test_parse_and_operator() {
4166        let mut eval_stmt = EvalStmt {
4167            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
4168            start: UNIX_EPOCH,
4169            end: UNIX_EPOCH
4170                .checked_add(Duration::from_secs(100_000))
4171                .unwrap(),
4172            interval: Duration::from_secs(5),
4173            lookback_delta: Duration::from_secs(1),
4174        };
4175
4176        let cases = [
4177            r#"count (max by (persistentvolumeclaim,namespace) (kubelet_volume_stats_used_bytes{namespace=~".+"} ) and (max by (persistentvolumeclaim,namespace) (kubelet_volume_stats_used_bytes{namespace=~".+"} )) / (max by (persistentvolumeclaim,namespace) (kubelet_volume_stats_capacity_bytes{namespace=~".+"} )) >= (80 / 100)) or vector (0)"#,
4178            r#"count (max by (persistentvolumeclaim,namespace) (kubelet_volume_stats_used_bytes{namespace=~".+"} ) unless (max by (persistentvolumeclaim,namespace) (kubelet_volume_stats_used_bytes{namespace=~".+"} )) / (max by (persistentvolumeclaim,namespace) (kubelet_volume_stats_capacity_bytes{namespace=~".+"} )) >= (80 / 100)) or vector (0)"#,
4179        ];
4180
4181        for case in cases {
4182            let prom_expr = parser::parse(case).unwrap();
4183            eval_stmt.expr = prom_expr;
4184            let table_provider = build_test_table_provider_with_fields(
4185                &[
4186                    (
4187                        DEFAULT_SCHEMA_NAME.to_string(),
4188                        "kubelet_volume_stats_used_bytes".to_string(),
4189                    ),
4190                    (
4191                        DEFAULT_SCHEMA_NAME.to_string(),
4192                        "kubelet_volume_stats_capacity_bytes".to_string(),
4193                    ),
4194                ],
4195                &["namespace", "persistentvolumeclaim"],
4196            )
4197            .await;
4198            // Should be ok
4199            let _ =
4200                PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
4201                    .await
4202                    .unwrap();
4203        }
4204    }
4205
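    // A binary op whose right-hand side is itself an `or` expression must
    // still plan; only success is asserted.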
4206    #[tokio::test]
4207    async fn test_nested_binary_op() {
4208        let mut eval_stmt = EvalStmt {
4209            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
4210            start: UNIX_EPOCH,
4211            end: UNIX_EPOCH
4212                .checked_add(Duration::from_secs(100_000))
4213                .unwrap(),
4214            interval: Duration::from_secs(5),
4215            lookback_delta: Duration::from_secs(1),
4216        };
4217
4218        let case = r#"sum(rate(nginx_ingress_controller_requests{job=~".*"}[2m])) -
4219        (
4220            sum(rate(nginx_ingress_controller_requests{namespace=~".*"}[2m]))
4221            or
4222            vector(0)
4223        )"#;
4224
4225        let prom_expr = parser::parse(case).unwrap();
4226        eval_stmt.expr = prom_expr;
4227        let table_provider = build_test_table_provider_with_fields(
4228            &[(
4229                DEFAULT_SCHEMA_NAME.to_string(),
4230                "nginx_ingress_controller_requests".to_string(),
4231            )],
4232            &["namespace", "job"],
4233        )
4234        .await;
4235        // Should be ok
4236        let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
4237            .await
4238            .unwrap();
4239    }
4240
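    // A collection of `or`-heavy production-style queries; each case only
    // needs to plan successfully.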
4241    #[tokio::test]
4242    async fn test_parse_or_operator() {
4243        let mut eval_stmt = EvalStmt {
4244            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
4245            start: UNIX_EPOCH,
4246            end: UNIX_EPOCH
4247                .checked_add(Duration::from_secs(100_000))
4248                .unwrap(),
4249            interval: Duration::from_secs(5),
4250            lookback_delta: Duration::from_secs(1),
4251        };
4252
4253        let case = r#"
4254        sum(rate(sysstat{tenant_name=~"tenant1",cluster_name=~"cluster1"}[120s])) by (cluster_name,tenant_name) /
4255        (sum(sysstat{tenant_name=~"tenant1",cluster_name=~"cluster1"}) by (cluster_name,tenant_name) * 100)
4256            or
4257        200 * sum(sysstat{tenant_name=~"tenant1",cluster_name=~"cluster1"}) by (cluster_name,tenant_name) /
4258        sum(sysstat{tenant_name=~"tenant1",cluster_name=~"cluster1"}) by (cluster_name,tenant_name)"#;
4259
4260        let table_provider = build_test_table_provider_with_fields(
4261            &[(DEFAULT_SCHEMA_NAME.to_string(), "sysstat".to_string())],
4262            &["tenant_name", "cluster_name"],
4263        )
4264        .await;
4265        eval_stmt.expr = parser::parse(case).unwrap();
4266        let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
4267            .await
4268            .unwrap();
4269
4270        let case = r#"sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) /
4271            (sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) *1000) +
4272            sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) /
4273            (sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) *1000) >= 0
4274            or
4275            sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) /
4276            (sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) *1000) >= 0
4277            or
4278            sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) /
4279            (sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) *1000) >= 0"#;
4280        let table_provider = build_test_table_provider_with_fields(
4281            &[(DEFAULT_SCHEMA_NAME.to_string(), "sysstat".to_string())],
4282            &["tenant_name", "cluster_name"],
4283        )
4284        .await;
4285        eval_stmt.expr = parser::parse(case).unwrap();
4286        let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
4287            .await
4288            .unwrap();
4289
4290        let case = r#"(sum(background_waitevent_cnt{tenant_name=~"sys",cluster_name=~"cluster1"}) by (cluster_name,tenant_name) +
4291            sum(foreground_waitevent_cnt{tenant_name=~"sys",cluster_name=~"cluster1"}) by (cluster_name,tenant_name)) or
4292            (sum(background_waitevent_cnt{tenant_name=~"sys",cluster_name=~"cluster1"}) by (cluster_name,tenant_name)) or
4293            (sum(foreground_waitevent_cnt{tenant_name=~"sys",cluster_name=~"cluster1"}) by (cluster_name,tenant_name))"#;
4294        let table_provider = build_test_table_provider_with_fields(
4295            &[
4296                (
4297                    DEFAULT_SCHEMA_NAME.to_string(),
4298                    "background_waitevent_cnt".to_string(),
4299                ),
4300                (
4301                    DEFAULT_SCHEMA_NAME.to_string(),
4302                    "foreground_waitevent_cnt".to_string(),
4303                ),
4304            ],
4305            &["tenant_name", "cluster_name"],
4306        )
4307        .await;
4308        eval_stmt.expr = parser::parse(case).unwrap();
4309        let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
4310            .await
4311            .unwrap();
4312
4313        let case = r#"avg(node_load1{cluster_name=~"cluster1"}) by (cluster_name,host_name) or max(container_cpu_load_average_10s{cluster_name=~"cluster1"}) by (cluster_name,host_name) * 100 / max(container_spec_cpu_quota{cluster_name=~"cluster1"}) by (cluster_name,host_name)"#;
4314        let table_provider = build_test_table_provider_with_fields(
4315            &[
4316                (DEFAULT_SCHEMA_NAME.to_string(), "node_load1".to_string()),
4317                (
4318                    DEFAULT_SCHEMA_NAME.to_string(),
4319                    "container_cpu_load_average_10s".to_string(),
4320                ),
4321                (
4322                    DEFAULT_SCHEMA_NAME.to_string(),
4323                    "container_spec_cpu_quota".to_string(),
4324                ),
4325            ],
4326            &["cluster_name", "host_name"],
4327        )
4328        .await;
4329        eval_stmt.expr = parser::parse(case).unwrap();
4330        let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
4331            .await
4332            .unwrap();
4333    }
4334
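    // `__field__` matchers select value columns rather than filtering rows.
    // Each case lists the columns expected to survive projection; conflicting
    // and nonexistent selections are covered at the end.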
4335    #[tokio::test]
4336    async fn value_matcher() {
4337        // template
4338        let mut eval_stmt = EvalStmt {
4339            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
4340            start: UNIX_EPOCH,
4341            end: UNIX_EPOCH
4342                .checked_add(Duration::from_secs(100_000))
4343                .unwrap(),
4344            interval: Duration::from_secs(5),
4345            lookback_delta: Duration::from_secs(1),
4346        };
4347
4348        let cases = [
4349            // single equal matcher
4350            (
4351                r#"some_metric{__field__="field_1"}"#,
4352                vec![
4353                    "some_metric.field_1",
4354                    "some_metric.tag_0",
4355                    "some_metric.tag_1",
4356                    "some_metric.tag_2",
4357                    "some_metric.timestamp",
4358                ],
4359            ),
4360            // two equal matchers
4361            (
4362                r#"some_metric{__field__="field_1", __field__="field_0"}"#,
4363                vec![
4364                    "some_metric.field_0",
4365                    "some_metric.field_1",
4366                    "some_metric.tag_0",
4367                    "some_metric.tag_1",
4368                    "some_metric.tag_2",
4369                    "some_metric.timestamp",
4370                ],
4371            ),
4372            // single not_eq matcher
4373            (
4374                r#"some_metric{__field__!="field_1"}"#,
4375                vec![
4376                    "some_metric.field_0",
4377                    "some_metric.field_2",
4378                    "some_metric.tag_0",
4379                    "some_metric.tag_1",
4380                    "some_metric.tag_2",
4381                    "some_metric.timestamp",
4382                ],
4383            ),
4384            // two not_eq matchers
4385            (
4386                r#"some_metric{__field__!="field_1", __field__!="field_2"}"#,
4387                vec![
4388                    "some_metric.field_0",
4389                    "some_metric.tag_0",
4390                    "some_metric.tag_1",
4391                    "some_metric.tag_2",
4392                    "some_metric.timestamp",
4393                ],
4394            ),
4395            // equal and not_eq matchers (no conflict)
4396            (
4397                r#"some_metric{__field__="field_1", __field__!="field_0"}"#,
4398                vec![
4399                    "some_metric.field_1",
4400                    "some_metric.tag_0",
4401                    "some_metric.tag_1",
4402                    "some_metric.tag_2",
4403                    "some_metric.timestamp",
4404                ],
4405            ),
4406            // equal and not_eq matchers (conflict)
4407            (
4408                r#"some_metric{__field__="field_2", __field__!="field_2"}"#,
4409                vec![
4410                    "some_metric.tag_0",
4411                    "some_metric.tag_1",
4412                    "some_metric.tag_2",
4413                    "some_metric.timestamp",
4414                ],
4415            ),
4416            // single regex eq matcher
4417            (
4418                r#"some_metric{__field__=~"field_1|field_2"}"#,
4419                vec![
4420                    "some_metric.field_1",
4421                    "some_metric.field_2",
4422                    "some_metric.tag_0",
4423                    "some_metric.tag_1",
4424                    "some_metric.tag_2",
4425                    "some_metric.timestamp",
4426                ],
4427            ),
4428            // single regex not_eq matcher
4429            (
4430                r#"some_metric{__field__!~"field_1|field_2"}"#,
4431                vec![
4432                    "some_metric.field_0",
4433                    "some_metric.tag_0",
4434                    "some_metric.tag_1",
4435                    "some_metric.tag_2",
4436                    "some_metric.timestamp",
4437                ],
4438            ),
4439        ];
4440
4441        for case in cases {
4442            let prom_expr = parser::parse(case.0).unwrap();
4443            eval_stmt.expr = prom_expr;
4444            let table_provider = build_test_table_provider(
4445                &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
4446                3,
4447                3,
4448            )
4449            .await;
4450            let plan =
4451                PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
4452                    .await
4453                    .unwrap();
4454            let mut fields = plan.schema().field_names();
4455            let mut expected = case.1.into_iter().map(String::from).collect::<Vec<_>>();
4456            fields.sort();
4457            expected.sort();
4458            assert_eq!(fields, expected, "case: {:?}", case.0);
4459        }
4460
4461        let bad_cases = [
4462            r#"some_metric{__field__="nonexistent"}"#,
4463            r#"some_metric{__field__!="nonexistent"}"#,
4464        ];
4465
4466        for case in bad_cases {
4467            let prom_expr = parser::parse(case).unwrap();
4468            eval_stmt.expr = prom_expr;
4469            let table_provider = build_test_table_provider(
4470                &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
4471                3,
4472                3,
4473            )
4474            .await;
4475            let plan =
4476                PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
4477                    .await;
4478            assert!(plan.is_err(), "case: {:?}", case);
4479        }
4480    }
4481
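    // `__schema__` (and its alias `__database__`) is a synthetic matcher that
    // picks the schema for the table reference instead of filtering rows, so
    // both queries below produce the same plan.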
4482    #[tokio::test]
4483    async fn custom_schema() {
4484        let query = "some_alt_metric{__schema__=\"greptime_private\"}";
4485        let expected = String::from(
4486            "PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4487            \n  PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4488            \n    Sort: greptime_private.some_alt_metric.tag_0 ASC NULLS FIRST, greptime_private.some_alt_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4489            \n      Filter: greptime_private.some_alt_metric.timestamp >= TimestampMillisecond(-1000, None) AND greptime_private.some_alt_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4490            \n        TableScan: greptime_private.some_alt_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
4491        );
4492
4493        indie_query_plan_compare(query, expected).await;
4494
4495        let query = "some_alt_metric{__database__=\"greptime_private\"}";
4496        let expected = String::from(
4497            "PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4498            \n  PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4499            \n    Sort: greptime_private.some_alt_metric.tag_0 ASC NULLS FIRST, greptime_private.some_alt_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4500            \n      Filter: greptime_private.some_alt_metric.timestamp >= TimestampMillisecond(-1000, None) AND greptime_private.some_alt_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4501            \n        TableScan: greptime_private.some_alt_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
4502        );
4503
4504        indie_query_plan_compare(query, expected).await;
4505
4506        let query = "some_alt_metric{__schema__=\"greptime_private\"} / some_metric";
4507        let expected = String::from("Projection: some_metric.tag_0, some_metric.timestamp, greptime_private.some_alt_metric.field_0 / some_metric.field_0 AS greptime_private.some_alt_metric.field_0 / some_metric.field_0 [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), greptime_private.some_alt_metric.field_0 / some_metric.field_0:Float64;N]\
4508        \n  Inner Join: greptime_private.some_alt_metric.tag_0 = some_metric.tag_0, greptime_private.some_alt_metric.timestamp = some_metric.timestamp [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4509        \n    SubqueryAlias: greptime_private.some_alt_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4510        \n      PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4511        \n        PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4512        \n          Sort: greptime_private.some_alt_metric.tag_0 ASC NULLS FIRST, greptime_private.some_alt_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4513        \n            Filter: greptime_private.some_alt_metric.timestamp >= TimestampMillisecond(-1000, None) AND greptime_private.some_alt_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4514        \n              TableScan: greptime_private.some_alt_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4515        \n    SubqueryAlias: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4516        \n      PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4517        \n        PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4518        \n          Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4519        \n            Filter: some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4520        \n              TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]");
4521
4522        indie_query_plan_compare(query, expected).await;
4523    }
4524
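    // `__schema__`/`__database__` accept only plain equality; `!=` and regex
    // match ops must fail at planning time.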
4525    #[tokio::test]
4526    async fn only_equals_is_supported_for_special_matcher() {
4527        let queries = &[
4528            "some_alt_metric{__schema__!=\"greptime_private\"}",
4529            "some_alt_metric{__schema__=~\"lalala\"}",
4530            "some_alt_metric{__database__!=\"greptime_private\"}",
4531            "some_alt_metric{__database__=~\"lalala\"}",
4532        ];
4533
4534        for query in queries {
4535            let prom_expr = parser::parse(query).unwrap();
4536            let eval_stmt = EvalStmt {
4537                expr: prom_expr,
4538                start: UNIX_EPOCH,
4539                end: UNIX_EPOCH
4540                    .checked_add(Duration::from_secs(100_000))
4541                    .unwrap(),
4542                interval: Duration::from_secs(5),
4543                lookback_delta: Duration::from_secs(1),
4544            };
4545
4546            let table_provider = build_test_table_provider(
4547                &[
4548                    (DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string()),
4549                    (
4550                        "greptime_private".to_string(),
4551                        "some_alt_metric".to_string(),
4552                    ),
4553                ],
4554                1,
4555                1,
4556            )
4557            .await;
4558
4559            let plan =
4560                PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
4561                    .await;
4562            assert!(plan.is_err(), "query: {:?}", query);
4563        }
4564    }
4565
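    // A nanosecond time index is normalized up front: the planner injects a
    // Projection that casts the timestamp to millisecond before the Prom*
    // nodes, as both expected plans show.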
4566    #[tokio::test]
4567    async fn test_non_ms_precision() {
4568        let catalog_list = MemoryCatalogManager::with_default_setup();
4569        let columns = vec![
4570            ColumnSchema::new(
4571                "tag".to_string(),
4572                ConcreteDataType::string_datatype(),
4573                false,
4574            ),
4575            ColumnSchema::new(
4576                "timestamp".to_string(),
4577                ConcreteDataType::timestamp_nanosecond_datatype(),
4578                false,
4579            )
4580            .with_time_index(true),
4581            ColumnSchema::new(
4582                "field".to_string(),
4583                ConcreteDataType::float64_datatype(),
4584                true,
4585            ),
4586        ];
4587        let schema = Arc::new(Schema::new(columns));
4588        let table_meta = TableMetaBuilder::empty()
4589            .schema(schema)
4590            .primary_key_indices(vec![0])
4591            .value_indices(vec![2])
4592            .next_column_id(1024)
4593            .build()
4594            .unwrap();
4595        let table_info = TableInfoBuilder::default()
4596            .name("metrics".to_string())
4597            .meta(table_meta)
4598            .build()
4599            .unwrap();
4600        let table = EmptyTable::from_table_info(&table_info);
4601        assert!(catalog_list
4602            .register_table_sync(RegisterTableRequest {
4603                catalog: DEFAULT_CATALOG_NAME.to_string(),
4604                schema: DEFAULT_SCHEMA_NAME.to_string(),
4605                table_name: "metrics".to_string(),
4606                table_id: 1024,
4607                table,
4608            })
4609            .is_ok());
4610
4611        let plan = PromPlanner::stmt_to_plan(
4612            DfTableSourceProvider::new(
4613                catalog_list.clone(),
4614                false,
4615                QueryContext::arc(),
4616                DummyDecoder::arc(),
4617                true,
4618            ),
4619            &EvalStmt {
4620                expr: parser::parse("metrics{tag = \"1\"}").unwrap(),
4621                start: UNIX_EPOCH,
4622                end: UNIX_EPOCH
4623                    .checked_add(Duration::from_secs(100_000))
4624                    .unwrap(),
4625                interval: Duration::from_secs(5),
4626                lookback_delta: Duration::from_secs(1),
4627            },
4628            &build_query_engine_state(),
4629        )
4630        .await
4631        .unwrap();
4632        assert_eq!(plan.display_indent_schema().to_string(),
4633        "PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
4634        \n  PromSeriesDivide: tags=[\"tag\"] [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
4635        \n    Sort: metrics.tag ASC NULLS FIRST, metrics.timestamp ASC NULLS FIRST [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
4636        \n      Filter: metrics.tag = Utf8(\"1\") AND metrics.timestamp >= TimestampMillisecond(-1000, None) AND metrics.timestamp <= TimestampMillisecond(100001000, None) [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
4637        \n        Projection: metrics.field, metrics.tag, CAST(metrics.timestamp AS Timestamp(Millisecond, None)) AS timestamp [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
4638        \n          TableScan: metrics [tag:Utf8, timestamp:Timestamp(Nanosecond, None), field:Float64;N]"
4639        );
4640        let plan = PromPlanner::stmt_to_plan(
4641            DfTableSourceProvider::new(
4642                catalog_list.clone(),
4643                false,
4644                QueryContext::arc(),
4645                DummyDecoder::arc(),
4646                true,
4647            ),
4648            &EvalStmt {
4649                expr: parser::parse("avg_over_time(metrics{tag = \"1\"}[5s])").unwrap(),
4650                start: UNIX_EPOCH,
4651                end: UNIX_EPOCH
4652                    .checked_add(Duration::from_secs(100_000))
4653                    .unwrap(),
4654                interval: Duration::from_secs(5),
4655                lookback_delta: Duration::from_secs(1),
4656            },
4657            &build_query_engine_state(),
4658        )
4659        .await
4660        .unwrap();
4661        assert_eq!(plan.display_indent_schema().to_string(),
4662        "Filter: prom_avg_over_time(timestamp_range,field) IS NOT NULL [timestamp:Timestamp(Millisecond, None), prom_avg_over_time(timestamp_range,field):Float64;N, tag:Utf8]\
4663        \n  Projection: metrics.timestamp, prom_avg_over_time(timestamp_range, field) AS prom_avg_over_time(timestamp_range,field), metrics.tag [timestamp:Timestamp(Millisecond, None), prom_avg_over_time(timestamp_range,field):Float64;N, tag:Utf8]\
4664        \n    PromRangeManipulate: req range=[0..100000000], interval=[5000], eval range=[5000], time index=[timestamp], values=[\"field\"] [field:Dictionary(Int64, Float64);N, tag:Utf8, timestamp:Timestamp(Millisecond, None), timestamp_range:Dictionary(Int64, Timestamp(Millisecond, None))]\
4665        \n      PromSeriesNormalize: offset=[0], time index=[timestamp], filter NaN: [true] [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
4666        \n        PromSeriesDivide: tags=[\"tag\"] [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
4667        \n          Sort: metrics.tag ASC NULLS FIRST, metrics.timestamp ASC NULLS FIRST [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
4668        \n            Filter: metrics.tag = Utf8(\"1\") AND metrics.timestamp >= TimestampMillisecond(-6000, None) AND metrics.timestamp <= TimestampMillisecond(100001000, None) [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
4669        \n              Projection: metrics.field, metrics.tag, CAST(metrics.timestamp AS Timestamp(Millisecond, None)) AS timestamp [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
4670        \n                TableScan: metrics [tag:Utf8, timestamp:Timestamp(Nanosecond, None), field:Float64;N]"
4671        );
4672    }
4673
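    // A matcher on a label the table lacks is not a planning error; the
    // selector should still produce a valid plan.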
4674    #[tokio::test]
4675    async fn test_nonexistent_label() {
4676        // template
4677        let mut eval_stmt = EvalStmt {
4678            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
4679            start: UNIX_EPOCH,
4680            end: UNIX_EPOCH
4681                .checked_add(Duration::from_secs(100_000))
4682                .unwrap(),
4683            interval: Duration::from_secs(5),
4684            lookback_delta: Duration::from_secs(1),
4685        };
4686
4687        let case = r#"some_metric{nonexistent="hi"}"#;
4688        let prom_expr = parser::parse(case).unwrap();
4689        eval_stmt.expr = prom_expr;
4690        let table_provider = build_test_table_provider(
4691            &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
4692            3,
4693            3,
4694        )
4695        .await;
4696        // Should be ok
4697        let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
4698            .await
4699            .unwrap();
4700    }
4701
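    // label_join is planned as concat_ws over the source labels, followed by
    // a filter that keeps rows whose value column is non-null.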
4702    #[tokio::test]
4703    async fn test_label_join() {
4704        let prom_expr = parser::parse(
4705            "label_join(up{tag_0='api-server'}, 'foo', ',', 'tag_1', 'tag_2', 'tag_3')",
4706        )
4707        .unwrap();
4708        let eval_stmt = EvalStmt {
4709            expr: prom_expr,
4710            start: UNIX_EPOCH,
4711            end: UNIX_EPOCH
4712                .checked_add(Duration::from_secs(100_000))
4713                .unwrap(),
4714            interval: Duration::from_secs(5),
4715            lookback_delta: Duration::from_secs(1),
4716        };
4717
4718        let table_provider =
4719            build_test_table_provider(&[(DEFAULT_SCHEMA_NAME.to_string(), "up".to_string())], 4, 1)
4720                .await;
4721        let plan =
4722            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
4723                .await
4724                .unwrap();
4725
4726        let expected = r#"
4727Filter: up.field_0 IS NOT NULL [timestamp:Timestamp(Millisecond, None), field_0:Float64;N, foo:Utf8;N, tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8]
4728  Projection: up.timestamp, up.field_0, concat_ws(Utf8(","), up.tag_1, up.tag_2, up.tag_3) AS foo, up.tag_0, up.tag_1, up.tag_2, up.tag_3 [timestamp:Timestamp(Millisecond, None), field_0:Float64;N, foo:Utf8;N, tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8]
4729    PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
4730      PromSeriesDivide: tags=["tag_0", "tag_1", "tag_2", "tag_3"] [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
4731        Sort: up.tag_0 ASC NULLS FIRST, up.tag_1 ASC NULLS FIRST, up.tag_2 ASC NULLS FIRST, up.tag_3 ASC NULLS FIRST, up.timestamp ASC NULLS FIRST [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
4732          Filter: up.tag_0 = Utf8("api-server") AND up.timestamp >= TimestampMillisecond(-1000, None) AND up.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
4733            TableScan: up [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"#;
4734
4735        let ret = plan.display_indent_schema().to_string();
4736        assert_eq!(format!("\n{ret}"), expected, "\n{}", ret);
4737    }
4738
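    // label_replace compiles to regexp_replace with the pattern fully
    // anchored as ^(?s:...)$, mirroring Prometheus' anchored-regex semantics.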
4739    #[tokio::test]
4740    async fn test_label_replace() {
4741        let prom_expr = parser::parse(
4742            "label_replace(up{tag_0=\"a:c\"}, \"foo\", \"$1\", \"tag_0\", \"(.*):.*\")",
4743        )
4744        .unwrap();
4745        let eval_stmt = EvalStmt {
4746            expr: prom_expr,
4747            start: UNIX_EPOCH,
4748            end: UNIX_EPOCH
4749                .checked_add(Duration::from_secs(100_000))
4750                .unwrap(),
4751            interval: Duration::from_secs(5),
4752            lookback_delta: Duration::from_secs(1),
4753        };
4754
4755        let table_provider =
4756            build_test_table_provider(&[(DEFAULT_SCHEMA_NAME.to_string(), "up".to_string())], 1, 1)
4757                .await;
4758        let plan =
4759            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
4760                .await
4761                .unwrap();
4762
4763        let expected = r#"
4764Filter: up.field_0 IS NOT NULL [timestamp:Timestamp(Millisecond, None), field_0:Float64;N, foo:Utf8;N, tag_0:Utf8]
4765  Projection: up.timestamp, up.field_0, regexp_replace(up.tag_0, Utf8("^(?s:(.*):.*)$"), Utf8("$1")) AS foo, up.tag_0 [timestamp:Timestamp(Millisecond, None), field_0:Float64;N, foo:Utf8;N, tag_0:Utf8]
4766    PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
4767      PromSeriesDivide: tags=["tag_0"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
4768        Sort: up.tag_0 ASC NULLS FIRST, up.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
4769          Filter: up.tag_0 = Utf8("a:c") AND up.timestamp >= TimestampMillisecond(-1000, None) AND up.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
4770            TableScan: up [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"#;
4771
4772        let ret = plan.display_indent_schema().to_string();
4773        assert_eq!(format!("\n{ret}"), expected, "\n{}", ret);
4774    }
4775
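    // A `=~` matcher is anchored before planning: the expected filter wraps
    // the pattern in ^(?:...)$.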
4776    #[tokio::test]
4777    async fn test_matchers_to_expr() {
4778        let mut eval_stmt = EvalStmt {
4779            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
4780            start: UNIX_EPOCH,
4781            end: UNIX_EPOCH
4782                .checked_add(Duration::from_secs(100_000))
4783                .unwrap(),
4784            interval: Duration::from_secs(5),
4785            lookback_delta: Duration::from_secs(1),
4786        };
4787        let case =
4788            r#"sum(prometheus_tsdb_head_series{tag_1=~"(10.0.160.237:8080|10.0.160.237:9090)"})"#;
4789
4790        let prom_expr = parser::parse(case).unwrap();
4791        eval_stmt.expr = prom_expr;
4792        let table_provider = build_test_table_provider(
4793            &[(
4794                DEFAULT_SCHEMA_NAME.to_string(),
4795                "prometheus_tsdb_head_series".to_string(),
4796            )],
4797            3,
4798            3,
4799        )
4800        .await;
4801        let plan =
4802            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
4803                .await
4804                .unwrap();
4805        let expected = "Sort: prometheus_tsdb_head_series.timestamp ASC NULLS LAST [timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.field_0):Float64;N, sum(prometheus_tsdb_head_series.field_1):Float64;N, sum(prometheus_tsdb_head_series.field_2):Float64;N]\
4806        \n  Aggregate: groupBy=[[prometheus_tsdb_head_series.timestamp]], aggr=[[sum(prometheus_tsdb_head_series.field_0), sum(prometheus_tsdb_head_series.field_1), sum(prometheus_tsdb_head_series.field_2)]] [timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.field_0):Float64;N, sum(prometheus_tsdb_head_series.field_1):Float64;N, sum(prometheus_tsdb_head_series.field_2):Float64;N]\
4807        \n    PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N]\
4808        \n      PromSeriesDivide: tags=[\"tag_0\", \"tag_1\", \"tag_2\"] [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N]\
4809        \n        Sort: prometheus_tsdb_head_series.tag_0 ASC NULLS FIRST, prometheus_tsdb_head_series.tag_1 ASC NULLS FIRST, prometheus_tsdb_head_series.tag_2 ASC NULLS FIRST, prometheus_tsdb_head_series.timestamp ASC NULLS FIRST [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N]\
4810        \n          Filter: prometheus_tsdb_head_series.tag_1 ~ Utf8(\"^(?:(10.0.160.237:8080|10.0.160.237:9090))$\") AND prometheus_tsdb_head_series.timestamp >= TimestampMillisecond(-1000, None) AND prometheus_tsdb_head_series.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N]\
4811        \n            TableScan: prometheus_tsdb_head_series [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N]";
4812        assert_eq!(plan.display_indent_schema().to_string(), expected);
4813    }
4814
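    // topk(k, ...) is planned as a row_number() window partitioned by
    // timestamp and ordered by value (then tag) descending, filtered to
    // row_number <= k.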
    #[tokio::test]
    async fn test_topk_expr() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };
        let case = r#"topk(10, sum(prometheus_tsdb_head_series{ip=~"(10.0.160.237:8080|10.0.160.237:9090)"}) by (ip))"#;

        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;
        let table_provider = build_test_table_provider_with_fields(
            &[
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "prometheus_tsdb_head_series".to_string(),
                ),
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "http_server_requests_seconds_count".to_string(),
                ),
            ],
            &["ip"],
        )
        .await;

        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();
        let expected = "Projection: sum(prometheus_tsdb_head_series.greptime_value), prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp [sum(prometheus_tsdb_head_series.greptime_value):Float64;N, ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None)]\
        \n  Sort: prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST, row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS FIRST, prometheus_tsdb_head_series.ip DESC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW ASC NULLS LAST [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N, row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS FIRST, prometheus_tsdb_head_series.ip DESC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:UInt64]\
        \n    Filter: row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS FIRST, prometheus_tsdb_head_series.ip DESC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW <= Float64(10) [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N, row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS FIRST, prometheus_tsdb_head_series.ip DESC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:UInt64]\
        \n      WindowAggr: windowExpr=[[row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS FIRST, prometheus_tsdb_head_series.ip DESC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N, row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS FIRST, prometheus_tsdb_head_series.ip DESC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:UInt64]\
        \n        Sort: prometheus_tsdb_head_series.ip ASC NULLS LAST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N]\
        \n          Aggregate: groupBy=[[prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp]], aggr=[[sum(prometheus_tsdb_head_series.greptime_value)]] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N]\
        \n            PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n              PromSeriesDivide: tags=[\"ip\"] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n                Sort: prometheus_tsdb_head_series.ip ASC NULLS FIRST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS FIRST [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n                  Filter: prometheus_tsdb_head_series.ip ~ Utf8(\"^(?:(10.0.160.237:8080|10.0.160.237:9090))$\") AND prometheus_tsdb_head_series.greptime_timestamp >= TimestampMillisecond(-1000, None) AND prometheus_tsdb_head_series.greptime_timestamp <= TimestampMillisecond(100001000, None) [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n                    TableScan: prometheus_tsdb_head_series [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]";

        assert_eq!(plan.display_indent_schema().to_string(), expected);
    }

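    // `count_values` groups on the value column itself and re-exposes that
    // column under the caller-supplied label ("series" here) through an extra
    // projection, as the expected plan below shows.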
    #[tokio::test]
    async fn test_count_values_expr() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };
        let case = r#"count_values('series', prometheus_tsdb_head_series{ip=~"(10.0.160.237:8080|10.0.160.237:9090)"}) by (ip)"#;

        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;
        let table_provider = build_test_table_provider_with_fields(
            &[
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "prometheus_tsdb_head_series".to_string(),
                ),
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "http_server_requests_seconds_count".to_string(),
                ),
            ],
            &["ip"],
        )
        .await;

        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();
        let expected = "Projection: count(prometheus_tsdb_head_series.greptime_value), prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp, series [count(prometheus_tsdb_head_series.greptime_value):Int64, ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), series:Float64;N]\
        \n  Sort: prometheus_tsdb_head_series.ip ASC NULLS LAST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST, prometheus_tsdb_head_series.greptime_value ASC NULLS LAST [count(prometheus_tsdb_head_series.greptime_value):Int64, ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), series:Float64;N, greptime_value:Float64;N]\
        \n    Projection: count(prometheus_tsdb_head_series.greptime_value), prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp, prometheus_tsdb_head_series.greptime_value AS series, prometheus_tsdb_head_series.greptime_value [count(prometheus_tsdb_head_series.greptime_value):Int64, ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), series:Float64;N, greptime_value:Float64;N]\
        \n      Aggregate: groupBy=[[prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp, prometheus_tsdb_head_series.greptime_value]], aggr=[[count(prometheus_tsdb_head_series.greptime_value)]] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N, count(prometheus_tsdb_head_series.greptime_value):Int64]\
        \n        PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n          PromSeriesDivide: tags=[\"ip\"] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n            Sort: prometheus_tsdb_head_series.ip ASC NULLS FIRST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS FIRST [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n              Filter: prometheus_tsdb_head_series.ip ~ Utf8(\"^(?:(10.0.160.237:8080|10.0.160.237:9090))$\") AND prometheus_tsdb_head_series.greptime_timestamp >= TimestampMillisecond(-1000, None) AND prometheus_tsdb_head_series.greptime_timestamp <= TimestampMillisecond(100001000, None) [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n                TableScan: prometheus_tsdb_head_series [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]";

        assert_eq!(plan.display_indent_schema().to_string(), expected);
    }

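    // `quantile(0.3, ...)` is planned as a second aggregation over the inner
    // `sum by (ip)` result, passing the scalar quantile value as the first
    // argument of the quantile aggregate, as the expected plan below shows.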
    #[tokio::test]
    async fn test_quantile_expr() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };
        let case = r#"quantile(0.3, sum(prometheus_tsdb_head_series{ip=~"(10.0.160.237:8080|10.0.160.237:9090)"}) by (ip))"#;

        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;
        let table_provider = build_test_table_provider_with_fields(
            &[
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "prometheus_tsdb_head_series".to_string(),
                ),
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "http_server_requests_seconds_count".to_string(),
                ),
            ],
            &["ip"],
        )
        .await;

        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();
        let expected = "Sort: prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST [greptime_timestamp:Timestamp(Millisecond, None), quantile(Float64(0.3),sum(prometheus_tsdb_head_series.greptime_value)):Float64;N]\
        \n  Aggregate: groupBy=[[prometheus_tsdb_head_series.greptime_timestamp]], aggr=[[quantile(Float64(0.3), sum(prometheus_tsdb_head_series.greptime_value))]] [greptime_timestamp:Timestamp(Millisecond, None), quantile(Float64(0.3),sum(prometheus_tsdb_head_series.greptime_value)):Float64;N]\
        \n    Sort: prometheus_tsdb_head_series.ip ASC NULLS LAST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N]\
        \n      Aggregate: groupBy=[[prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp]], aggr=[[sum(prometheus_tsdb_head_series.greptime_value)]] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N]\
        \n        PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n          PromSeriesDivide: tags=[\"ip\"] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n            Sort: prometheus_tsdb_head_series.ip ASC NULLS FIRST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS FIRST [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n              Filter: prometheus_tsdb_head_series.ip ~ Utf8(\"^(?:(10.0.160.237:8080|10.0.160.237:9090))$\") AND prometheus_tsdb_head_series.greptime_timestamp >= TimestampMillisecond(-1000, None) AND prometheus_tsdb_head_series.greptime_timestamp <= TimestampMillisecond(100001000, None) [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n                TableScan: prometheus_tsdb_head_series [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]";

        assert_eq!(plan.display_indent_schema().to_string(), expected);
    }

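    // When one side of `or` names a metric with no backing table, the planner
    // substitutes an EmptyMetric branch whose missing labels are projected as
    // NULLs and unions the two sides with UnionDistinctOn instead of erroring.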
    #[tokio::test]
    async fn test_or_not_exists_table_label() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };
        let case = r#"sum by (job, tag0, tag2) (metric_exists) or sum by (job, tag0, tag2) (metric_not_exists)"#;

        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;
        let table_provider = build_test_table_provider_with_fields(
            &[(DEFAULT_SCHEMA_NAME.to_string(), "metric_exists".to_string())],
            &["job"],
        )
        .await;

        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();
        let expected = r#"UnionDistinctOn: on col=[["job"]], ts_col=[greptime_timestamp] [greptime_timestamp:Timestamp(Millisecond, None), job:Utf8, sum(metric_exists.greptime_value):Float64;N]
  SubqueryAlias: metric_exists [greptime_timestamp:Timestamp(Millisecond, None), job:Utf8, sum(metric_exists.greptime_value):Float64;N]
    Projection: metric_exists.greptime_timestamp, metric_exists.job, sum(metric_exists.greptime_value) [greptime_timestamp:Timestamp(Millisecond, None), job:Utf8, sum(metric_exists.greptime_value):Float64;N]
      Sort: metric_exists.job ASC NULLS LAST, metric_exists.greptime_timestamp ASC NULLS LAST [job:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(metric_exists.greptime_value):Float64;N]
        Aggregate: groupBy=[[metric_exists.job, metric_exists.greptime_timestamp]], aggr=[[sum(metric_exists.greptime_value)]] [job:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(metric_exists.greptime_value):Float64;N]
          PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp] [job:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]
            PromSeriesDivide: tags=["job"] [job:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]
              Sort: metric_exists.job ASC NULLS FIRST, metric_exists.greptime_timestamp ASC NULLS FIRST [job:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]
                Filter: metric_exists.greptime_timestamp >= TimestampMillisecond(-1000, None) AND metric_exists.greptime_timestamp <= TimestampMillisecond(100001000, None) [job:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]
                  TableScan: metric_exists [job:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]
  SubqueryAlias:  [greptime_timestamp:Timestamp(Millisecond, None), job:Utf8;N, sum(.value):Float64;N]
    Projection: .time AS greptime_timestamp, Utf8(NULL) AS job, sum(.value) [greptime_timestamp:Timestamp(Millisecond, None), job:Utf8;N, sum(.value):Float64;N]
      Sort: .time ASC NULLS LAST [time:Timestamp(Millisecond, None), sum(.value):Float64;N]
        Aggregate: groupBy=[[.time]], aggr=[[sum(.value)]] [time:Timestamp(Millisecond, None), sum(.value):Float64;N]
          EmptyMetric: range=[0..-1], interval=[5000] [time:Timestamp(Millisecond, None), value:Float64;N]
            TableScan: dummy [time:Timestamp(Millisecond, None), value:Float64;N]"#;

        assert_eq!(plan.display_indent_schema().to_string(), expected);
    }

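    // Regression test: histogram_quantile over a table that lacks an "le"
    // column must plan to an EmptyRelation rather than fail to resolve "le".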
    #[tokio::test]
    async fn test_histogram_quantile_missing_le_column() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        // Test case: histogram_quantile over a table that doesn't have an 'le' column
        let case = r#"histogram_quantile(0.99, sum by(pod,instance,le) (rate(non_existent_histogram_bucket{instance=~"xxx"}[1m])))"#;

        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;

        // Create a table provider whose table doesn't have an 'le' column
        let table_provider = build_test_table_provider_with_fields(
            &[(
                DEFAULT_SCHEMA_NAME.to_string(),
                "non_existent_histogram_bucket".to_string(),
            )],
            &["pod", "instance"], // Note: no 'le' column
        )
        .await;

        // Planning should return an empty result instead of an error
        let result =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await;

        // This should now succeed (returning an empty result) instead of failing with "Cannot find column le"
        assert!(
            result.is_ok(),
            "Expected successful plan creation with empty result, but got error: {:?}",
            result.err()
        );

        // Verify that the resulting plan is an EmptyRelation
        let plan = result.unwrap();
        match plan {
            LogicalPlan::EmptyRelation(_) => {
                // This is what we expect
            }
            _ => panic!("Expected EmptyRelation, but got: {:?}", plan),
        }
    }
}