// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeSet, HashSet, VecDeque};
use std::sync::Arc;
use std::time::UNIX_EPOCH;

use arrow::datatypes::IntervalDayTime;
use async_recursion::async_recursion;
use catalog::table_source::DfTableSourceProvider;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_function::function::FunctionContext;
use common_query::prelude::GREPTIME_VALUE;
use datafusion::common::DFSchemaRef;
use datafusion::datasource::DefaultTableSource;
use datafusion::functions_aggregate::average::avg_udaf;
use datafusion::functions_aggregate::count::count_udaf;
use datafusion::functions_aggregate::expr_fn::first_value;
use datafusion::functions_aggregate::grouping::grouping_udaf;
use datafusion::functions_aggregate::min_max::{max_udaf, min_udaf};
use datafusion::functions_aggregate::stddev::stddev_pop_udaf;
use datafusion::functions_aggregate::sum::sum_udaf;
use datafusion::functions_aggregate::variance::var_pop_udaf;
use datafusion::functions_window::row_number::RowNumber;
use datafusion::logical_expr::expr::{Alias, ScalarFunction, WindowFunction};
use datafusion::logical_expr::expr_rewriter::normalize_cols;
use datafusion::logical_expr::{
    BinaryExpr, Cast, Extension, LogicalPlan, LogicalPlanBuilder, Operator,
    ScalarUDF as ScalarUdfDef, WindowFrame, WindowFunctionDefinition,
};
use datafusion::prelude as df_prelude;
use datafusion::prelude::{Column, Expr as DfExpr, JoinType};
use datafusion::scalar::ScalarValue;
use datafusion::sql::TableReference;
use datafusion_common::{DFSchema, NullEquality};
use datafusion_expr::expr::WindowFunctionParams;
use datafusion_expr::utils::conjunction;
use datafusion_expr::{ExprSchemable, Literal, SortExpr, col, lit};
use datatypes::arrow::datatypes::{DataType as ArrowDataType, TimeUnit as ArrowTimeUnit};
use datatypes::data_type::ConcreteDataType;
use itertools::Itertools;
use once_cell::sync::Lazy;
use promql::extension_plan::{
    Absent, EmptyMetric, HistogramFold, InstantManipulate, Millisecond, RangeManipulate,
    ScalarCalculate, SeriesDivide, SeriesNormalize, UnionDistinctOn, build_special_time_expr,
};
use promql::functions::{
    AbsentOverTime, AvgOverTime, Changes, CountOverTime, Delta, Deriv, HoltWinters, IDelta,
    Increase, LastOverTime, MaxOverTime, MinOverTime, PredictLinear, PresentOverTime,
    QuantileOverTime, Rate, Resets, Round, StddevOverTime, StdvarOverTime, SumOverTime,
    quantile_udaf,
};
use promql_parser::label::{METRIC_NAME, MatchOp, Matcher, Matchers};
use promql_parser::parser::token::TokenType;
use promql_parser::parser::{
    AggregateExpr, BinModifier, BinaryExpr as PromBinaryExpr, Call, EvalStmt, Expr as PromExpr,
    Function, FunctionArgs as PromFunctionArgs, LabelModifier, MatrixSelector, NumberLiteral,
    Offset, ParenExpr, StringLiteral, SubqueryExpr, UnaryExpr, VectorMatchCardinality,
    VectorSelector, token,
};
use regex::{self, Regex};
use snafu::{OptionExt, ResultExt, ensure};
use store_api::metric_engine_consts::{
    DATA_SCHEMA_TABLE_ID_COLUMN_NAME, DATA_SCHEMA_TSID_COLUMN_NAME,
};
use table::table::adapter::DfTableProviderAdapter;

use crate::promql::error::{
    CatalogSnafu, ColumnNotFoundSnafu, CombineTableColumnMismatchSnafu, DataFusionPlanningSnafu,
    ExpectRangeSelectorSnafu, FunctionInvalidArgumentSnafu, InvalidDestinationLabelNameSnafu,
    InvalidRegularExpressionSnafu, InvalidTimeRangeSnafu, MultiFieldsNotSupportedSnafu,
    MultipleMetricMatchersSnafu, MultipleVectorSnafu, NoMetricMatcherSnafu, PromqlPlanNodeSnafu,
    Result, SameLabelSetSnafu, TableNameNotFoundSnafu, TimeIndexNotFoundSnafu,
    UnexpectedPlanExprSnafu, UnexpectedTokenSnafu, UnknownTableSnafu, UnsupportedExprSnafu,
    UnsupportedMatcherOpSnafu, UnsupportedVectorMatchSnafu, ValueNotFoundSnafu,
    ZeroRangeSelectorSnafu,
};
use crate::query_engine::QueryEngineState;

/// `time()` function in PromQL.
const SPECIAL_TIME_FUNCTION: &str = "time";
/// `scalar()` function in PromQL.
const SCALAR_FUNCTION: &str = "scalar";
/// `absent()` function in PromQL.
const SPECIAL_ABSENT_FUNCTION: &str = "absent";
/// `histogram_quantile()` function in PromQL.
const SPECIAL_HISTOGRAM_QUANTILE: &str = "histogram_quantile";
/// `vector()` function in PromQL.
const SPECIAL_VECTOR_FUNCTION: &str = "vector";
/// `le` column for conventional histogram.
const LE_COLUMN_NAME: &str = "le";

/// Static regex for validating label names according to Prometheus specification.
/// Label names must match the regex: [a-zA-Z_][a-zA-Z0-9_]*
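/// For example, `http_requests_total` and `_internal` are valid label names,
/// while `5xx` and `http.status` are not.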
static LABEL_NAME_REGEX: Lazy<Regex> =
    Lazy::new(|| Regex::new(r"^[a-zA-Z_][a-zA-Z0-9_]*$").unwrap());

const DEFAULT_TIME_INDEX_COLUMN: &str = "time";

/// Default value column name for empty metrics.
const DEFAULT_FIELD_COLUMN: &str = "value";

/// Special modifier to project field columns under multi-field mode.
const FIELD_COLUMN_MATCHER: &str = "__field__";

/// Special modifiers for cross-schema queries.
const SCHEMA_COLUMN_MATCHER: &str = "__schema__";
const DB_COLUMN_MATCHER: &str = "__database__";

/// Threshold for the scatter scan mode.
const MAX_SCATTER_POINTS: i64 = 400;

/// One-hour interval in milliseconds.
const INTERVAL_1H: i64 = 60 * 60 * 1000;

#[derive(Default, Debug, Clone)]
struct PromPlannerContext {
    // query parameters
    start: Millisecond,
    end: Millisecond,
    interval: Millisecond,
    lookback_delta: Millisecond,

    // planner states
    table_name: Option<String>,
    time_index_column: Option<String>,
    field_columns: Vec<String>,
    tag_columns: Vec<String>,
    /// The matcher for field columns `__field__`.
    field_column_matcher: Option<Vec<Matcher>>,
    /// The matcher for selectors (normal matchers).
    selector_matcher: Vec<Matcher>,
    schema_name: Option<String>,
    /// The range in milliseconds of the range selector. `None` if there is no range selector.
    range: Option<Millisecond>,
}

impl PromPlannerContext {
    fn from_eval_stmt(stmt: &EvalStmt) -> Self {
        Self {
            start: stmt.start.duration_since(UNIX_EPOCH).unwrap().as_millis() as _,
            end: stmt.end.duration_since(UNIX_EPOCH).unwrap().as_millis() as _,
            interval: stmt.interval.as_millis() as _,
            lookback_delta: stmt.lookback_delta.as_millis() as _,
            ..Default::default()
        }
    }

    /// Reset all planner states
    fn reset(&mut self) {
        self.table_name = None;
        self.time_index_column = None;
        self.field_columns = vec![];
        self.tag_columns = vec![];
        self.field_column_matcher = None;
        self.selector_matcher.clear();
        self.schema_name = None;
        self.range = None;
    }

    /// Reset table name and schema to empty
    fn reset_table_name_and_schema(&mut self) {
        self.table_name = Some(String::new());
        self.schema_name = None;
    }

    /// Check if `le` is present in tag columns
    fn has_le_tag(&self) -> bool {
        self.tag_columns.iter().any(|c| c.eq(&LE_COLUMN_NAME))
    }
}

pub struct PromPlanner {
    table_provider: DfTableSourceProvider,
    ctx: PromPlannerContext,
}

/// Unescapes the value of the matcher.
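///
/// For example, a raw matcher value containing the two-character escape
/// sequence `\n` becomes an actual newline after normalization, so downstream
/// filter expressions compare against the intended literal characters.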
pub fn normalize_matcher(mut matcher: Matcher) -> Matcher {
    if let Ok(unescaped_value) = unescaper::unescape(&matcher.value) {
        matcher.value = unescaped_value;
    }
    matcher
}

impl PromPlanner {
    pub async fn stmt_to_plan(
        table_provider: DfTableSourceProvider,
        stmt: &EvalStmt,
        query_engine_state: &QueryEngineState,
    ) -> Result<LogicalPlan> {
        let mut planner = Self {
            table_provider,
            ctx: PromPlannerContext::from_eval_stmt(stmt),
        };

        planner
            .prom_expr_to_plan(&stmt.expr, query_engine_state)
            .await
    }

    pub async fn prom_expr_to_plan(
        &mut self,
        prom_expr: &PromExpr,
        query_engine_state: &QueryEngineState,
    ) -> Result<LogicalPlan> {
        self.prom_expr_to_plan_inner(prom_expr, false, query_engine_state)
            .await
    }

    /**
    Converts a PromQL expression to a logical plan.

    NOTE:
        The `timestamp_fn` indicates whether the PromQL `timestamp()` function is being evaluated in the current context.
        If `true`, the planner generates a logical plan that projects the timestamp (time index) column
        as the value column for each input row, implementing the PromQL `timestamp()` function semantics.
        If `false`, the planner generates the standard logical plan for the given PromQL expression.
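
    For example, planning `timestamp(up)` evaluates the inner `up` selector with
    `timestamp_fn` set to `true`, so each output row's value column carries the
    sample's own timestamp.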
    */
    #[async_recursion]
    async fn prom_expr_to_plan_inner(
        &mut self,
        prom_expr: &PromExpr,
        timestamp_fn: bool,
        query_engine_state: &QueryEngineState,
    ) -> Result<LogicalPlan> {
        let res = match prom_expr {
            PromExpr::Aggregate(expr) => {
                self.prom_aggr_expr_to_plan(query_engine_state, expr)
                    .await?
            }
            PromExpr::Unary(expr) => {
                self.prom_unary_expr_to_plan(query_engine_state, expr)
                    .await?
            }
            PromExpr::Binary(expr) => {
                self.prom_binary_expr_to_plan(query_engine_state, expr)
                    .await?
            }
            PromExpr::Paren(ParenExpr { expr }) => {
                self.prom_expr_to_plan_inner(expr, timestamp_fn, query_engine_state)
                    .await?
            }
            PromExpr::Subquery(expr) => {
                self.prom_subquery_expr_to_plan(query_engine_state, expr)
                    .await?
            }
            PromExpr::NumberLiteral(lit) => self.prom_number_lit_to_plan(lit)?,
            PromExpr::StringLiteral(lit) => self.prom_string_lit_to_plan(lit)?,
            PromExpr::VectorSelector(selector) => {
                self.prom_vector_selector_to_plan(selector, timestamp_fn)
                    .await?
            }
            PromExpr::MatrixSelector(selector) => {
                self.prom_matrix_selector_to_plan(selector).await?
            }
            PromExpr::Call(expr) => {
                self.prom_call_expr_to_plan(query_engine_state, expr)
                    .await?
            }
            PromExpr::Extension(expr) => {
                self.prom_ext_expr_to_plan(query_engine_state, expr).await?
            }
        };

        Ok(res)
    }

    async fn prom_subquery_expr_to_plan(
        &mut self,
        query_engine_state: &QueryEngineState,
        subquery_expr: &SubqueryExpr,
    ) -> Result<LogicalPlan> {
        let SubqueryExpr {
            expr, range, step, ..
        } = subquery_expr;

        let current_interval = self.ctx.interval;
        if let Some(step) = step {
            self.ctx.interval = step.as_millis() as _;
        }
        let current_start = self.ctx.start;
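        // Extend the query start backwards so the inner plan also produces the
        // points covered by the first subquery window ending at `start`.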
        self.ctx.start -= range.as_millis() as i64 - self.ctx.interval;
        let input = self.prom_expr_to_plan(expr, query_engine_state).await?;
        self.ctx.interval = current_interval;
        self.ctx.start = current_start;

        ensure!(!range.is_zero(), ZeroRangeSelectorSnafu);
        let range_ms = range.as_millis() as _;
        self.ctx.range = Some(range_ms);

        let manipulate = RangeManipulate::new(
            self.ctx.start,
            self.ctx.end,
            self.ctx.interval,
            range_ms,
            self.ctx
                .time_index_column
                .clone()
                .expect("time index should be set in `setup_context`"),
            self.ctx.field_columns.clone(),
            input,
        )
        .context(DataFusionPlanningSnafu)?;

        Ok(LogicalPlan::Extension(Extension {
            node: Arc::new(manipulate),
        }))
    }

    async fn prom_aggr_expr_to_plan(
        &mut self,
        query_engine_state: &QueryEngineState,
        aggr_expr: &AggregateExpr,
    ) -> Result<LogicalPlan> {
        let AggregateExpr {
            op,
            expr,
            modifier,
            param,
        } = aggr_expr;

        let input = self.prom_expr_to_plan(expr, query_engine_state).await?;

        match (*op).id() {
            token::T_TOPK | token::T_BOTTOMK => {
                self.prom_topk_bottomk_to_plan(aggr_expr, input).await
            }
            _ => {
                // Calculate the columns to group by. The time index column
                // needs to be appended to the group-by columns.
                let mut group_exprs = self.agg_modifier_to_col(input.schema(), modifier, true)?;
                // convert op and value columns to aggregate exprs
                let (aggr_exprs, prev_field_exprs) =
                    self.create_aggregate_exprs(*op, param, &input)?;

                // create plan
                let builder = LogicalPlanBuilder::from(input);
                let builder = if op.id() == token::T_COUNT_VALUES {
                    let label = Self::get_param_value_as_str(*op, param)?;
                    // `count_values` must group by the field columns and
                    // project them to the new label.
                    group_exprs.extend(prev_field_exprs.clone());
                    let project_fields = self
                        .create_field_column_exprs()?
                        .into_iter()
                        .chain(self.create_tag_column_exprs()?)
                        .chain(Some(self.create_time_index_column_expr()?))
                        .chain(prev_field_exprs.into_iter().map(|expr| expr.alias(label)));

                    builder
                        .aggregate(group_exprs.clone(), aggr_exprs)
                        .context(DataFusionPlanningSnafu)?
                        .project(project_fields)
                        .context(DataFusionPlanningSnafu)?
                } else {
                    builder
                        .aggregate(group_exprs.clone(), aggr_exprs)
                        .context(DataFusionPlanningSnafu)?
                };

                let sort_expr = group_exprs.into_iter().map(|expr| expr.sort(true, false));

                builder
                    .sort(sort_expr)
                    .context(DataFusionPlanningSnafu)?
                    .build()
                    .context(DataFusionPlanningSnafu)
            }
        }
    }

    /// Create a logical plan for the PromQL `topk` and `bottomk` exprs.
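    ///
    /// Rows are ranked per tag group with window exprs; rows whose rank exceeds
    /// the given parameter are filtered out, mirroring PromQL's per-timestamp
    /// top/bottom selection.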
    async fn prom_topk_bottomk_to_plan(
        &mut self,
        aggr_expr: &AggregateExpr,
        input: LogicalPlan,
    ) -> Result<LogicalPlan> {
        let AggregateExpr {
            op,
            param,
            modifier,
            ..
        } = aggr_expr;

        let group_exprs = self.agg_modifier_to_col(input.schema(), modifier, false)?;

        let val = Self::get_param_as_literal_expr(param, Some(*op), Some(ArrowDataType::Float64))?;

        // convert op and value columns to window exprs.
        let window_exprs = self.create_window_exprs(*op, group_exprs.clone(), &input)?;

        let rank_columns: Vec<_> = window_exprs
            .iter()
            .map(|expr| expr.schema_name().to_string())
            .collect();

        // Create ranks filter with `Operator::Or`.
        // Safety: at least one rank column
        let filter: DfExpr = rank_columns
            .iter()
            .fold(None, |expr, rank| {
                let predicate = DfExpr::BinaryExpr(BinaryExpr {
                    left: Box::new(col(rank)),
                    op: Operator::LtEq,
                    right: Box::new(val.clone()),
                });

                match expr {
                    None => Some(predicate),
                    Some(expr) => Some(DfExpr::BinaryExpr(BinaryExpr {
                        left: Box::new(expr),
                        op: Operator::Or,
                        right: Box::new(predicate),
                    })),
                }
            })
            .unwrap();

        let rank_columns: Vec<_> = rank_columns.into_iter().map(col).collect();

        let mut new_group_exprs = group_exprs.clone();
        // Order by ranks
        new_group_exprs.extend(rank_columns);

        let group_sort_expr = new_group_exprs
            .into_iter()
            .map(|expr| expr.sort(true, false));

        let project_fields = self
            .create_field_column_exprs()?
            .into_iter()
            .chain(self.create_tag_column_exprs()?)
            .chain(Some(self.create_time_index_column_expr()?));

        LogicalPlanBuilder::from(input)
            .window(window_exprs)
            .context(DataFusionPlanningSnafu)?
            .filter(filter)
            .context(DataFusionPlanningSnafu)?
            .sort(group_sort_expr)
            .context(DataFusionPlanningSnafu)?
            .project(project_fields)
            .context(DataFusionPlanningSnafu)?
            .build()
            .context(DataFusionPlanningSnafu)
    }

    async fn prom_unary_expr_to_plan(
        &mut self,
        query_engine_state: &QueryEngineState,
        unary_expr: &UnaryExpr,
    ) -> Result<LogicalPlan> {
        let UnaryExpr { expr } = unary_expr;
        // A unary expr in PromQL implies the `-` operator
        let input = self.prom_expr_to_plan(expr, query_engine_state).await?;
        self.projection_for_each_field_column(input, |col| {
            Ok(DfExpr::Negative(Box::new(DfExpr::Column(col.into()))))
        })
    }

    async fn prom_binary_expr_to_plan(
        &mut self,
        query_engine_state: &QueryEngineState,
        binary_expr: &PromBinaryExpr,
    ) -> Result<LogicalPlan> {
        let PromBinaryExpr {
            lhs,
            rhs,
            op,
            modifier,
        } = binary_expr;

        // If set to true, comparison operators return 0/1 (0 for false, 1 for true)
        // instead of filtering on the result column.
        let should_return_bool = if let Some(m) = modifier {
            m.return_bool
        } else {
            false
        };
        let is_comparison_op = Self::is_token_a_comparison_op(*op);

        // We should build a filter plan here if the op is a comparison op and we
        // don't need to return 0/1. Otherwise, we build a projection plan.
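        // E.g. `1 + 2` hits the (Some, Some) arm below and becomes a constant
        // EmptyMetric plan, while `metric > 0.5` hits a literal/column arm and
        // becomes a filter (or a 0/1 projection when `bool` is given).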
        match (
            Self::try_build_literal_expr(lhs),
            Self::try_build_literal_expr(rhs),
        ) {
            (Some(lhs), Some(rhs)) => {
                self.ctx.time_index_column = Some(DEFAULT_TIME_INDEX_COLUMN.to_string());
                self.ctx.field_columns = vec![DEFAULT_FIELD_COLUMN.to_string()];
                self.ctx.reset_table_name_and_schema();
                let field_expr_builder = Self::prom_token_to_binary_expr_builder(*op)?;
                let mut field_expr = field_expr_builder(lhs, rhs)?;

                if is_comparison_op && should_return_bool {
                    field_expr = DfExpr::Cast(Cast {
                        expr: Box::new(field_expr),
                        data_type: ArrowDataType::Float64,
                    });
                }

                Ok(LogicalPlan::Extension(Extension {
                    node: Arc::new(
                        EmptyMetric::new(
                            self.ctx.start,
                            self.ctx.end,
                            self.ctx.interval,
                            SPECIAL_TIME_FUNCTION.to_string(),
                            DEFAULT_FIELD_COLUMN.to_string(),
                            Some(field_expr),
                        )
                        .context(DataFusionPlanningSnafu)?,
                    ),
                }))
            }
            // lhs is a literal, rhs is a column
            (Some(mut expr), None) => {
                let input = self.prom_expr_to_plan(rhs, query_engine_state).await?;
                // check if the literal is a special time expr
                if let Some(time_expr) = self.try_build_special_time_expr_with_context(lhs) {
                    expr = time_expr
                }
                let bin_expr_builder = |col: &String| {
                    let binary_expr_builder = Self::prom_token_to_binary_expr_builder(*op)?;
                    let mut binary_expr =
                        binary_expr_builder(expr.clone(), DfExpr::Column(col.into()))?;

                    if is_comparison_op && should_return_bool {
                        binary_expr = DfExpr::Cast(Cast {
                            expr: Box::new(binary_expr),
                            data_type: ArrowDataType::Float64,
                        });
                    }
                    Ok(binary_expr)
                };
                if is_comparison_op && !should_return_bool {
                    self.filter_on_field_column(input, bin_expr_builder)
                } else {
                    self.projection_for_each_field_column(input, bin_expr_builder)
                }
            }
            // lhs is a column, rhs is a literal
            (None, Some(mut expr)) => {
                let input = self.prom_expr_to_plan(lhs, query_engine_state).await?;
                // check if the literal is a special time expr
                if let Some(time_expr) = self.try_build_special_time_expr_with_context(rhs) {
                    expr = time_expr
                }
                let bin_expr_builder = |col: &String| {
                    let binary_expr_builder = Self::prom_token_to_binary_expr_builder(*op)?;
                    let mut binary_expr =
                        binary_expr_builder(DfExpr::Column(col.into()), expr.clone())?;

                    if is_comparison_op && should_return_bool {
                        binary_expr = DfExpr::Cast(Cast {
                            expr: Box::new(binary_expr),
                            data_type: ArrowDataType::Float64,
                        });
                    }
                    Ok(binary_expr)
                };
                if is_comparison_op && !should_return_bool {
                    self.filter_on_field_column(input, bin_expr_builder)
                } else {
                    self.projection_for_each_field_column(input, bin_expr_builder)
                }
            }
            // both are columns. join them on time index
            (None, None) => {
                let left_input = self.prom_expr_to_plan(lhs, query_engine_state).await?;
                let left_field_columns = self.ctx.field_columns.clone();
                let left_time_index_column = self.ctx.time_index_column.clone();
                let mut left_table_ref = self
                    .table_ref()
                    .unwrap_or_else(|_| TableReference::bare(""));
                let left_context = self.ctx.clone();

                let right_input = self.prom_expr_to_plan(rhs, query_engine_state).await?;
                let right_field_columns = self.ctx.field_columns.clone();
                let right_time_index_column = self.ctx.time_index_column.clone();
                let mut right_table_ref = self
                    .table_ref()
                    .unwrap_or_else(|_| TableReference::bare(""));
                let right_context = self.ctx.clone();

                // TODO(ruihang): avoid join if left and right are the same table

                // set op has "special" join semantics
                if Self::is_token_a_set_op(*op) {
                    return self.set_op_on_non_field_columns(
                        left_input,
                        right_input,
                        left_context,
                        right_context,
                        *op,
                        modifier,
                    );
                }

                // normal join
                if left_table_ref == right_table_ref {
                    // rename table references to avoid ambiguity
                    left_table_ref = TableReference::bare("lhs");
                    right_table_ref = TableReference::bare("rhs");
                    // `self.ctx` currently holds the right plan's context. If the
                    // right plan has no tag columns, use the left plan's context for
                    // subsequent calculations instead, to handle cases like
                    // `host + scalar(...)`: the tag columns of the `host` table,
                    // which only appear in the left plan's context, must be
                    // preserved in the subsequent projection.
                    if self.ctx.tag_columns.is_empty() {
                        self.ctx = left_context.clone();
                        self.ctx.table_name = Some("lhs".to_string());
                    } else {
                        self.ctx.table_name = Some("rhs".to_string());
                    }
                }
                let mut field_columns = left_field_columns.iter().zip(right_field_columns.iter());

                let join_plan = self.join_on_non_field_columns(
                    left_input,
                    right_input,
                    left_table_ref.clone(),
                    right_table_ref.clone(),
                    left_time_index_column,
                    right_time_index_column,
                    // If either plan's tag columns are empty (cases like
                    // `scalar(...) + host` or `host + scalar(...)`), join on the
                    // time index only.
                    left_context.tag_columns.is_empty() || right_context.tag_columns.is_empty(),
                    modifier,
                )?;
                let join_plan_schema = join_plan.schema().clone();

                let bin_expr_builder = |_: &String| {
                    let (left_col_name, right_col_name) = field_columns.next().unwrap();
                    let left_col = join_plan_schema
                        .qualified_field_with_name(Some(&left_table_ref), left_col_name)
                        .context(DataFusionPlanningSnafu)?
                        .into();
                    let right_col = join_plan_schema
                        .qualified_field_with_name(Some(&right_table_ref), right_col_name)
                        .context(DataFusionPlanningSnafu)?
                        .into();

                    let binary_expr_builder = Self::prom_token_to_binary_expr_builder(*op)?;
                    let mut binary_expr =
                        binary_expr_builder(DfExpr::Column(left_col), DfExpr::Column(right_col))?;
                    if is_comparison_op && should_return_bool {
                        binary_expr = DfExpr::Cast(Cast {
                            expr: Box::new(binary_expr),
                            data_type: ArrowDataType::Float64,
                        });
                    }
                    Ok(binary_expr)
                };
                if is_comparison_op && !should_return_bool {
                    self.filter_on_field_column(join_plan, bin_expr_builder)
                } else {
                    self.projection_for_each_field_column(join_plan, bin_expr_builder)
                }
            }
        }
    }

    fn prom_number_lit_to_plan(&mut self, number_literal: &NumberLiteral) -> Result<LogicalPlan> {
        let NumberLiteral { val } = number_literal;
        self.ctx.time_index_column = Some(DEFAULT_TIME_INDEX_COLUMN.to_string());
        self.ctx.field_columns = vec![DEFAULT_FIELD_COLUMN.to_string()];
        self.ctx.reset_table_name_and_schema();
        let literal_expr = df_prelude::lit(*val);

        let plan = LogicalPlan::Extension(Extension {
            node: Arc::new(
                EmptyMetric::new(
                    self.ctx.start,
                    self.ctx.end,
                    self.ctx.interval,
                    SPECIAL_TIME_FUNCTION.to_string(),
                    DEFAULT_FIELD_COLUMN.to_string(),
                    Some(literal_expr),
                )
                .context(DataFusionPlanningSnafu)?,
            ),
        });
        Ok(plan)
    }

    fn prom_string_lit_to_plan(&mut self, string_literal: &StringLiteral) -> Result<LogicalPlan> {
        let StringLiteral { val } = string_literal;
        self.ctx.time_index_column = Some(DEFAULT_TIME_INDEX_COLUMN.to_string());
        self.ctx.field_columns = vec![DEFAULT_FIELD_COLUMN.to_string()];
        self.ctx.reset_table_name_and_schema();
        let literal_expr = df_prelude::lit(val.to_string());

        let plan = LogicalPlan::Extension(Extension {
            node: Arc::new(
                EmptyMetric::new(
                    self.ctx.start,
                    self.ctx.end,
                    self.ctx.interval,
                    SPECIAL_TIME_FUNCTION.to_string(),
                    DEFAULT_FIELD_COLUMN.to_string(),
                    Some(literal_expr),
                )
                .context(DataFusionPlanningSnafu)?,
            ),
        });
        Ok(plan)
    }

    async fn prom_vector_selector_to_plan(
        &mut self,
        vector_selector: &VectorSelector,
        timestamp_fn: bool,
    ) -> Result<LogicalPlan> {
        let VectorSelector {
            name,
            offset,
            matchers,
            at: _,
        } = vector_selector;
        let matchers = self.preprocess_label_matchers(matchers, name)?;
        if let Some(empty_plan) = self.setup_context().await? {
            return Ok(empty_plan);
        }
        let normalize = self
            .selector_to_series_normalize_plan(offset, matchers, false)
            .await?;

        let normalize = if timestamp_fn {
            // If evaluating the PromQL `timestamp()` function, project the time index column as the value column
            // before wrapping with [`InstantManipulate`], so the output matches PromQL's `timestamp()` semantics.
            self.create_timestamp_func_plan(normalize)?
        } else {
            normalize
        };

        let manipulate = InstantManipulate::new(
            self.ctx.start,
            self.ctx.end,
            self.ctx.lookback_delta,
            self.ctx.interval,
            self.ctx
                .time_index_column
                .clone()
                .expect("time index should be set in `setup_context`"),
            self.ctx.field_columns.first().cloned(),
            normalize,
        );
        Ok(LogicalPlan::Extension(Extension {
            node: Arc::new(manipulate),
        }))
    }

    /// Builds a projection plan for the PromQL `timestamp()` function.
    /// Projects the time index column as the value column for each row.
    ///
    /// # Arguments
    /// * `normalize` - Input [`LogicalPlan`] for the normalized series.
    ///
    /// # Returns
    /// Returns a [`Result<LogicalPlan>`] where the resulting logical plan projects the timestamp
    /// column as the value column, along with the original tag and time index columns.
    ///
    /// # Timestamp vs. Time Function
    ///
    /// - **Timestamp Function (`timestamp()`)**: In PromQL, the `timestamp()` function returns the
    ///   timestamp (time index) of each sample as the value column.
    ///
    /// - **Time Function (`time()`)**: The `time()` function returns the evaluation time of the query
    ///   as a scalar value.
    ///
    /// # Side Effects
    /// Updates the planner context's field columns to the timestamp column name.
    ///
    fn create_timestamp_func_plan(&mut self, normalize: LogicalPlan) -> Result<LogicalPlan> {
        let time_expr = build_special_time_expr(self.ctx.time_index_column.as_ref().unwrap())
            .alias(DEFAULT_FIELD_COLUMN);
        self.ctx.field_columns = vec![time_expr.schema_name().to_string()];
        let mut project_exprs = Vec::with_capacity(self.ctx.tag_columns.len() + 2);
        project_exprs.push(self.create_time_index_column_expr()?);
        project_exprs.push(time_expr);
        project_exprs.extend(self.create_tag_column_exprs()?);

        LogicalPlanBuilder::from(normalize)
            .project(project_exprs)
            .context(DataFusionPlanningSnafu)?
            .build()
            .context(DataFusionPlanningSnafu)
    }

    async fn prom_matrix_selector_to_plan(
        &mut self,
        matrix_selector: &MatrixSelector,
    ) -> Result<LogicalPlan> {
        let MatrixSelector { vs, range } = matrix_selector;
        let VectorSelector {
            name,
            offset,
            matchers,
            ..
        } = vs;
        let matchers = self.preprocess_label_matchers(matchers, name)?;
        ensure!(!range.is_zero(), ZeroRangeSelectorSnafu);
        let range_ms = range.as_millis() as _;
        self.ctx.range = Some(range_ms);

        // Some functions like rate may require special fields in the RangeManipulate plan
        // so we can't skip RangeManipulate.
        let normalize = match self.setup_context().await? {
            Some(empty_plan) => empty_plan,
            None => {
                self.selector_to_series_normalize_plan(offset, matchers, true)
                    .await?
            }
        };
        let manipulate = RangeManipulate::new(
            self.ctx.start,
            self.ctx.end,
            self.ctx.interval,
            // TODO(ruihang): convert via Timestamp datatypes to support different time units
            range_ms,
            self.ctx
                .time_index_column
                .clone()
                .expect("time index should be set in `setup_context`"),
            self.ctx.field_columns.clone(),
            normalize,
        )
        .context(DataFusionPlanningSnafu)?;

        Ok(LogicalPlan::Extension(Extension {
            node: Arc::new(manipulate),
        }))
    }

    async fn prom_call_expr_to_plan(
        &mut self,
        query_engine_state: &QueryEngineState,
        call_expr: &Call,
    ) -> Result<LogicalPlan> {
        let Call { func, args } = call_expr;
        // Some special functions are planned as whole plans rather than expressions.
        match func.name {
            SPECIAL_HISTOGRAM_QUANTILE => {
                return self.create_histogram_plan(args, query_engine_state).await;
            }
            SPECIAL_VECTOR_FUNCTION => return self.create_vector_plan(args).await,
            SCALAR_FUNCTION => return self.create_scalar_plan(args, query_engine_state).await,
            SPECIAL_ABSENT_FUNCTION => {
                return self.create_absent_plan(args, query_engine_state).await;
            }
            _ => {}
        }

        // transform function arguments
        let args = self.create_function_args(&args.args)?;
        let input = if let Some(prom_expr) = &args.input {
            self.prom_expr_to_plan_inner(prom_expr, func.name == "timestamp", query_engine_state)
                .await?
        } else {
            self.ctx.time_index_column = Some(SPECIAL_TIME_FUNCTION.to_string());
            self.ctx.reset_table_name_and_schema();
            self.ctx.tag_columns = vec![];
            self.ctx.field_columns = vec![DEFAULT_FIELD_COLUMN.to_string()];
            LogicalPlan::Extension(Extension {
                node: Arc::new(
                    EmptyMetric::new(
                        self.ctx.start,
                        self.ctx.end,
                        self.ctx.interval,
                        SPECIAL_TIME_FUNCTION.to_string(),
                        DEFAULT_FIELD_COLUMN.to_string(),
                        None,
                    )
                    .context(DataFusionPlanningSnafu)?,
                ),
            })
        };
        let (mut func_exprs, new_tags) =
            self.create_function_expr(func, args.literals.clone(), query_engine_state)?;
        func_exprs.insert(0, self.create_time_index_column_expr()?);
        func_exprs.extend_from_slice(&self.create_tag_column_exprs()?);

        let builder = LogicalPlanBuilder::from(input)
            .project(func_exprs)
            .context(DataFusionPlanningSnafu)?
            .filter(self.create_empty_values_filter_expr()?)
            .context(DataFusionPlanningSnafu)?;

        let builder = match func.name {
            "sort" => builder
                .sort(self.create_field_columns_sort_exprs(true))
                .context(DataFusionPlanningSnafu)?,
            "sort_desc" => builder
                .sort(self.create_field_columns_sort_exprs(false))
                .context(DataFusionPlanningSnafu)?,
            "sort_by_label" => builder
                .sort(Self::create_sort_exprs_by_tags(
                    func.name,
                    args.literals,
                    true,
                )?)
                .context(DataFusionPlanningSnafu)?,
            "sort_by_label_desc" => builder
                .sort(Self::create_sort_exprs_by_tags(
                    func.name,
                    args.literals,
                    false,
                )?)
                .context(DataFusionPlanningSnafu)?,

            _ => builder,
        };

        // Update context tags after building plan
        // We can't push them before planning, because they won't exist until projection.
        for tag in new_tags {
            self.ctx.tag_columns.push(tag);
        }

        let plan = builder.build().context(DataFusionPlanningSnafu)?;
        common_telemetry::debug!("Created PromQL function plan: {plan:?} for {call_expr:?}");

        Ok(plan)
    }

    async fn prom_ext_expr_to_plan(
        &mut self,
        query_engine_state: &QueryEngineState,
        ext_expr: &promql_parser::parser::ast::Extension,
    ) -> Result<LogicalPlan> {
        // let promql_parser::parser::ast::Extension { expr } = ext_expr;
        let expr = &ext_expr.expr;
        let children = expr.children();
        let plan = self
            .prom_expr_to_plan(&children[0], query_engine_state)
            .await?;
        // Wrapper for the explanation/analyze of the existing plan
        // https://docs.rs/datafusion-expr/latest/datafusion_expr/logical_plan/builder/struct.LogicalPlanBuilder.html#method.explain
        // if `analyze` is true, runs the actual plan and produces
        // information about metrics during run.
        // if `verbose` is true, prints out additional details when VERBOSE keyword is specified
        match expr.name() {
            "ANALYZE" => LogicalPlanBuilder::from(plan)
                .explain(false, true)
                .unwrap()
                .build()
                .context(DataFusionPlanningSnafu),
            "ANALYZE VERBOSE" => LogicalPlanBuilder::from(plan)
                .explain(true, true)
                .unwrap()
                .build()
                .context(DataFusionPlanningSnafu),
            "EXPLAIN" => LogicalPlanBuilder::from(plan)
                .explain(false, false)
                .unwrap()
                .build()
                .context(DataFusionPlanningSnafu),
            "EXPLAIN VERBOSE" => LogicalPlanBuilder::from(plan)
                .explain(true, false)
                .unwrap()
                .build()
                .context(DataFusionPlanningSnafu),
            _ => LogicalPlanBuilder::empty(true)
                .build()
                .context(DataFusionPlanningSnafu),
        }
    }

    /// Extract the metric name from the `__name__` matcher and set it into the [PromPlannerContext].
    /// Returns a new [Matchers] that doesn't contain the metric name matcher.
    ///
    /// Each call to this function starts a new selector, so the context is reset first.
    ///
    /// Name rule:
    /// - if `name` is some, then the matchers MUST NOT contain a `__name__` matcher.
    /// - if `name` is none, then the matchers MUST contain exactly one `__name__` matcher
    ///   with the `=` op; zero or multiple `__name__` matchers is an error.
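    ///
    /// For example, `http_requests{__schema__="public", job="api"}` sets the table
    /// name to `http_requests` and the schema to `public` in the context, and
    /// returns only the `job` matcher.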
    #[allow(clippy::mutable_key_type)]
    fn preprocess_label_matchers(
        &mut self,
        label_matchers: &Matchers,
        name: &Option<String>,
    ) -> Result<Matchers> {
        self.ctx.reset();

        let metric_name;
        if let Some(name) = name.clone() {
            metric_name = Some(name);
            ensure!(
                label_matchers.find_matchers(METRIC_NAME).is_empty(),
                MultipleMetricMatchersSnafu
            );
        } else {
            let mut matches = label_matchers.find_matchers(METRIC_NAME);
            ensure!(!matches.is_empty(), NoMetricMatcherSnafu);
            ensure!(matches.len() == 1, MultipleMetricMatchersSnafu);
            ensure!(
                matches[0].op == MatchOp::Equal,
                UnsupportedMatcherOpSnafu {
                    matcher_op: matches[0].op.to_string(),
                    matcher: METRIC_NAME
                }
            );
            metric_name = matches.pop().map(|m| m.value);
        }

        self.ctx.table_name = metric_name;

        let mut matchers = HashSet::new();
        for matcher in &label_matchers.matchers {
            // TODO(ruihang): support other metric match ops
            if matcher.name == FIELD_COLUMN_MATCHER {
                self.ctx
                    .field_column_matcher
                    .get_or_insert_default()
                    .push(matcher.clone());
            } else if matcher.name == SCHEMA_COLUMN_MATCHER || matcher.name == DB_COLUMN_MATCHER {
                ensure!(
                    matcher.op == MatchOp::Equal,
                    UnsupportedMatcherOpSnafu {
                        matcher: matcher.name.to_string(),
                        matcher_op: matcher.op.to_string(),
                    }
                );
                self.ctx.schema_name = Some(matcher.value.clone());
            } else if matcher.name != METRIC_NAME {
                self.ctx.selector_matcher.push(matcher.clone());
                let _ = matchers.insert(matcher.clone());
            }
        }

        Ok(Matchers::new(
            matchers.into_iter().map(normalize_matcher).collect(),
        ))
    }

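    /// Builds the scan pipeline for a single selector:
    /// `TableScan -> Filter -> (Projection) -> Sort -> SeriesDivide -> (SeriesNormalize)`.
    /// The projection only appears when a `__field__` matcher is present, and
    /// `SeriesNormalize` is skipped for instant selectors without an offset.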
    async fn selector_to_series_normalize_plan(
        &mut self,
        offset: &Option<Offset>,
        label_matchers: Matchers,
        is_range_selector: bool,
    ) -> Result<LogicalPlan> {
        // make table scan plan
        let table_ref = self.table_ref()?;
        let mut table_scan = self.create_table_scan_plan(table_ref.clone()).await?;
        let table_schema = table_scan.schema();

        // make filter exprs
        let offset_duration = match offset {
            Some(Offset::Pos(duration)) => duration.as_millis() as Millisecond,
            Some(Offset::Neg(duration)) => -(duration.as_millis() as Millisecond),
            None => 0,
        };
        let mut scan_filters = Self::matchers_to_expr(label_matchers.clone(), table_schema)?;
        if let Some(time_index_filter) = self.build_time_index_filter(offset_duration)? {
            scan_filters.push(time_index_filter);
        }
        table_scan = LogicalPlanBuilder::from(table_scan)
            .filter(conjunction(scan_filters).unwrap()) // Safety: `scan_filters` is not empty.
            .context(DataFusionPlanningSnafu)?
            .build()
            .context(DataFusionPlanningSnafu)?;

        // make a projection plan if there is any `__field__` matcher
        if let Some(field_matchers) = &self.ctx.field_column_matcher {
            let col_set = self.ctx.field_columns.iter().collect::<HashSet<_>>();
            // opt-in set
            let mut result_set = HashSet::new();
            // opt-out set
            let mut reverse_set = HashSet::new();
            for matcher in field_matchers {
                match &matcher.op {
                    MatchOp::Equal => {
                        if col_set.contains(&matcher.value) {
                            let _ = result_set.insert(matcher.value.clone());
                        } else {
                            return Err(ColumnNotFoundSnafu {
                                col: matcher.value.clone(),
                            }
                            .build());
                        }
                    }
                    MatchOp::NotEqual => {
                        if col_set.contains(&matcher.value) {
                            let _ = reverse_set.insert(matcher.value.clone());
                        } else {
                            return Err(ColumnNotFoundSnafu {
                                col: matcher.value.clone(),
                            }
                            .build());
                        }
                    }
                    MatchOp::Re(regex) => {
                        for col in &self.ctx.field_columns {
                            if regex.is_match(col) {
                                let _ = result_set.insert(col.clone());
                            }
                        }
                    }
                    MatchOp::NotRe(regex) => {
                        for col in &self.ctx.field_columns {
                            if regex.is_match(col) {
                                let _ = reverse_set.insert(col.clone());
                            }
                        }
                    }
                }
            }
            // merge the two sets
            if result_set.is_empty() {
                result_set = col_set.into_iter().cloned().collect();
            }
            for col in reverse_set {
                let _ = result_set.remove(&col);
            }

            // mask the field columns in context using computed result set
            self.ctx.field_columns = self
                .ctx
                .field_columns
                .drain(..)
                .filter(|col| result_set.contains(col))
                .collect();

            let exprs = result_set
                .into_iter()
                .map(|col| DfExpr::Column(Column::new_unqualified(col)))
                .chain(self.create_tag_column_exprs()?)
                .chain(Some(self.create_time_index_column_expr()?))
                .collect::<Vec<_>>();

            // reuse this variable for simplicity
            table_scan = LogicalPlanBuilder::from(table_scan)
                .project(exprs)
                .context(DataFusionPlanningSnafu)?
                .build()
                .context(DataFusionPlanningSnafu)?;
        }

        // make sort plan
        let sort_plan = LogicalPlanBuilder::from(table_scan)
            .sort(self.create_tag_and_time_index_column_sort_exprs()?)
            .context(DataFusionPlanningSnafu)?
            .build()
            .context(DataFusionPlanningSnafu)?;

        // make divide plan
        let time_index_column =
            self.ctx
                .time_index_column
                .clone()
                .with_context(|| TimeIndexNotFoundSnafu {
                    table: table_ref.to_string(),
                })?;
        let divide_plan = LogicalPlan::Extension(Extension {
            node: Arc::new(SeriesDivide::new(
                self.ctx.tag_columns.clone(),
                time_index_column,
                sort_plan,
            )),
        });

        // make series_normalize plan
        if !is_range_selector && offset_duration == 0 {
            return Ok(divide_plan);
        }
        let series_normalize = SeriesNormalize::new(
            offset_duration,
            self.ctx
                .time_index_column
                .clone()
                .with_context(|| TimeIndexNotFoundSnafu {
                    table: table_ref.to_quoted_string(),
                })?,
            is_range_selector,
            self.ctx.tag_columns.clone(),
            divide_plan,
        );
        let logical_plan = LogicalPlan::Extension(Extension {
            node: Arc::new(series_normalize),
        });

        Ok(logical_plan)
    }

    /// Convert [LabelModifier] to [Column] exprs for aggregation.
    /// Timestamp column and tag columns will be included.
    ///
    /// # Side effect
    ///
    /// This method will also change the tag columns in ctx if `update_ctx` is true.
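    ///
    /// For example, `sum by (host) (...)` groups by `host` plus the time index,
    /// while `sum without (host) (...)` groups by every remaining column except
    /// `host`, the time index, and the value columns.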
    fn agg_modifier_to_col(
        &mut self,
        input_schema: &DFSchemaRef,
        modifier: &Option<LabelModifier>,
        update_ctx: bool,
    ) -> Result<Vec<DfExpr>> {
        match modifier {
            None => {
                if update_ctx {
                    self.ctx.tag_columns.clear();
                }
                Ok(vec![self.create_time_index_column_expr()?])
            }
            Some(LabelModifier::Include(labels)) => {
                if update_ctx {
                    self.ctx.tag_columns.clear();
                }
                let mut exprs = Vec::with_capacity(labels.labels.len());
                for label in &labels.labels {
                    // nonexistent labels will be ignored
                    if let Ok(field) = input_schema.field_with_unqualified_name(label) {
                        exprs.push(DfExpr::Column(Column::from(field.name())));

                        if update_ctx {
                            // update the tag columns in context
                            self.ctx.tag_columns.push(label.clone());
                        }
                    }
                }
                // add timestamp column
                exprs.push(self.create_time_index_column_expr()?);

                Ok(exprs)
            }
            Some(LabelModifier::Exclude(labels)) => {
                let mut all_fields = input_schema
                    .fields()
                    .iter()
                    .map(|f| f.name())
                    .collect::<BTreeSet<_>>();

                // remove "without"-ed fields
                // (nonexistent labels will be ignored)
                for label in &labels.labels {
                    let _ = all_fields.remove(label);
                }

                // remove time index and value fields
                if let Some(time_index) = &self.ctx.time_index_column {
                    let _ = all_fields.remove(time_index);
                }
                for value in &self.ctx.field_columns {
                    let _ = all_fields.remove(value);
                }

                if update_ctx {
                    // change the tag columns in context
                    self.ctx.tag_columns = all_fields.iter().map(|col| (*col).clone()).collect();
                }

                // collect remaining fields and convert to col expr
                let mut exprs = all_fields
                    .into_iter()
                    .map(|c| DfExpr::Column(Column::from(c)))
                    .collect::<Vec<_>>();

                // add timestamp column
                exprs.push(self.create_time_index_column_expr()?);

                Ok(exprs)
            }
        }
    }

    // TODO(ruihang): ignore `MetricNameLabel` (`__name__`) matcher
    pub fn matchers_to_expr(
        label_matchers: Matchers,
        table_schema: &DFSchemaRef,
    ) -> Result<Vec<DfExpr>> {
        let mut exprs = Vec::with_capacity(label_matchers.matchers.len());
        for matcher in label_matchers.matchers {
            if matcher.name == SCHEMA_COLUMN_MATCHER
                || matcher.name == DB_COLUMN_MATCHER
                || matcher.name == FIELD_COLUMN_MATCHER
            {
                continue;
            }

            let col = if table_schema
                .field_with_unqualified_name(&matcher.name)
                .is_err()
            {
                DfExpr::Literal(ScalarValue::Utf8(Some(String::new())), None).alias(matcher.name)
            } else {
                DfExpr::Column(Column::from_name(matcher.name))
            };
            let lit = DfExpr::Literal(ScalarValue::Utf8(Some(matcher.value)), None);
            let expr = match matcher.op {
                MatchOp::Equal => col.eq(lit),
                MatchOp::NotEqual => col.not_eq(lit),
                MatchOp::Re(re) => {
                    // TODO(ruihang): a more programmatic way to handle this in datafusion

                    // This is a hack to handle `.+` and `.*`, and is not strictly correct
                    // `.` doesn't match newline (`\n`). Given this is in PromQL context,
                    // most of the time it's fine.
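                    // E.g. `{job=~".*"}` (anchored by the parser to `^(?:.*)$`)
                    // matches everything, so the predicate is dropped entirely,
                    // while `{job=~".+"}` becomes `job != ''`.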
                    if re.as_str() == "^(?:.*)$" {
                        continue;
                    }
                    if re.as_str() == "^(?:.+)$" {
                        col.not_eq(DfExpr::Literal(
                            ScalarValue::Utf8(Some(String::new())),
                            None,
                        ))
                    } else {
                        DfExpr::BinaryExpr(BinaryExpr {
                            left: Box::new(col),
                            op: Operator::RegexMatch,
                            right: Box::new(DfExpr::Literal(
                                ScalarValue::Utf8(Some(re.as_str().to_string())),
                                None,
                            )),
                        })
                    }
                }
                MatchOp::NotRe(re) => {
                    if re.as_str() == "^(?:.*)$" {
                        DfExpr::Literal(ScalarValue::Boolean(Some(false)), None)
                    } else if re.as_str() == "^(?:.+)$" {
                        col.eq(DfExpr::Literal(
                            ScalarValue::Utf8(Some(String::new())),
                            None,
                        ))
                    } else {
                        DfExpr::BinaryExpr(BinaryExpr {
                            left: Box::new(col),
                            op: Operator::RegexNotMatch,
                            right: Box::new(DfExpr::Literal(
                                ScalarValue::Utf8(Some(re.as_str().to_string())),
                                None,
                            )),
                        })
                    }
                }
            };
            exprs.push(expr);
        }

        Ok(exprs)
    }

    fn table_ref(&self) -> Result<TableReference> {
        let table_name = self
            .ctx
            .table_name
            .clone()
            .context(TableNameNotFoundSnafu)?;

        // set schema name if `__schema__` is given
        let table_ref = if let Some(schema_name) = &self.ctx.schema_name {
            TableReference::partial(schema_name.as_str(), table_name.as_str())
        } else {
            TableReference::bare(table_name.as_str())
        };

        Ok(table_ref)
    }

1374    fn build_time_index_filter(&self, offset_duration: i64) -> Result<Option<DfExpr>> {
1375        let start = self.ctx.start;
1376        let end = self.ctx.end;
1377        if end < start {
1378            return InvalidTimeRangeSnafu { start, end }.fail();
1379        }
1380        let lookback_delta = self.ctx.lookback_delta;
1381        let range = self.ctx.range.unwrap_or_default();
1382        let interval = self.ctx.interval;
1383        let time_index_expr = self.create_time_index_column_expr()?;
1384        let num_points = (end - start) / interval;
1385
1386        // Scan a continuous time range
1387        if num_points > MAX_SCATTER_POINTS || interval <= INTERVAL_1H {
1388            let single_time_range = time_index_expr
1389                .clone()
1390                .gt_eq(DfExpr::Literal(
1391                    ScalarValue::TimestampMillisecond(
1392                        Some(self.ctx.start + offset_duration - self.ctx.lookback_delta - range),
1393                        None,
1394                    ),
1395                    None,
1396                ))
1397                .and(time_index_expr.lt_eq(DfExpr::Literal(
1398                    ScalarValue::TimestampMillisecond(
1399                        Some(self.ctx.end + offset_duration + self.ctx.lookback_delta),
1400                        None,
1401                    ),
1402                    None,
1403                )));
1404            return Ok(Some(single_time_range));
1405        }
1406
1407        // Otherwise scan scattered ranges separately
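        // E.g. (illustrative) with start=0, end=10h, interval=2h, lookback=5m
        // and no range or offset, this yields one OR-ed predicate per
        // evaluation timestamp t, each roughly `ts >= t - 5m AND ts <= t`.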
1408        let mut filters = Vec::with_capacity(num_points as usize);
1409        for timestamp in (start..end).step_by(interval as usize) {
1410            filters.push(
1411                time_index_expr
1412                    .clone()
1413                    .gt_eq(DfExpr::Literal(
1414                        ScalarValue::TimestampMillisecond(
1415                            Some(timestamp + offset_duration - lookback_delta - range),
1416                            None,
1417                        ),
1418                        None,
1419                    ))
1420                    .and(time_index_expr.clone().lt_eq(DfExpr::Literal(
1421                        ScalarValue::TimestampMillisecond(
1422                            Some(timestamp + offset_duration + lookback_delta),
1423                            None,
1424                        ),
1425                        None,
1426                    ))),
1427            )
1428        }
1429
1430        Ok(filters.into_iter().reduce(DfExpr::or))
1431    }
1432
1433    /// Create a table scan plan for the given table reference.
1434    ///
1435    /// If the table's time index is not in millisecond precision, a projection
1436    /// casting it to milliseconds is added on top of the scan.
1437    async fn create_table_scan_plan(&mut self, table_ref: TableReference) -> Result<LogicalPlan> {
1438        let provider = self
1439            .table_provider
1440            .resolve_table(table_ref.clone())
1441            .await
1442            .context(CatalogSnafu)?;
1443
1444        let is_time_index_ms = provider
1445            .as_any()
1446            .downcast_ref::<DefaultTableSource>()
1447            .context(UnknownTableSnafu)?
1448            .table_provider
1449            .as_any()
1450            .downcast_ref::<DfTableProviderAdapter>()
1451            .context(UnknownTableSnafu)?
1452            .table()
1453            .schema()
1454            .timestamp_column()
1455            .with_context(|| TimeIndexNotFoundSnafu {
1456                table: table_ref.to_quoted_string(),
1457            })?
1458            .data_type
1459            == ConcreteDataType::timestamp_millisecond_datatype();
1460
1461        let mut scan_plan = LogicalPlanBuilder::scan(table_ref.clone(), provider, None)
1462            .context(DataFusionPlanningSnafu)?
1463            .build()
1464            .context(DataFusionPlanningSnafu)?;
1465
1466        if !is_time_index_ms {
1467            // cast to ms if the time index is not in millisecond precision
1468            let expr: Vec<_> = self
1469                .ctx
1470                .field_columns
1471                .iter()
1472                .map(|col| DfExpr::Column(Column::new(Some(table_ref.clone()), col.clone())))
1473                .chain(self.create_tag_column_exprs()?)
1474                .chain(Some(DfExpr::Alias(Alias {
1475                    expr: Box::new(DfExpr::Cast(Cast {
1476                        expr: Box::new(self.create_time_index_column_expr()?),
1477                        data_type: ArrowDataType::Timestamp(ArrowTimeUnit::Millisecond, None),
1478                    })),
1479                    relation: Some(table_ref.clone()),
1480                    name: self
1481                        .ctx
1482                        .time_index_column
1483                        .as_ref()
1484                        .with_context(|| TimeIndexNotFoundSnafu {
1485                            table: table_ref.to_quoted_string(),
1486                        })?
1487                        .clone(),
1488                    metadata: None,
1489                })))
1490                .collect::<Vec<_>>();
1491            scan_plan = LogicalPlanBuilder::from(scan_plan)
1492                .project(expr)
1493                .context(DataFusionPlanningSnafu)?
1494                .build()
1495                .context(DataFusionPlanningSnafu)?;
1496        }
1497
1498        Ok(scan_plan)
1502    }
1503
1504    /// Set up [PromPlannerContext]'s state fields.
1505    ///
1506    /// Returns a logical plan for an empty metric if the table doesn't exist.
1507    async fn setup_context(&mut self) -> Result<Option<LogicalPlan>> {
1508        let table_ref = self.table_ref()?;
1509        let table = match self.table_provider.resolve_table(table_ref.clone()).await {
1510            Err(e) if e.status_code() == StatusCode::TableNotFound => {
1511                let plan = self.setup_context_for_empty_metric()?;
1512                return Ok(Some(plan));
1513            }
1514            res => res.context(CatalogSnafu)?,
1515        };
1516        let table = table
1517            .as_any()
1518            .downcast_ref::<DefaultTableSource>()
1519            .context(UnknownTableSnafu)?
1520            .table_provider
1521            .as_any()
1522            .downcast_ref::<DfTableProviderAdapter>()
1523            .context(UnknownTableSnafu)?
1524            .table();
1525
1526        // set time index column name
1527        let time_index = table
1528            .schema()
1529            .timestamp_column()
1530            .with_context(|| TimeIndexNotFoundSnafu {
1531                table: table_ref.to_quoted_string(),
1532            })?
1533            .name
1534            .clone();
1535        self.ctx.time_index_column = Some(time_index);
1536
1537        // set value columns
1538        let values = table
1539            .table_info()
1540            .meta
1541            .field_column_names()
1542            .cloned()
1543            .collect();
1544        self.ctx.field_columns = values;
1545
1546        // set primary key (tag) columns
1547        let tags = table
1548            .table_info()
1549            .meta
1550            .row_key_column_names()
1551            .filter(|col| {
1552                // remove metric engine's internal columns
1553                col != &DATA_SCHEMA_TABLE_ID_COLUMN_NAME && col != &DATA_SCHEMA_TSID_COLUMN_NAME
1554            })
1555            .cloned()
1556            .collect();
1557        self.ctx.tag_columns = tags;
1558
1559        Ok(None)
1560    }
1561
1562    /// Set up [PromPlannerContext]'s state fields for a non-existent table
1563    /// without any rows.
1564    fn setup_context_for_empty_metric(&mut self) -> Result<LogicalPlan> {
1565        self.ctx.time_index_column = Some(SPECIAL_TIME_FUNCTION.to_string());
1566        self.ctx.reset_table_name_and_schema();
1567        self.ctx.tag_columns = vec![];
1568        self.ctx.field_columns = vec![DEFAULT_FIELD_COLUMN.to_string()];
1569
1570        // The table doesn't have any data, so we set start to 0 and end to -1.
1571        let plan = LogicalPlan::Extension(Extension {
1572            node: Arc::new(
1573                EmptyMetric::new(
1574                    0,
1575                    -1,
1576                    self.ctx.interval,
1577                    SPECIAL_TIME_FUNCTION.to_string(),
1578                    DEFAULT_FIELD_COLUMN.to_string(),
1579                    Some(lit(0.0f64)),
1580                )
1581                .context(DataFusionPlanningSnafu)?,
1582            ),
1583        });
1584        Ok(plan)
1585    }
1586
1587    // TODO(ruihang): insert column expr
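    /// Split PromQL call arguments into at most one vector input and a list of
    /// literal expressions. E.g. (illustrative) in `clamp_max(metric, 100.0 + 3.0)`,
    /// the selector `metric` becomes the vector input while `100.0 + 3.0` folds
    /// into a single literal argument.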
1588    fn create_function_args(&self, args: &[Box<PromExpr>]) -> Result<FunctionArgs> {
1589        let mut result = FunctionArgs::default();
1590
1591        for arg in args {
1592            // First try to parse as a literal expression (including binary expressions like 100.0 + 3.0)
1593            if let Some(expr) = Self::try_build_literal_expr(arg) {
1594                result.literals.push(expr);
1595            } else {
1596                // If not a literal, treat as vector input
1597                match arg.as_ref() {
1598                    PromExpr::Subquery(_)
1599                    | PromExpr::VectorSelector(_)
1600                    | PromExpr::MatrixSelector(_)
1601                    | PromExpr::Extension(_)
1602                    | PromExpr::Aggregate(_)
1603                    | PromExpr::Paren(_)
1604                    | PromExpr::Call(_)
1605                    | PromExpr::Binary(_)
1606                    | PromExpr::Unary(_) => {
1607                        if result.input.replace(*arg.clone()).is_some() {
1608                            MultipleVectorSnafu { expr: *arg.clone() }.fail()?;
1609                        }
1610                    }
1611
1612                    _ => {
1613                        let expr = Self::get_param_as_literal_expr(&Some(arg.clone()), None, None)?;
1614                        result.literals.push(expr);
1615                    }
1616                }
1617            }
1618        }
1619
1620        Ok(result)
1621    }
1622
1623    /// Creates function expressions for projection and returns the expressions and new tags.
1624    ///
1625    /// # Side Effects
1626    ///
1627    /// This method will update [PromPlannerContext]'s fields and tags if needed.
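    ///
    /// E.g. (illustrative) for `abs(metric)` with a single value column `v`,
    /// this returns the projection expression `abs(v)` aliased by its display
    /// name and updates the context's field columns to match; `new_tags` is
    /// only populated by label functions such as `label_join` and `label_replace`.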
1628    fn create_function_expr(
1629        &mut self,
1630        func: &Function,
1631        other_input_exprs: Vec<DfExpr>,
1632        query_engine_state: &QueryEngineState,
1633    ) -> Result<(Vec<DfExpr>, Vec<String>)> {
1634        // TODO(ruihang): check function args list
1635        let mut other_input_exprs: VecDeque<DfExpr> = other_input_exprs.into();
1636
1637        // TODO(ruihang): set this according to in-param list
1638        let field_column_pos = 0;
1639        let mut exprs = Vec::with_capacity(self.ctx.field_columns.len());
1640        // New labels after executing the function, e.g. for `label_replace`.
1641        let mut new_tags = vec![];
1642        let scalar_func = match func.name {
1643            "increase" => ScalarFunc::ExtrapolateUdf(
1644                Arc::new(Increase::scalar_udf()),
1645                self.ctx.range.context(ExpectRangeSelectorSnafu)?,
1646            ),
1647            "rate" => ScalarFunc::ExtrapolateUdf(
1648                Arc::new(Rate::scalar_udf()),
1649                self.ctx.range.context(ExpectRangeSelectorSnafu)?,
1650            ),
1651            "delta" => ScalarFunc::ExtrapolateUdf(
1652                Arc::new(Delta::scalar_udf()),
1653                self.ctx.range.context(ExpectRangeSelectorSnafu)?,
1654            ),
1655            "idelta" => ScalarFunc::Udf(Arc::new(IDelta::<false>::scalar_udf())),
1656            "irate" => ScalarFunc::Udf(Arc::new(IDelta::<true>::scalar_udf())),
1657            "resets" => ScalarFunc::Udf(Arc::new(Resets::scalar_udf())),
1658            "changes" => ScalarFunc::Udf(Arc::new(Changes::scalar_udf())),
1659            "deriv" => ScalarFunc::Udf(Arc::new(Deriv::scalar_udf())),
1660            "avg_over_time" => ScalarFunc::Udf(Arc::new(AvgOverTime::scalar_udf())),
1661            "min_over_time" => ScalarFunc::Udf(Arc::new(MinOverTime::scalar_udf())),
1662            "max_over_time" => ScalarFunc::Udf(Arc::new(MaxOverTime::scalar_udf())),
1663            "sum_over_time" => ScalarFunc::Udf(Arc::new(SumOverTime::scalar_udf())),
1664            "count_over_time" => ScalarFunc::Udf(Arc::new(CountOverTime::scalar_udf())),
1665            "last_over_time" => ScalarFunc::Udf(Arc::new(LastOverTime::scalar_udf())),
1666            "absent_over_time" => ScalarFunc::Udf(Arc::new(AbsentOverTime::scalar_udf())),
1667            "present_over_time" => ScalarFunc::Udf(Arc::new(PresentOverTime::scalar_udf())),
1668            "stddev_over_time" => ScalarFunc::Udf(Arc::new(StddevOverTime::scalar_udf())),
1669            "stdvar_over_time" => ScalarFunc::Udf(Arc::new(StdvarOverTime::scalar_udf())),
1670            "quantile_over_time" => ScalarFunc::Udf(Arc::new(QuantileOverTime::scalar_udf())),
1671            "predict_linear" => {
1672                other_input_exprs[0] = DfExpr::Cast(Cast {
1673                    expr: Box::new(other_input_exprs[0].clone()),
1674                    data_type: ArrowDataType::Int64,
1675                });
1676                ScalarFunc::Udf(Arc::new(PredictLinear::scalar_udf()))
1677            }
1678            "holt_winters" => ScalarFunc::Udf(Arc::new(HoltWinters::scalar_udf())),
1679            "time" => {
1680                exprs.push(build_special_time_expr(
1681                    self.ctx.time_index_column.as_ref().unwrap(),
1682                ));
1683                ScalarFunc::GeneratedExpr
1684            }
1685            "minute" => {
1686                // date_part('minute', time_index)
1687                let expr = self.date_part_on_time_index("minute")?;
1688                exprs.push(expr);
1689                ScalarFunc::GeneratedExpr
1690            }
1691            "hour" => {
1692                // date_part('hour', time_index)
1693                let expr = self.date_part_on_time_index("hour")?;
1694                exprs.push(expr);
1695                ScalarFunc::GeneratedExpr
1696            }
1697            "month" => {
1698                // date_part('month', time_index)
1699                let expr = self.date_part_on_time_index("month")?;
1700                exprs.push(expr);
1701                ScalarFunc::GeneratedExpr
1702            }
1703            "year" => {
1704                // date_part('year', time_index)
1705                let expr = self.date_part_on_time_index("year")?;
1706                exprs.push(expr);
1707                ScalarFunc::GeneratedExpr
1708            }
1709            "day_of_month" => {
1710                // date_part('day', time_index)
1711                let expr = self.date_part_on_time_index("day")?;
1712                exprs.push(expr);
1713                ScalarFunc::GeneratedExpr
1714            }
1715            "day_of_week" => {
1716                // date_part('dow', time_index)
1717                let expr = self.date_part_on_time_index("dow")?;
1718                exprs.push(expr);
1719                ScalarFunc::GeneratedExpr
1720            }
1721            "day_of_year" => {
1722                // date_part('doy', time_index)
1723                let expr = self.date_part_on_time_index("doy")?;
1724                exprs.push(expr);
1725                ScalarFunc::GeneratedExpr
1726            }
1727            "days_in_month" => {
1728                // date_part(
1729                //     'days',
1730                //     (date_trunc('month', <TIME INDEX>::date) + interval '1 month - 1 day')
1731                // );
1732                let day_lit_expr = "day".lit();
1733                let month_lit_expr = "month".lit();
1734                let interval_1month_lit_expr =
1735                    DfExpr::Literal(ScalarValue::IntervalYearMonth(Some(1)), None);
1736                let interval_1day_lit_expr = DfExpr::Literal(
1737                    ScalarValue::IntervalDayTime(Some(IntervalDayTime::new(1, 0))),
1738                    None,
1739                );
1740                let the_1month_minus_1day_expr = DfExpr::BinaryExpr(BinaryExpr {
1741                    left: Box::new(interval_1month_lit_expr),
1742                    op: Operator::Minus,
1743                    right: Box::new(interval_1day_lit_expr),
1744                });
1745                let date_trunc_expr = DfExpr::ScalarFunction(ScalarFunction {
1746                    func: datafusion_functions::datetime::date_trunc(),
1747                    args: vec![month_lit_expr, self.create_time_index_column_expr()?],
1748                });
1749                let date_trunc_plus_interval_expr = DfExpr::BinaryExpr(BinaryExpr {
1750                    left: Box::new(date_trunc_expr),
1751                    op: Operator::Plus,
1752                    right: Box::new(the_1month_minus_1day_expr),
1753                });
1754                let date_part_expr = DfExpr::ScalarFunction(ScalarFunction {
1755                    func: datafusion_functions::datetime::date_part(),
1756                    args: vec![day_lit_expr, date_trunc_plus_interval_expr],
1757                });
1758
1759                exprs.push(date_part_expr);
1760                ScalarFunc::GeneratedExpr
1761            }
1762
1763            "label_join" => {
1764                let (concat_expr, dst_label) = Self::build_concat_labels_expr(
1765                    &mut other_input_exprs,
1766                    &self.ctx,
1767                    query_engine_state,
1768                )?;
1769
1770                // Preserve the current field columns except the `dst_label`.
1771                for value in &self.ctx.field_columns {
1772                    if *value != dst_label {
1773                        let expr = DfExpr::Column(Column::from_name(value));
1774                        exprs.push(expr);
1775                    }
1776                }
1777
1778                // Remove it from the tag columns, if present, to avoid duplicated column names
1779                self.ctx.tag_columns.retain(|tag| *tag != dst_label);
1780                new_tags.push(dst_label);
1781                // Add the new label expr to evaluate
1782                exprs.push(concat_expr);
1783
1784                ScalarFunc::GeneratedExpr
1785            }
1786            "label_replace" => {
1787                if let Some((replace_expr, dst_label)) = self
1788                    .build_regexp_replace_label_expr(&mut other_input_exprs, query_engine_state)?
1789                {
1790                    // Preserve the current field columns except the `dst_label`.
1791                    for value in &self.ctx.field_columns {
1792                        if *value != dst_label {
1793                            let expr = DfExpr::Column(Column::from_name(value));
1794                            exprs.push(expr);
1795                        }
1796                    }
1797
1798                    ensure!(
1799                        !self.ctx.tag_columns.contains(&dst_label),
1800                        SameLabelSetSnafu
1801                    );
1802                    new_tags.push(dst_label);
1803                    // Add the new label expr to evaluate
1804                    exprs.push(replace_expr);
1805                } else {
1806                    // Keep the current field columns
1807                    for value in &self.ctx.field_columns {
1808                        let expr = DfExpr::Column(Column::from_name(value));
1809                        exprs.push(expr);
1810                    }
1811                }
1812
1813                ScalarFunc::GeneratedExpr
1814            }
1815            "sort" | "sort_desc" | "sort_by_label" | "sort_by_label_desc" | "timestamp" => {
1816                // These functions are not expressions but part of the plan;
1817                // they are processed by `prom_call_expr_to_plan`.
1818                for value in &self.ctx.field_columns {
1819                    let expr = DfExpr::Column(Column::from_name(value));
1820                    exprs.push(expr);
1821                }
1822
1823                ScalarFunc::GeneratedExpr
1824            }
1825            "round" => {
1826                if other_input_exprs.is_empty() {
1827                    other_input_exprs.push_front(0.0f64.lit());
1828                }
1829                ScalarFunc::DataFusionUdf(Arc::new(Round::scalar_udf()))
1830            }
1831            "rad" => ScalarFunc::DataFusionBuiltin(datafusion::functions::math::radians()),
1832            "deg" => ScalarFunc::DataFusionBuiltin(datafusion::functions::math::degrees()),
1833            "sgn" => ScalarFunc::DataFusionBuiltin(datafusion::functions::math::signum()),
1834            "pi" => {
1835                // the pi function doesn't accept any arguments, so it needs special processing
1836                let fn_expr = DfExpr::ScalarFunction(ScalarFunction {
1837                    func: datafusion::functions::math::pi(),
1838                    args: vec![],
1839                });
1840                exprs.push(fn_expr);
1841
1842                ScalarFunc::GeneratedExpr
1843            }
1844            _ => {
1845                if let Some(f) = query_engine_state
1846                    .session_state()
1847                    .scalar_functions()
1848                    .get(func.name)
1849                {
1850                    ScalarFunc::DataFusionBuiltin(f.clone())
1851                } else if let Some(factory) = query_engine_state.scalar_function(func.name) {
1852                    let func_state = query_engine_state.function_state();
1853                    let query_ctx = self.table_provider.query_ctx();
1854
1855                    ScalarFunc::DataFusionUdf(Arc::new(factory.provide(FunctionContext {
1856                        state: func_state,
1857                        query_ctx: query_ctx.clone(),
1858                    })))
1859                } else if let Some(f) = datafusion_functions::math::functions()
1860                    .iter()
1861                    .find(|f| f.name() == func.name)
1862                {
1863                    ScalarFunc::DataFusionUdf(f.clone())
1864                } else {
1865                    return UnsupportedExprSnafu {
1866                        name: func.name.to_string(),
1867                    }
1868                    .fail();
1869                }
1870            }
1871        };
1872
1873        for value in &self.ctx.field_columns {
1874            let col_expr = DfExpr::Column(Column::from_name(value));
1875
1876            match scalar_func.clone() {
1877                ScalarFunc::DataFusionBuiltin(func) => {
1878                    other_input_exprs.insert(field_column_pos, col_expr);
1879                    let fn_expr = DfExpr::ScalarFunction(ScalarFunction {
1880                        func,
1881                        args: other_input_exprs.clone().into(),
1882                    });
1883                    exprs.push(fn_expr);
1884                    let _ = other_input_exprs.remove(field_column_pos);
1885                }
1886                ScalarFunc::DataFusionUdf(func) => {
1887                    let args = itertools::chain!(
1888                        other_input_exprs.iter().take(field_column_pos).cloned(),
1889                        std::iter::once(col_expr),
1890                        other_input_exprs.iter().skip(field_column_pos).cloned()
1891                    )
1892                    .collect_vec();
1893                    exprs.push(DfExpr::ScalarFunction(ScalarFunction { func, args }))
1894                }
1895                ScalarFunc::Udf(func) => {
1896                    let ts_range_expr = DfExpr::Column(Column::from_name(
1897                        RangeManipulate::build_timestamp_range_name(
1898                            self.ctx.time_index_column.as_ref().unwrap(),
1899                        ),
1900                    ));
1901                    other_input_exprs.insert(field_column_pos, ts_range_expr);
1902                    other_input_exprs.insert(field_column_pos + 1, col_expr);
1903                    let fn_expr = DfExpr::ScalarFunction(ScalarFunction {
1904                        func,
1905                        args: other_input_exprs.clone().into(),
1906                    });
1907                    exprs.push(fn_expr);
1908                    let _ = other_input_exprs.remove(field_column_pos + 1);
1909                    let _ = other_input_exprs.remove(field_column_pos);
1910                }
1911                ScalarFunc::ExtrapolateUdf(func, range_length) => {
1912                    let ts_range_expr = DfExpr::Column(Column::from_name(
1913                        RangeManipulate::build_timestamp_range_name(
1914                            self.ctx.time_index_column.as_ref().unwrap(),
1915                        ),
1916                    ));
1917                    other_input_exprs.insert(field_column_pos, ts_range_expr);
1918                    other_input_exprs.insert(field_column_pos + 1, col_expr);
1919                    other_input_exprs
1920                        .insert(field_column_pos + 2, self.create_time_index_column_expr()?);
1921                    other_input_exprs.push_back(lit(range_length));
1922                    let fn_expr = DfExpr::ScalarFunction(ScalarFunction {
1923                        func,
1924                        args: other_input_exprs.clone().into(),
1925                    });
1926                    exprs.push(fn_expr);
1927                    let _ = other_input_exprs.pop_back();
1928                    let _ = other_input_exprs.remove(field_column_pos + 2);
1929                    let _ = other_input_exprs.remove(field_column_pos + 1);
1930                    let _ = other_input_exprs.remove(field_column_pos);
1931                }
1932                ScalarFunc::GeneratedExpr => {}
1933            }
1934        }
1935
1936        // Update value columns' names and alias them to remove qualifiers.
1937        // For label functions such as `label_join` and `label_replace`,
1938        // we keep the fields unchanged.
1939        if !matches!(func.name, "label_join" | "label_replace") {
1940            let mut new_field_columns = Vec::with_capacity(exprs.len());
1941
1942            exprs = exprs
1943                .into_iter()
1944                .map(|expr| {
1945                    let display_name = expr.schema_name().to_string();
1946                    new_field_columns.push(display_name.clone());
1947                    Ok(expr.alias(display_name))
1948                })
1949                .collect::<std::result::Result<Vec<_>, _>>()
1950                .context(DataFusionPlanningSnafu)?;
1951
1952            self.ctx.field_columns = new_field_columns;
1953        }
1954
1955        Ok((exprs, new_tags))
1956    }
1957
1958    /// Validate a label name according to the Prometheus specification.
1959    /// Label names must match the regex `[a-zA-Z_][a-zA-Z0-9_]*`.
1960    /// Additionally, label names starting with double underscores are reserved for internal use.
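    ///
    /// E.g. `job` and `instance_1` are accepted, while `1abc` (fails the regex)
    /// and `__name__` (reserved double-underscore prefix) are rejected.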
1961    fn validate_label_name(label_name: &str) -> Result<()> {
1962        // Check if label name starts with double underscores (reserved)
1963        if label_name.starts_with("__") {
1964            return InvalidDestinationLabelNameSnafu { label_name }.fail();
1965        }
1966        // Check if label name matches the required pattern
1967        if !LABEL_NAME_REGEX.is_match(label_name) {
1968            return InvalidDestinationLabelNameSnafu { label_name }.fail();
1969        }
1970
1971        Ok(())
1972    }
1973
1974    /// Build expr for `label_replace` function
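    ///
    /// E.g. (illustrative) `label_replace(up, "host", "$1", "instance", "(.*):.*")`
    /// roughly plans to `regexp_replace(instance, '^(?s:(.*):.*)$', '$1') AS host`.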
1975    fn build_regexp_replace_label_expr(
1976        &self,
1977        other_input_exprs: &mut VecDeque<DfExpr>,
1978        query_engine_state: &QueryEngineState,
1979    ) -> Result<Option<(DfExpr, String)>> {
1980        // label_replace(vector, dst_label, replacement, src_label, regex)
1981        let dst_label = match other_input_exprs.pop_front() {
1982            Some(DfExpr::Literal(ScalarValue::Utf8(Some(d)), _)) => d,
1983            other => UnexpectedPlanExprSnafu {
1984                desc: format!("expected dst_label string literal, but found {:?}", other),
1985            }
1986            .fail()?,
1987        };
1988
1989        // Validate the destination label name
1990        Self::validate_label_name(&dst_label)?;
1991        let replacement = match other_input_exprs.pop_front() {
1992            Some(DfExpr::Literal(ScalarValue::Utf8(Some(r)), _)) => r,
1993            other => UnexpectedPlanExprSnafu {
1994                desc: format!("expected replacement string literal, but found {:?}", other),
1995            }
1996            .fail()?,
1997        };
1998        let src_label = match other_input_exprs.pop_front() {
1999            Some(DfExpr::Literal(ScalarValue::Utf8(Some(s)), None)) => s,
2000            other => UnexpectedPlanExprSnafu {
2001                desc: format!("expected src_label string literal, but found {:?}", other),
2002            }
2003            .fail()?,
2004        };
2005
2006        let regex = match other_input_exprs.pop_front() {
2007            Some(DfExpr::Literal(ScalarValue::Utf8(Some(r)), None)) => r,
2008            other => UnexpectedPlanExprSnafu {
2009                desc: format!("expected regex string literal, but found {:?}", other),
2010            }
2011            .fail()?,
2012        };
2013
2014        // Validate the regex before using it
2015        // doc: https://prometheus.io/docs/prometheus/latest/querying/functions/#label_replace
2016        regex::Regex::new(&regex).map_err(|_| {
2017            InvalidRegularExpressionSnafu {
2018                regex: regex.clone(),
2019            }
2020            .build()
2021        })?;
2022
2023        // If the src_label exists and regex is empty, keep everything unchanged.
2024        if self.ctx.tag_columns.contains(&src_label) && regex.is_empty() {
2025            return Ok(None);
2026        }
2027
2028        // If the src_label doesn't exist:
2029        if !self.ctx.tag_columns.contains(&src_label) {
2030            if replacement.is_empty() {
2031                // the replacement is empty, keep everything unchanged.
2032                return Ok(None);
2033            } else {
2034                // the replacement is not empty, always add dst_label with the replacement value.
2035                return Ok(Some((
2036                    // alias literal `replacement` as dst_label
2037                    lit(replacement).alias(&dst_label),
2038                    dst_label,
2039                )));
2040            }
2041        }
2042
2043        // Preprocess the regex:
2044        // https://github.com/prometheus/prometheus/blob/d902abc50d6652ba8fe9a81ff8e5cce936114eba/promql/functions.go#L1575C32-L1575C37
2045        let regex = format!("^(?s:{regex})$");
2046
2047        let session_state = query_engine_state.session_state();
2048        let func = session_state
2049            .scalar_functions()
2050            .get("regexp_replace")
2051            .context(UnsupportedExprSnafu {
2052                name: "regexp_replace",
2053            })?;
2054
2055        // regexp_replace(src_label, regex, replacement)
2056        let args = vec![
2057            if src_label.is_empty() {
2058                DfExpr::Literal(ScalarValue::Utf8(Some(String::new())), None)
2059            } else {
2060                DfExpr::Column(Column::from_name(src_label))
2061            },
2062            DfExpr::Literal(ScalarValue::Utf8(Some(regex)), None),
2063            DfExpr::Literal(ScalarValue::Utf8(Some(replacement)), None),
2064        ];
2065
2066        Ok(Some((
2067            DfExpr::ScalarFunction(ScalarFunction {
2068                func: func.clone(),
2069                args,
2070            })
2071            .alias(&dst_label),
2072            dst_label,
2073        )))
2074    }
2075
2076    /// Build expr for `label_join` function
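    ///
    /// E.g. (illustrative) `label_join(up, "addr", ":", "host", "port")`
    /// roughly plans to `concat_ws(':', host, port) AS addr`.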
2077    fn build_concat_labels_expr(
2078        other_input_exprs: &mut VecDeque<DfExpr>,
2079        ctx: &PromPlannerContext,
2080        query_engine_state: &QueryEngineState,
2081    ) -> Result<(DfExpr, String)> {
2082        // label_join(vector, dst_label, separator, src_label_1, src_label_2, ...)
2083
2084        let dst_label = match other_input_exprs.pop_front() {
2085            Some(DfExpr::Literal(ScalarValue::Utf8(Some(d)), _)) => d,
2086            other => UnexpectedPlanExprSnafu {
2087                desc: format!("expected dst_label string literal, but found {:?}", other),
2088            }
2089            .fail()?,
2090        };
2091        let separator = match other_input_exprs.pop_front() {
2092            Some(DfExpr::Literal(ScalarValue::Utf8(Some(d)), _)) => d,
2093            other => UnexpectedPlanExprSnafu {
2094                desc: format!("expected separator string literal, but found {:?}", other),
2095            }
2096            .fail()?,
2097        };
2098
2099        // Create a set of available columns (tag columns + field columns + time index column)
2100        let available_columns: HashSet<&str> = ctx
2101            .tag_columns
2102            .iter()
2103            .chain(ctx.field_columns.iter())
2104            .chain(ctx.time_index_column.as_ref())
2105            .map(|s| s.as_str())
2106            .collect();
2107
2108        let src_labels = other_input_exprs
2109            .iter()
2110            .map(|expr| {
2111                // Cast source label into column or null literal
2112                match expr {
2113                    DfExpr::Literal(ScalarValue::Utf8(Some(label)), None) => {
2114                        if label.is_empty() {
2115                            Ok(DfExpr::Literal(ScalarValue::Null, None))
2116                        } else if available_columns.contains(label.as_str()) {
2117                            // Label exists in the table schema
2118                            Ok(DfExpr::Column(Column::from_name(label)))
2119                        } else {
2120                            // Label doesn't exist, treat as empty string (null)
2121                            Ok(DfExpr::Literal(ScalarValue::Null, None))
2122                        }
2123                    }
2124                    other => UnexpectedPlanExprSnafu {
2125                        desc: format!(
2126                            "expected source label string literal, but found {:?}",
2127                            other
2128                        ),
2129                    }
2130                    .fail(),
2131                }
2132            })
2133            .collect::<Result<Vec<_>>>()?;
2134        ensure!(
2135            !src_labels.is_empty(),
2136            FunctionInvalidArgumentSnafu {
2137                fn_name: "label_join"
2138            }
2139        );
2140
2141        let session_state = query_engine_state.session_state();
2142        let func = session_state
2143            .scalar_functions()
2144            .get("concat_ws")
2145            .context(UnsupportedExprSnafu { name: "concat_ws" })?;
2146
2147        // concat_ws(separator, src_label_1, src_label_2, ...) as dst_label
2148        let mut args = Vec::with_capacity(1 + src_labels.len());
2149        args.push(DfExpr::Literal(ScalarValue::Utf8(Some(separator)), None));
2150        args.extend(src_labels);
2151
2152        Ok((
2153            DfExpr::ScalarFunction(ScalarFunction {
2154                func: func.clone(),
2155                args,
2156            })
2157            .alias(&dst_label),
2158            dst_label,
2159        ))
2160    }
2161
2162    fn create_time_index_column_expr(&self) -> Result<DfExpr> {
2163        Ok(DfExpr::Column(Column::from_name(
2164            self.ctx
2165                .time_index_column
2166                .clone()
2167                .with_context(|| TimeIndexNotFoundSnafu { table: "unknown" })?,
2168        )))
2169    }
2170
2171    fn create_tag_column_exprs(&self) -> Result<Vec<DfExpr>> {
2172        let mut result = Vec::with_capacity(self.ctx.tag_columns.len());
2173        for tag in &self.ctx.tag_columns {
2174            let expr = DfExpr::Column(Column::from_name(tag));
2175            result.push(expr);
2176        }
2177        Ok(result)
2178    }
2179
2180    fn create_field_column_exprs(&self) -> Result<Vec<DfExpr>> {
2181        let mut result = Vec::with_capacity(self.ctx.field_columns.len());
2182        for field in &self.ctx.field_columns {
2183            let expr = DfExpr::Column(Column::from_name(field));
2184            result.push(expr);
2185        }
2186        Ok(result)
2187    }
2188
2189    fn create_tag_and_time_index_column_sort_exprs(&self) -> Result<Vec<SortExpr>> {
2190        let mut result = self
2191            .ctx
2192            .tag_columns
2193            .iter()
2194            .map(|col| DfExpr::Column(Column::from_name(col)).sort(true, true))
2195            .collect::<Vec<_>>();
2196        result.push(self.create_time_index_column_expr()?.sort(true, true));
2197        Ok(result)
2198    }
2199
2200    fn create_field_columns_sort_exprs(&self, asc: bool) -> Vec<SortExpr> {
2201        self.ctx
2202            .field_columns
2203            .iter()
2204            .map(|col| DfExpr::Column(Column::from_name(col)).sort(asc, true))
2205            .collect::<Vec<_>>()
2206    }
2207
2208    fn create_sort_exprs_by_tags(
2209        func: &str,
2210        tags: Vec<DfExpr>,
2211        asc: bool,
2212    ) -> Result<Vec<SortExpr>> {
2213        ensure!(
2214            !tags.is_empty(),
2215            FunctionInvalidArgumentSnafu { fn_name: func }
2216        );
2217
2218        tags.iter()
2219            .map(|col| match col {
2220                DfExpr::Literal(ScalarValue::Utf8(Some(label)), _) => {
2221                    Ok(DfExpr::Column(Column::from_name(label)).sort(asc, false))
2222                }
2223                other => UnexpectedPlanExprSnafu {
2224                    desc: format!("expected label string literal, but found {:?}", other),
2225                }
2226                .fail(),
2227            })
2228            .collect::<Result<Vec<_>>>()
2229    }
2230
2231    fn create_empty_values_filter_expr(&self) -> Result<DfExpr> {
2232        let mut exprs = Vec::with_capacity(self.ctx.field_columns.len());
2233        for value in &self.ctx.field_columns {
2234            let expr = DfExpr::Column(Column::from_name(value)).is_not_null();
2235            exprs.push(expr);
2236        }
2237
2238        conjunction(exprs).context(ValueNotFoundSnafu {
2239            table: self.table_ref()?.to_quoted_string(),
2240        })
2241    }
2242
2243    /// Creates a set of DataFusion `DfExpr::AggregateFunction` expressions for each value column using the specified aggregate function.
2244    ///
2245    /// # Side Effects
2246    ///
2247    /// This method modifies the value columns in the context by replacing them with the new columns
2248    /// created by the aggregate function application.
2249    ///
2250    /// # Returns
2251    ///
2252    /// Returns a tuple of `(aggregate_expressions, previous_field_expressions)` where:
2253    /// - `aggregate_expressions`: Expressions that apply the aggregate function to the original fields
2254    /// - `previous_field_expressions`: Original field expressions before aggregation. This is non-empty
2255    ///   only when the operation is `count_values`, as this operation requires preserving the original
2256    ///   values for grouping.
2257    ///
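    /// E.g. (illustrative) `sum(metric)` maps each value column `v` to `sum(v)`,
    /// while `quantile(0.9, metric)` prepends the quantile literal, producing
    /// `quantile(0.9, v)`.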
2258    fn create_aggregate_exprs(
2259        &mut self,
2260        op: TokenType,
2261        param: &Option<Box<PromExpr>>,
2262        input_plan: &LogicalPlan,
2263    ) -> Result<(Vec<DfExpr>, Vec<DfExpr>)> {
2264        let mut non_col_args = Vec::new();
2265        let aggr = match op.id() {
2266            token::T_SUM => sum_udaf(),
2267            token::T_QUANTILE => {
2268                let q =
2269                    Self::get_param_as_literal_expr(param, Some(op), Some(ArrowDataType::Float64))?;
2270                non_col_args.push(q);
2271                quantile_udaf()
2272            }
2273            token::T_AVG => avg_udaf(),
2274            token::T_COUNT_VALUES | token::T_COUNT => count_udaf(),
2275            token::T_MIN => min_udaf(),
2276            token::T_MAX => max_udaf(),
2277            token::T_GROUP => grouping_udaf(),
2278            token::T_STDDEV => stddev_pop_udaf(),
2279            token::T_STDVAR => var_pop_udaf(),
2280            token::T_TOPK | token::T_BOTTOMK => UnsupportedExprSnafu {
2281                name: format!("{op:?}"),
2282            }
2283            .fail()?,
2284            _ => UnexpectedTokenSnafu { token: op }.fail()?,
2285        };
2286
2287        // perform the aggregate operation on each value column
2288        let exprs: Vec<DfExpr> = self
2289            .ctx
2290            .field_columns
2291            .iter()
2292            .map(|col| {
2293                non_col_args.push(DfExpr::Column(Column::from_name(col)));
2294                let expr = aggr.call(non_col_args.clone());
2295                non_col_args.pop();
2296                expr
2297            })
2298            .collect::<Vec<_>>();
2299
2300        // if the aggregator is `count_values`, it must be grouped by current fields.
2301        let prev_field_exprs = if op.id() == token::T_COUNT_VALUES {
2302            let prev_field_exprs: Vec<_> = self
2303                .ctx
2304                .field_columns
2305                .iter()
2306                .map(|col| DfExpr::Column(Column::from_name(col)))
2307                .collect();
2308
2309            ensure!(
2310                self.ctx.field_columns.len() == 1,
2311                UnsupportedExprSnafu {
2312                    name: "count_values on multi-value input"
2313                }
2314            );
2315
2316            prev_field_exprs
2317        } else {
2318            vec![]
2319        };
2320
2321        // update value column names according to the aggregators
2322        let mut new_field_columns = Vec::with_capacity(self.ctx.field_columns.len());
2323
2324        let normalized_exprs =
2325            normalize_cols(exprs.iter().cloned(), input_plan).context(DataFusionPlanningSnafu)?;
2326        for expr in normalized_exprs {
2327            new_field_columns.push(expr.schema_name().to_string());
2328        }
2329        self.ctx.field_columns = new_field_columns;
2330
2331        Ok((exprs, prev_field_exprs))
2332    }
2333
2334    fn get_param_value_as_str(op: TokenType, param: &Option<Box<PromExpr>>) -> Result<&str> {
2335        let param = param
2336            .as_deref()
2337            .with_context(|| FunctionInvalidArgumentSnafu {
2338                fn_name: op.to_string(),
2339            })?;
2340        let PromExpr::StringLiteral(StringLiteral { val }) = param else {
2341            return FunctionInvalidArgumentSnafu {
2342                fn_name: op.to_string(),
2343            }
2344            .fail();
2345        };
2346
2347        Ok(val)
2348    }
2349
2350    fn get_param_as_literal_expr(
2351        param: &Option<Box<PromExpr>>,
2352        op: Option<TokenType>,
2353        expected_type: Option<ArrowDataType>,
2354    ) -> Result<DfExpr> {
2355        let prom_param = param.as_deref().with_context(|| {
2356            if let Some(op) = op {
2357                FunctionInvalidArgumentSnafu {
2358                    fn_name: op.to_string(),
2359                }
2360            } else {
2361                FunctionInvalidArgumentSnafu {
2362                    fn_name: "unknown".to_string(),
2363                }
2364            }
2365        })?;
2366
2367        let expr = Self::try_build_literal_expr(prom_param).with_context(|| {
2368            if let Some(op) = op {
2369                FunctionInvalidArgumentSnafu {
2370                    fn_name: op.to_string(),
2371                }
2372            } else {
2373                FunctionInvalidArgumentSnafu {
2374                    fn_name: "unknown".to_string(),
2375                }
2376            }
2377        })?;
2378
2379        // check if the type is expected
2380        if let Some(expected_type) = expected_type {
2381            // a literal should not contain column references
2382            let expr_type = expr
2383                .get_type(&DFSchema::empty())
2384                .context(DataFusionPlanningSnafu)?;
2385            if expected_type != expr_type {
2386                return FunctionInvalidArgumentSnafu {
2387                    fn_name: format!("expected {expected_type:?}, but found {expr_type:?}"),
2388                }
2389                .fail();
2390            }
2391        }
2392
2393        Ok(expr)
2394    }
2395
2396    /// Create a [DfExpr::WindowFunction] expr for each value column with the given window function.
2397    ///
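    /// E.g. (illustrative) `topk(3, metric)` produces a `row_number()` window
    /// ordered by the value column (descending for `topk`, ascending for
    /// `bottomk`) and partitioned by the grouping labels, which the planner
    /// later filters down to the first k rows per group.
    ///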
2398    fn create_window_exprs(
2399        &mut self,
2400        op: TokenType,
2401        group_exprs: Vec<DfExpr>,
2402        input_plan: &LogicalPlan,
2403    ) -> Result<Vec<DfExpr>> {
2404        ensure!(
2405            self.ctx.field_columns.len() == 1,
2406            UnsupportedExprSnafu {
2407                name: "topk or bottomk on multi-value input"
2408            }
2409        );
2410
2411        assert!(matches!(op.id(), token::T_TOPK | token::T_BOTTOMK));
2412
2413        let asc = matches!(op.id(), token::T_BOTTOMK);
2414
2415        let tag_sort_exprs = self
2416            .create_tag_column_exprs()?
2417            .into_iter()
2418            .map(|expr| expr.sort(asc, true));
2419
2420        // perform the window operation on each value column
2421        let exprs: Vec<DfExpr> = self
2422            .ctx
2423            .field_columns
2424            .iter()
2425            .map(|col| {
2426                let mut sort_exprs = Vec::with_capacity(self.ctx.tag_columns.len() + 1);
2427                // Order by value first (ascending for `bottomk`, descending for `topk`),
2428                sort_exprs.push(DfExpr::Column(Column::from(col)).sort(asc, true));
2429                // then by tags when values are equal, to keep the output
2430                // relatively stable.
2431                sort_exprs.extend(tag_sort_exprs.clone());
2432
2433                DfExpr::WindowFunction(Box::new(WindowFunction {
2434                    fun: WindowFunctionDefinition::WindowUDF(Arc::new(RowNumber::new().into())),
2435                    params: WindowFunctionParams {
2436                        args: vec![],
2437                        partition_by: group_exprs.clone(),
2438                        order_by: sort_exprs,
2439                        window_frame: WindowFrame::new(Some(true)),
2440                        null_treatment: None,
2441                        distinct: false,
2442                    },
2443                }))
2444            })
2445            .collect();
2446
2447        let normalized_exprs =
2448            normalize_cols(exprs.iter().cloned(), input_plan).context(DataFusionPlanningSnafu)?;
2449        Ok(normalized_exprs)
2450    }
2451
2452    /// Try to build a [f64] from [PromExpr].
2453    #[deprecated(
2454        note = "use `Self::get_param_as_literal_expr` instead. This is only for `create_histogram_plan`"
2455    )]
2456    fn try_build_float_literal(expr: &PromExpr) -> Option<f64> {
2457        match expr {
2458            PromExpr::NumberLiteral(NumberLiteral { val }) => Some(*val),
2459            PromExpr::Paren(ParenExpr { expr }) => Self::try_build_float_literal(expr),
2460            PromExpr::Unary(UnaryExpr { expr, .. }) => {
2461                Self::try_build_float_literal(expr).map(|f| -f)
2462            }
2463            PromExpr::StringLiteral(_)
2464            | PromExpr::Binary(_)
2465            | PromExpr::VectorSelector(_)
2466            | PromExpr::MatrixSelector(_)
2467            | PromExpr::Call(_)
2468            | PromExpr::Extension(_)
2469            | PromExpr::Aggregate(_)
2470            | PromExpr::Subquery(_) => None,
2471        }
2472    }
2473
2474    /// Create a [SPECIAL_HISTOGRAM_QUANTILE] plan.
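    ///
    /// E.g. (illustrative) `histogram_quantile(0.9, metric_bucket)` plans a
    /// [HistogramFold] node folding the `le` bucket column of its input; if no
    /// `le` column is present, an empty relation is returned instead.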
2475    async fn create_histogram_plan(
2476        &mut self,
2477        args: &PromFunctionArgs,
2478        query_engine_state: &QueryEngineState,
2479    ) -> Result<LogicalPlan> {
2480        if args.args.len() != 2 {
2481            return FunctionInvalidArgumentSnafu {
2482                fn_name: SPECIAL_HISTOGRAM_QUANTILE.to_string(),
2483            }
2484            .fail();
2485        }
2486        #[allow(deprecated)]
2487        let phi = Self::try_build_float_literal(&args.args[0]).with_context(|| {
2488            FunctionInvalidArgumentSnafu {
2489                fn_name: SPECIAL_HISTOGRAM_QUANTILE.to_string(),
2490            }
2491        })?;
2492
2493        let input = args.args[1].as_ref().clone();
2494        let input_plan = self.prom_expr_to_plan(&input, query_engine_state).await?;
2495
2496        if !self.ctx.has_le_tag() {
2497            // Return an empty result instead of an error when the 'le' column is not found.
2498            // This handles the case where histogram metrics don't exist.
2499            return Ok(LogicalPlan::EmptyRelation(
2500                datafusion::logical_expr::EmptyRelation {
2501                    produce_one_row: false,
2502                    schema: Arc::new(DFSchema::empty()),
2503                },
2504            ));
2505        }
2506        let time_index_column =
2507            self.ctx
2508                .time_index_column
2509                .clone()
2510                .with_context(|| TimeIndexNotFoundSnafu {
2511                    table: self.ctx.table_name.clone().unwrap_or_default(),
2512                })?;
2513        // FIXME(ruihang): support multi fields
2514        let field_column = self
2515            .ctx
2516            .field_columns
2517            .first()
2518            .with_context(|| FunctionInvalidArgumentSnafu {
2519                fn_name: SPECIAL_HISTOGRAM_QUANTILE.to_string(),
2520            })?
2521            .clone();
2522        // remove le column from tag columns
2523        self.ctx.tag_columns.retain(|col| col != LE_COLUMN_NAME);
2524
2525        Ok(LogicalPlan::Extension(Extension {
2526            node: Arc::new(
2527                HistogramFold::new(
2528                    LE_COLUMN_NAME.to_string(),
2529                    field_column,
2530                    time_index_column,
2531                    phi,
2532                    input_plan,
2533                )
2534                .context(DataFusionPlanningSnafu)?,
2535            ),
2536        }))
2537    }
2538
2539    /// Create a [SPECIAL_VECTOR_FUNCTION] plan
2540    async fn create_vector_plan(&mut self, args: &PromFunctionArgs) -> Result<LogicalPlan> {
2541        if args.args.len() != 1 {
2542            return FunctionInvalidArgumentSnafu {
2543                fn_name: SPECIAL_VECTOR_FUNCTION.to_string(),
2544            }
2545            .fail();
2546        }
2547        let lit = Self::get_param_as_literal_expr(&Some(args.args[0].clone()), None, None)?;
2548
2549        // reuse `SPECIAL_TIME_FUNCTION` as the name of the time index column
2550        self.ctx.time_index_column = Some(SPECIAL_TIME_FUNCTION.to_string());
2551        self.ctx.reset_table_name_and_schema();
2552        self.ctx.tag_columns = vec![];
2553        self.ctx.field_columns = vec![GREPTIME_VALUE.to_string()];
2554        Ok(LogicalPlan::Extension(Extension {
2555            node: Arc::new(
2556                EmptyMetric::new(
2557                    self.ctx.start,
2558                    self.ctx.end,
2559                    self.ctx.interval,
2560                    SPECIAL_TIME_FUNCTION.to_string(),
2561                    GREPTIME_VALUE.to_string(),
2562                    Some(lit),
2563                )
2564                .context(DataFusionPlanningSnafu)?,
2565            ),
2566        }))
2567    }
2568
2569    /// Create a [SCALAR_FUNCTION] plan
2570    async fn create_scalar_plan(
2571        &mut self,
2572        args: &PromFunctionArgs,
2573        query_engine_state: &QueryEngineState,
2574    ) -> Result<LogicalPlan> {
2575        ensure!(
2576            args.len() == 1,
2577            FunctionInvalidArgumentSnafu {
2578                fn_name: SCALAR_FUNCTION
2579            }
2580        );
2581        let input = self
2582            .prom_expr_to_plan(&args.args[0], query_engine_state)
2583            .await?;
2584        ensure!(
2585            self.ctx.field_columns.len() == 1,
2586            MultiFieldsNotSupportedSnafu {
2587                operator: SCALAR_FUNCTION
2588            },
2589        );
2590        let scalar_plan = LogicalPlan::Extension(Extension {
2591            node: Arc::new(
2592                ScalarCalculate::new(
2593                    self.ctx.start,
2594                    self.ctx.end,
2595                    self.ctx.interval,
2596                    input,
2597                    self.ctx.time_index_column.as_ref().unwrap(),
2598                    &self.ctx.tag_columns,
2599                    &self.ctx.field_columns[0],
2600                    self.ctx.table_name.as_deref(),
2601                )
2602                .context(PromqlPlanNodeSnafu)?,
2603            ),
2604        });
2605        // the scalar plan has no tag columns
2606        self.ctx.tag_columns.clear();
2607        self.ctx.field_columns.clear();
2608        self.ctx
2609            .field_columns
2610            .push(scalar_plan.schema().field(1).name().clone());
2611        Ok(scalar_plan)
2612    }
2613
2614    /// Create a [SPECIAL_ABSENT_FUNCTION] plan.
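    ///
    /// E.g. (illustrative) `absent(up{job="api"})` yields a series valued 1 at
    /// timestamps where the input has no data; only equality matchers (here
    /// `job="api"`) contribute labels to the synthesized series.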
2615    async fn create_absent_plan(
2616        &mut self,
2617        args: &PromFunctionArgs,
2618        query_engine_state: &QueryEngineState,
2619    ) -> Result<LogicalPlan> {
2620        if args.args.len() != 1 {
2621            return FunctionInvalidArgumentSnafu {
2622                fn_name: SPECIAL_ABSENT_FUNCTION.to_string(),
2623            }
2624            .fail();
2625        }
2626        let input = self
2627            .prom_expr_to_plan(&args.args[0], query_engine_state)
2628            .await?;
2629
2630        let time_index_expr = self.create_time_index_column_expr()?;
2631        let first_field_expr =
2632            self.create_field_column_exprs()?
2633                .pop()
2634                .with_context(|| ValueNotFoundSnafu {
2635                    table: self.ctx.table_name.clone().unwrap_or_default(),
2636                })?;
2637        let first_value_expr = first_value(first_field_expr, vec![]);
2638
2639        let ordered_aggregated_input = LogicalPlanBuilder::from(input)
2640            .aggregate(
2641                vec![time_index_expr.clone()],
2642                vec![first_value_expr.clone()],
2643            )
2644            .context(DataFusionPlanningSnafu)?
2645            .sort(vec![time_index_expr.sort(true, false)])
2646            .context(DataFusionPlanningSnafu)?
2647            .build()
2648            .context(DataFusionPlanningSnafu)?;
2649
2650        let fake_labels = self
2651            .ctx
2652            .selector_matcher
2653            .iter()
2654            .filter_map(|matcher| match matcher.op {
2655                MatchOp::Equal => Some((matcher.name.clone(), matcher.value.clone())),
2656                _ => None,
2657            })
2658            .collect::<Vec<_>>();
2659
2660        // Create the absent plan
2661        let absent_plan = LogicalPlan::Extension(Extension {
2662            node: Arc::new(
2663                Absent::try_new(
2664                    self.ctx.start,
2665                    self.ctx.end,
2666                    self.ctx.interval,
2667                    self.ctx.time_index_column.as_ref().unwrap().clone(),
2668                    self.ctx.field_columns[0].clone(),
2669                    fake_labels,
2670                    ordered_aggregated_input,
2671                )
2672                .context(DataFusionPlanningSnafu)?,
2673            ),
2674        });
2675
2676        Ok(absent_plan)
2677    }
2678
2679    /// Try to build a DataFusion literal expression from a PromQL expr. Returns
2680    /// `None` if the input is not a literal expression.
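    ///
    /// For example, `1 + 2` and `"foo"` fold into DataFusion literals, while
    /// `some_metric + 1` does not (the vector-selector side can never be a
    /// literal), so the whole expression yields `None`.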
2681    fn try_build_literal_expr(expr: &PromExpr) -> Option<DfExpr> {
2682        match expr {
2683            PromExpr::NumberLiteral(NumberLiteral { val }) => Some(val.lit()),
2684            PromExpr::StringLiteral(StringLiteral { val }) => Some(val.lit()),
2685            PromExpr::VectorSelector(_)
2686            | PromExpr::MatrixSelector(_)
2687            | PromExpr::Extension(_)
2688            | PromExpr::Aggregate(_)
2689            | PromExpr::Subquery(_) => None,
2690            // Function calls never fold into literals here. In particular,
2691            // time() is deliberately not treated as a literal so that it is
2692            // handled as a regular function call with access to the planner
2693            // context (see `try_build_special_time_expr_with_context`).
2694            PromExpr::Call(_) => None,
2699            PromExpr::Paren(ParenExpr { expr }) => Self::try_build_literal_expr(expr),
2700            // TODO(ruihang): support Unary operator
2701            PromExpr::Unary(UnaryExpr { expr, .. }) => Self::try_build_literal_expr(expr),
2702            PromExpr::Binary(PromBinaryExpr {
2703                lhs,
2704                rhs,
2705                op,
2706                modifier,
2707            }) => {
2708                let lhs = Self::try_build_literal_expr(lhs)?;
2709                let rhs = Self::try_build_literal_expr(rhs)?;
2710                let is_comparison_op = Self::is_token_a_comparison_op(*op);
2711                let expr_builder = Self::prom_token_to_binary_expr_builder(*op).ok()?;
2712                let expr = expr_builder(lhs, rhs).ok()?;
2713
2714                let should_return_bool = if let Some(m) = modifier {
2715                    m.return_bool
2716                } else {
2717                    false
2718                };
2719                if is_comparison_op && should_return_bool {
2720                    Some(DfExpr::Cast(Cast {
2721                        expr: Box::new(expr),
2722                        data_type: ArrowDataType::Float64,
2723                    }))
2724                } else {
2725                    Some(expr)
2726                }
2727            }
2728        }
2729    }
2730
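    /// If `expr` is a `time()` call and the planner context has a time index
    /// column set, build the special time expression over that column; otherwise
    /// return `None`.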
2731    fn try_build_special_time_expr_with_context(&self, expr: &PromExpr) -> Option<DfExpr> {
2732        match expr {
2733            PromExpr::Call(Call { func, .. }) => {
2734                if func.name == SPECIAL_TIME_FUNCTION
2735                    && let Some(time_index_col) = self.ctx.time_index_column.as_ref()
2736                {
2737                    Some(build_special_time_expr(time_index_col))
2738                } else {
2739                    None
2740                }
2741            }
2742            _ => None,
2743        }
2744    }
2745
2746    /// Return a lambda that builds a binary expression from a token. This is needed
2747    /// because some binary operators, such as `atan2` and `^`, are functions in DataFusion.
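    ///
    /// For example, `a + b` maps to the native `+` operator, whereas `a ^ b` and
    /// `a atan2 b` have no operator counterpart in DataFusion and map to the
    /// `power` and `atan2` math functions instead.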
2748    #[allow(clippy::type_complexity)]
2749    fn prom_token_to_binary_expr_builder(
2750        token: TokenType,
2751    ) -> Result<Box<dyn Fn(DfExpr, DfExpr) -> Result<DfExpr>>> {
2752        match token.id() {
2753            token::T_ADD => Ok(Box::new(|lhs, rhs| Ok(lhs + rhs))),
2754            token::T_SUB => Ok(Box::new(|lhs, rhs| Ok(lhs - rhs))),
2755            token::T_MUL => Ok(Box::new(|lhs, rhs| Ok(lhs * rhs))),
2756            token::T_DIV => Ok(Box::new(|lhs, rhs| Ok(lhs / rhs))),
2757            token::T_MOD => Ok(Box::new(|lhs: DfExpr, rhs| Ok(lhs % rhs))),
2758            token::T_EQLC => Ok(Box::new(|lhs, rhs| Ok(lhs.eq(rhs)))),
2759            token::T_NEQ => Ok(Box::new(|lhs, rhs| Ok(lhs.not_eq(rhs)))),
2760            token::T_GTR => Ok(Box::new(|lhs, rhs| Ok(lhs.gt(rhs)))),
2761            token::T_LSS => Ok(Box::new(|lhs, rhs| Ok(lhs.lt(rhs)))),
2762            token::T_GTE => Ok(Box::new(|lhs, rhs| Ok(lhs.gt_eq(rhs)))),
2763            token::T_LTE => Ok(Box::new(|lhs, rhs| Ok(lhs.lt_eq(rhs)))),
2764            token::T_POW => Ok(Box::new(|lhs, rhs| {
2765                Ok(DfExpr::ScalarFunction(ScalarFunction {
2766                    func: datafusion_functions::math::power(),
2767                    args: vec![lhs, rhs],
2768                }))
2769            })),
2770            token::T_ATAN2 => Ok(Box::new(|lhs, rhs| {
2771                Ok(DfExpr::ScalarFunction(ScalarFunction {
2772                    func: datafusion_functions::math::atan2(),
2773                    args: vec![lhs, rhs],
2774                }))
2775            })),
2776            _ => UnexpectedTokenSnafu { token }.fail(),
2777        }
2778    }
2779
2780    /// Check if the given op is a [comparison operator](https://prometheus.io/docs/prometheus/latest/querying/operators/#comparison-binary-operators).
2781    fn is_token_a_comparison_op(token: TokenType) -> bool {
2782        matches!(
2783            token.id(),
2784            token::T_EQLC
2785                | token::T_NEQ
2786                | token::T_GTR
2787                | token::T_LSS
2788                | token::T_GTE
2789                | token::T_LTE
2790        )
2791    }
2792
2793    /// Check if the given op is a set operator (UNION, INTERSECT and EXCEPT in SQL).
2794    fn is_token_a_set_op(token: TokenType) -> bool {
2795        matches!(
2796            token.id(),
2797            token::T_LAND // INTERSECT
2798                | token::T_LOR // UNION
2799                | token::T_LUNLESS // EXCEPT
2800        )
2801    }
2802
2803    /// Build an inner join on the time index column and tag columns to concatenate two logical plans.
2804    /// When `only_join_time_index == true`, join only on the time index, because the two plans may not have the same tag columns.
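    ///
    /// For example (standard PromQL matching semantics): `a + on(instance) b`
    /// restricts the join keys to `instance`, while `a + ignoring(job) b` joins on
    /// all shared tags except `job`. The time index column is always appended to
    /// the join keys afterwards.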
2805    #[allow(clippy::too_many_arguments)]
2806    fn join_on_non_field_columns(
2807        &self,
2808        left: LogicalPlan,
2809        right: LogicalPlan,
2810        left_table_ref: TableReference,
2811        right_table_ref: TableReference,
2812        left_time_index_column: Option<String>,
2813        right_time_index_column: Option<String>,
2814        only_join_time_index: bool,
2815        modifier: &Option<BinModifier>,
2816    ) -> Result<LogicalPlan> {
2817        let mut left_tag_columns = if only_join_time_index {
2818            BTreeSet::new()
2819        } else {
2820            self.ctx
2821                .tag_columns
2822                .iter()
2823                .cloned()
2824                .collect::<BTreeSet<_>>()
2825        };
2826        let mut right_tag_columns = left_tag_columns.clone();
2827
2828        // apply modifier
2829        if let Some(modifier) = modifier {
2830            // apply label modifier
2831            if let Some(matching) = &modifier.matching {
2832                match matching {
2833                    // keeps columns mentioned in `on`
2834                    LabelModifier::Include(on) => {
2835                        let mask = on.labels.iter().cloned().collect::<BTreeSet<_>>();
2836                        left_tag_columns = left_tag_columns.intersection(&mask).cloned().collect();
2837                        right_tag_columns =
2838                            right_tag_columns.intersection(&mask).cloned().collect();
2839                    }
2840                    // removes columns mentioned in `ignoring`
2841                    LabelModifier::Exclude(ignoring) => {
2842                        // doesn't check existence of label
2843                        for label in &ignoring.labels {
2844                            let _ = left_tag_columns.remove(label);
2845                            let _ = right_tag_columns.remove(label);
2846                        }
2847                    }
2848                }
2849            }
2850        }
2851
2852        // push the time index columns if both exist
2853        if let (Some(left_time_index_column), Some(right_time_index_column)) =
2854            (left_time_index_column, right_time_index_column)
2855        {
2856            left_tag_columns.insert(left_time_index_column);
2857            right_tag_columns.insert(right_time_index_column);
2858        }
2859
2860        let right = LogicalPlanBuilder::from(right)
2861            .alias(right_table_ref)
2862            .context(DataFusionPlanningSnafu)?
2863            .build()
2864            .context(DataFusionPlanningSnafu)?;
2865
2866        // Inner join on the time index column to concatenate the two operators
2867        LogicalPlanBuilder::from(left)
2868            .alias(left_table_ref)
2869            .context(DataFusionPlanningSnafu)?
2870            .join_detailed(
2871                right,
2872                JoinType::Inner,
2873                (
2874                    left_tag_columns
2875                        .into_iter()
2876                        .map(Column::from_name)
2877                        .collect::<Vec<_>>(),
2878                    right_tag_columns
2879                        .into_iter()
2880                        .map(Column::from_name)
2881                        .collect::<Vec<_>>(),
2882                ),
2883                None,
2884                NullEquality::NullEqualsNull,
2885            )
2886            .context(DataFusionPlanningSnafu)?
2887            .build()
2888            .context(DataFusionPlanningSnafu)
2889    }
2890
2891    /// Build a set operator (AND/OR/UNLESS)
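    ///
    /// A sketch of how each operator lowers, per the match at the end of this
    /// function: `a and b` becomes a `LeftSemi` join, `a unless b` becomes a
    /// `LeftAnti` join, and `a or b` is delegated to [`Self::or_operator`] because
    /// it cannot be expressed as a join.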
2892    fn set_op_on_non_field_columns(
2893        &mut self,
2894        left: LogicalPlan,
2895        mut right: LogicalPlan,
2896        left_context: PromPlannerContext,
2897        right_context: PromPlannerContext,
2898        op: TokenType,
2899        modifier: &Option<BinModifier>,
2900    ) -> Result<LogicalPlan> {
2901        let mut left_tag_col_set = left_context
2902            .tag_columns
2903            .iter()
2904            .cloned()
2905            .collect::<HashSet<_>>();
2906        let mut right_tag_col_set = right_context
2907            .tag_columns
2908            .iter()
2909            .cloned()
2910            .collect::<HashSet<_>>();
2911
2912        if matches!(op.id(), token::T_LOR) {
2913            return self.or_operator(
2914                left,
2915                right,
2916                left_tag_col_set,
2917                right_tag_col_set,
2918                left_context,
2919                right_context,
2920                modifier,
2921            );
2922        }
2923
2924        // apply modifier
2925        if let Some(modifier) = modifier {
2926            // one-to-many and many-to-one are not supported
2927            ensure!(
2928                matches!(
2929                    modifier.card,
2930                    VectorMatchCardinality::OneToOne | VectorMatchCardinality::ManyToMany
2931                ),
2932                UnsupportedVectorMatchSnafu {
2933                    name: modifier.card.clone(),
2934                },
2935            );
2936            // apply label modifier
2937            if let Some(matching) = &modifier.matching {
2938                match matching {
2939                    // keeps columns mentioned in `on`
2940                    LabelModifier::Include(on) => {
2941                        let mask = on.labels.iter().cloned().collect::<HashSet<_>>();
2942                        left_tag_col_set = left_tag_col_set.intersection(&mask).cloned().collect();
2943                        right_tag_col_set =
2944                            right_tag_col_set.intersection(&mask).cloned().collect();
2945                    }
2946                    // removes columns mentioned in `ignoring`
2947                    LabelModifier::Exclude(ignoring) => {
2948                        // doesn't check existence of label
2949                        for label in &ignoring.labels {
2950                            let _ = left_tag_col_set.remove(label);
2951                            let _ = right_tag_col_set.remove(label);
2952                        }
2953                    }
2954                }
2955            }
2956        }
2957        // ensure two sides have the same tag columns
2958        if !matches!(op.id(), token::T_LOR) {
2959            ensure!(
2960                left_tag_col_set == right_tag_col_set,
2961                CombineTableColumnMismatchSnafu {
2962                    left: left_tag_col_set.into_iter().collect::<Vec<_>>(),
2963                    right: right_tag_col_set.into_iter().collect::<Vec<_>>(),
2964                }
2965            )
2966        };
2967        let left_time_index = left_context.time_index_column.clone().unwrap();
2968        let right_time_index = right_context.time_index_column.clone().unwrap();
2969        let join_keys = left_tag_col_set
2970            .iter()
2971            .cloned()
2972            .chain([left_time_index.clone()])
2973            .collect::<Vec<_>>();
2974        self.ctx.time_index_column = Some(left_time_index.clone());
2975
2976        // alias right time index column if necessary
2977        if left_context.time_index_column != right_context.time_index_column {
2978            let right_project_exprs = right
2979                .schema()
2980                .fields()
2981                .iter()
2982                .map(|field| {
2983                    if field.name() == &right_time_index {
2984                        DfExpr::Column(Column::from_name(&right_time_index)).alias(&left_time_index)
2985                    } else {
2986                        DfExpr::Column(Column::from_name(field.name()))
2987                    }
2988                })
2989                .collect::<Vec<_>>();
2990
2991            right = LogicalPlanBuilder::from(right)
2992                .project(right_project_exprs)
2993                .context(DataFusionPlanningSnafu)?
2994                .build()
2995                .context(DataFusionPlanningSnafu)?;
2996        }
2997
2998        ensure!(
2999            left_context.field_columns.len() == 1,
3000            MultiFieldsNotSupportedSnafu {
3001                operator: "AND operator"
3002            }
3003        );
3004        // Update the field column in the context.
3005        // The AND/UNLESS operators keep only the field column from the left input.
3006        let left_field_col = left_context.field_columns.first().unwrap();
3007        self.ctx.field_columns = vec![left_field_col.clone()];
3008
3009        // Generate join plan.
3010        // All set operations in PromQL are "distinct"
3011        match op.id() {
3012            token::T_LAND => LogicalPlanBuilder::from(left)
3013                .distinct()
3014                .context(DataFusionPlanningSnafu)?
3015                .join_detailed(
3016                    right,
3017                    JoinType::LeftSemi,
3018                    (join_keys.clone(), join_keys),
3019                    None,
3020                    NullEquality::NullEqualsNull,
3021                )
3022                .context(DataFusionPlanningSnafu)?
3023                .build()
3024                .context(DataFusionPlanningSnafu),
3025            token::T_LUNLESS => LogicalPlanBuilder::from(left)
3026                .distinct()
3027                .context(DataFusionPlanningSnafu)?
3028                .join_detailed(
3029                    right,
3030                    JoinType::LeftAnti,
3031                    (join_keys.clone(), join_keys),
3032                    None,
3033                    NullEquality::NullEqualsNull,
3034                )
3035                .context(DataFusionPlanningSnafu)?
3036                .build()
3037                .context(DataFusionPlanningSnafu),
3038            token::T_LOR => {
3039                // OR is handled at the beginning of this function, as it cannot
3040                // be expressed using JOIN like AND and UNLESS.
3041                unreachable!()
3042            }
3043            _ => UnexpectedTokenSnafu { token: op }.fail(),
3044        }
3045    }
3046
3047    // TODO(ruihang): change function name
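    /// Implement PromQL `or` by aligning both inputs to a shared schema and
    /// unioning them with distinct-on semantics.
    ///
    /// For context (standard PromQL semantics): `a or b` keeps every sample from
    /// `a`, plus the samples from `b` whose label set does not occur in `a` at the
    /// same timestamp. Tags that exist on only one side are padded with null by
    /// the projections below.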
3048    #[allow(clippy::too_many_arguments)]
3049    fn or_operator(
3050        &mut self,
3051        left: LogicalPlan,
3052        right: LogicalPlan,
3053        left_tag_cols_set: HashSet<String>,
3054        right_tag_cols_set: HashSet<String>,
3055        left_context: PromPlannerContext,
3056        right_context: PromPlannerContext,
3057        modifier: &Option<BinModifier>,
3058    ) -> Result<LogicalPlan> {
3059        // checks
3060        ensure!(
3061            left_context.field_columns.len() == right_context.field_columns.len(),
3062            CombineTableColumnMismatchSnafu {
3063                left: left_context.field_columns.clone(),
3064                right: right_context.field_columns.clone()
3065            }
3066        );
3067        ensure!(
3068            left_context.field_columns.len() == 1,
3069            MultiFieldsNotSupportedSnafu {
3070                operator: "OR operator"
3071            }
3072        );
3073
3074        // prepare hash sets
3075        let all_tags = left_tag_cols_set
3076            .union(&right_tag_cols_set)
3077            .cloned()
3078            .collect::<HashSet<_>>();
3079        let tags_not_in_left = all_tags
3080            .difference(&left_tag_cols_set)
3081            .cloned()
3082            .collect::<Vec<_>>();
3083        let tags_not_in_right = all_tags
3084            .difference(&right_tag_cols_set)
3085            .cloned()
3086            .collect::<Vec<_>>();
3087        let left_qualifier = left.schema().qualified_field(0).0.cloned();
3088        let right_qualifier = right.schema().qualified_field(0).0.cloned();
3089        let left_qualifier_string = left_qualifier
3090            .as_ref()
3091            .map(|l| l.to_string())
3092            .unwrap_or_default();
3093        let right_qualifier_string = right_qualifier
3094            .as_ref()
3095            .map(|r| r.to_string())
3096            .unwrap_or_default();
3097        let left_time_index_column =
3098            left_context
3099                .time_index_column
3100                .clone()
3101                .with_context(|| TimeIndexNotFoundSnafu {
3102                    table: left_qualifier_string.clone(),
3103                })?;
3104        let right_time_index_column =
3105            right_context
3106                .time_index_column
3107                .clone()
3108                .with_context(|| TimeIndexNotFoundSnafu {
3109                    table: right_qualifier_string.clone(),
3110                })?;
3111        // Take the name of the first field column. The length is checked above.
3112        let left_field_col = left_context.field_columns.first().unwrap();
3113        let right_field_col = right_context.field_columns.first().unwrap();
3114
3115        // step 0: fill all columns in output schema
3116        let mut all_columns_set = left
3117            .schema()
3118            .fields()
3119            .iter()
3120            .chain(right.schema().fields().iter())
3121            .map(|field| field.name().clone())
3122            .collect::<HashSet<_>>();
3123        // remove time index column
3124        all_columns_set.remove(&left_time_index_column);
3125        all_columns_set.remove(&right_time_index_column);
3126        // remove field column in the right
3127        if left_field_col != right_field_col {
3128            all_columns_set.remove(right_field_col);
3129        }
3130        let mut all_columns = all_columns_set.into_iter().collect::<Vec<_>>();
3131        // sort to ensure the generated schema is not volatile
3132        all_columns.sort_unstable();
3133        // use left time index column name as the result time index column name
3134        all_columns.insert(0, left_time_index_column.clone());
3135
3136        // step 1: align schemas using projections, filling non-existent columns with null
3137        let left_proj_exprs = all_columns.iter().map(|col| {
3138            if tags_not_in_left.contains(col) {
3139                DfExpr::Literal(ScalarValue::Utf8(None), None).alias(col.to_string())
3140            } else {
3141                DfExpr::Column(Column::new(None::<String>, col))
3142            }
3143        });
3144        let right_time_index_expr = DfExpr::Column(Column::new(
3145            right_qualifier.clone(),
3146            right_time_index_column,
3147        ))
3148        .alias(left_time_index_column.clone());
3149        // The field column on the right side may not have a qualifier (it may have been removed by a join),
3150        // so we need to find it from the schema.
3151        let right_qualifier_for_field = right
3152            .schema()
3153            .iter()
3154            .find(|(_, f)| f.name() == right_field_col)
3155            .map(|(q, _)| q)
3156            .context(ColumnNotFoundSnafu {
3157                col: right_field_col.to_string(),
3158            })?
3159            .cloned();
3160
3161        // `skip(1)` to skip the time index column
3162        let right_proj_exprs_without_time_index = all_columns.iter().skip(1).map(|col| {
3163            // expr
3164            if col == left_field_col && left_field_col != right_field_col {
3165                // qualify the field on the right side if necessary to handle differing field names
3166                DfExpr::Column(Column::new(
3167                    right_qualifier_for_field.clone(),
3168                    right_field_col,
3169                ))
3170            } else if tags_not_in_right.contains(col) {
3171                DfExpr::Literal(ScalarValue::Utf8(None), None).alias(col.to_string())
3172            } else {
3173                DfExpr::Column(Column::new(None::<String>, col))
3174            }
3175        });
3176        let right_proj_exprs = [right_time_index_expr]
3177            .into_iter()
3178            .chain(right_proj_exprs_without_time_index);
3179
3180        let left_projected = LogicalPlanBuilder::from(left)
3181            .project(left_proj_exprs)
3182            .context(DataFusionPlanningSnafu)?
3183            .alias(left_qualifier_string.clone())
3184            .context(DataFusionPlanningSnafu)?
3185            .build()
3186            .context(DataFusionPlanningSnafu)?;
3187        let right_projected = LogicalPlanBuilder::from(right)
3188            .project(right_proj_exprs)
3189            .context(DataFusionPlanningSnafu)?
3190            .alias(right_qualifier_string.clone())
3191            .context(DataFusionPlanningSnafu)?
3192            .build()
3193            .context(DataFusionPlanningSnafu)?;
3194
3195        // step 2: compute match columns
3196        let mut match_columns = if let Some(modifier) = modifier
3197            && let Some(matching) = &modifier.matching
3198        {
3199            match matching {
3200                // keeps columns mentioned in `on`
3201                LabelModifier::Include(on) => on.labels.clone(),
3202                // removes columns mentioned in `ignoring`
3203                LabelModifier::Exclude(ignoring) => {
3204                    let ignoring = ignoring.labels.iter().cloned().collect::<HashSet<_>>();
3205                    all_tags.difference(&ignoring).cloned().collect()
3206                }
3207            }
3208        } else {
3209            all_tags.iter().cloned().collect()
3210        };
3211        // sort to ensure the generated plan is not volatile
3212        match_columns.sort_unstable();
3213        // step 3: build `UnionDistinctOn` plan
3214        let schema = left_projected.schema().clone();
3215        let union_distinct_on = UnionDistinctOn::new(
3216            left_projected,
3217            right_projected,
3218            match_columns,
3219            left_time_index_column.clone(),
3220            schema,
3221        );
3222        let result = LogicalPlan::Extension(Extension {
3223            node: Arc::new(union_distinct_on),
3224        });
3225
3226        // step 4: update context
3227        self.ctx.time_index_column = Some(left_time_index_column);
3228        self.ctx.tag_columns = all_tags.into_iter().collect();
3229        self.ctx.field_columns = vec![left_field_col.to_string()];
3230
3231        Ok(result)
3232    }
3233
3234    /// Build a projection that applies the operation expr to every value column.
3235    /// Non-value columns (tag and timestamp) are preserved in the projection.
3236    ///
3237    /// # Side effect
3238    ///
3239    /// This function updates the value columns in the context. The new column names
3240    /// don't contain qualifiers.
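    ///
    /// For example, planning `abs(some_metric)` over fields `field_0` and `field_1`
    /// yields a projection along the lines of `tag_0, timestamp, abs(field_0) AS
    /// "abs(field_0)", abs(field_1) AS "abs(field_1)"` (a sketch; the actual
    /// aliases are the expressions' schema names).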
3241    fn projection_for_each_field_column<F>(
3242        &mut self,
3243        input: LogicalPlan,
3244        name_to_expr: F,
3245    ) -> Result<LogicalPlan>
3246    where
3247        F: FnMut(&String) -> Result<DfExpr>,
3248    {
3249        let non_field_columns_iter = self
3250            .ctx
3251            .tag_columns
3252            .iter()
3253            .chain(self.ctx.time_index_column.iter())
3254            .map(|col| {
3255                Ok(DfExpr::Column(Column::new(
3256                    self.ctx.table_name.clone().map(TableReference::bare),
3257                    col,
3258                )))
3259            });
3260
3261        // build computation exprs
3262        let result_field_columns = self
3263            .ctx
3264            .field_columns
3265            .iter()
3266            .map(name_to_expr)
3267            .collect::<Result<Vec<_>>>()?;
3268
3269        // alias the computation exprs to remove qualifier
3270        self.ctx.field_columns = result_field_columns
3271            .iter()
3272            .map(|expr| expr.schema_name().to_string())
3273            .collect();
3274        let field_columns_iter = result_field_columns
3275            .into_iter()
3276            .zip(self.ctx.field_columns.iter())
3277            .map(|(expr, name)| Ok(DfExpr::Alias(Alias::new(expr, None::<String>, name))));
3278
3279        // chain non-field columns (unchanged) and field columns (computation applied, then aliased)
3280        let project_fields = non_field_columns_iter
3281            .chain(field_columns_iter)
3282            .collect::<Result<Vec<_>>>()?;
3283
3284        LogicalPlanBuilder::from(input)
3285            .project(project_fields)
3286            .context(DataFusionPlanningSnafu)?
3287            .build()
3288            .context(DataFusionPlanningSnafu)
3289    }
3290
3291    /// Build a filter plan that filters on the value column. Note that exactly one value
3292    /// column is expected.
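    ///
    /// For example, a PromQL comparison without the `bool` modifier, such as
    /// `some_metric > 1`, filters samples rather than mapping them; this helper
    /// builds that kind of `Filter` plan over the single value column.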
3293    fn filter_on_field_column<F>(
3294        &self,
3295        input: LogicalPlan,
3296        mut name_to_expr: F,
3297    ) -> Result<LogicalPlan>
3298    where
3299        F: FnMut(&String) -> Result<DfExpr>,
3300    {
3301        ensure!(
3302            self.ctx.field_columns.len() == 1,
3303            UnsupportedExprSnafu {
3304                name: "filter on multi-value input"
3305            }
3306        );
3307
3308        let field_column_filter = name_to_expr(&self.ctx.field_columns[0])?;
3309
3310        LogicalPlanBuilder::from(input)
3311            .filter(field_column_filter)
3312            .context(DataFusionPlanningSnafu)?
3313            .build()
3314            .context(DataFusionPlanningSnafu)
3315    }
3316
3317    /// Generate an expr like `date_part("hour", <TIME_INDEX>)`. The caller should ensure
3318    /// the time index column in the context is set.
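    ///
    /// For example, PromQL's `hour(some_metric)` maps onto
    /// `date_part("hour", <TIME_INDEX>)`, and the same shape serves `minute`,
    /// `month`, `year`, and the other date/time extractors.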
3319    fn date_part_on_time_index(&self, date_part: &str) -> Result<DfExpr> {
3320        let input_expr = datafusion::logical_expr::col(
3321            self.ctx
3322                .time_index_column
3323                .as_ref()
3324                // the table name doesn't matter here
3325                .with_context(|| TimeIndexNotFoundSnafu {
3326                    table: "<doesn't matter>",
3327                })?,
3328        );
3329        let fn_expr = DfExpr::ScalarFunction(ScalarFunction {
3330            func: datafusion_functions::datetime::date_part(),
3331            args: vec![date_part.lit(), input_expr],
3332        });
3333        Ok(fn_expr)
3334    }
3335}
3336
3337#[derive(Default, Debug)]
3338struct FunctionArgs {
3339    input: Option<PromExpr>,
3340    literals: Vec<DfExpr>,
3341}
3342
3343/// Represents different types of scalar functions supported in PromQL expressions.
3344/// Each variant defines how the function should be processed and what arguments it expects.
3345#[derive(Debug, Clone)]
3346enum ScalarFunc {
3347    /// DataFusion's registered (including built-in) scalar functions (e.g., abs, sqrt, round, clamp).
3348    /// These are passed through directly to DataFusion's execution engine.
3349    /// Processing: Simple argument insertion at the specified position.
3350    DataFusionBuiltin(Arc<ScalarUdfDef>),
3351    /// User-defined functions registered in DataFusion's function registry.
3352    /// Similar to DataFusionBuiltin but for custom functions not built into DataFusion.
3353    /// Processing: Direct pass-through with argument positioning.
3354    DataFusionUdf(Arc<ScalarUdfDef>),
3355    /// PromQL-specific functions that operate on time series data with temporal context.
3356    /// These functions require both timestamp ranges and values to perform calculations.
3357    /// Processing: Automatically injects timestamp_range and value columns as first arguments.
3358    /// Examples: idelta, irate, resets, changes, deriv, and the *_over_time functions
3359    Udf(Arc<ScalarUdfDef>),
3360    /// PromQL functions requiring extrapolation calculations with explicit range information.
3361    /// These functions need to know the time range length to perform rate calculations.
3362    /// The second field contains the range length in milliseconds.
3363    /// Processing: Injects timestamp_range, value, time_index columns and appends range_length.
3364    /// Examples: increase, rate, delta
3365    // TODO(ruihang): maybe merge with Udf later
3366    ExtrapolateUdf(Arc<ScalarUdfDef>, i64),
3367    /// Functions that generate expressions directly without external UDF calls.
3368    /// The expression is constructed during function matching and requires no additional processing.
3369    /// Examples: time(), minute(), hour(), month(), year() and other date/time extractors
3370    GeneratedExpr,
3371}
3372
3373#[cfg(test)]
3374mod test {
3375    use std::time::{Duration, UNIX_EPOCH};
3376
3377    use catalog::RegisterTableRequest;
3378    use catalog::memory::{MemoryCatalogManager, new_memory_catalog_manager};
3379    use common_base::Plugins;
3380    use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
3381    use common_query::test_util::DummyDecoder;
3382    use datatypes::prelude::ConcreteDataType;
3383    use datatypes::schema::{ColumnSchema, Schema};
3384    use promql_parser::label::Labels;
3385    use promql_parser::parser;
3386    use session::context::QueryContext;
3387    use table::metadata::{TableInfoBuilder, TableMetaBuilder};
3388    use table::test_util::EmptyTable;
3389
3390    use super::*;
3391    use crate::options::QueryOptions;
3392
3393    fn build_query_engine_state() -> QueryEngineState {
3394        QueryEngineState::new(
3395            new_memory_catalog_manager().unwrap(),
3396            None,
3397            None,
3398            None,
3399            None,
3400            None,
3401            false,
3402            Plugins::default(),
3403            QueryOptions::default(),
3404        )
3405    }
3406
3407    async fn build_test_table_provider(
3408        table_name_tuples: &[(String, String)],
3409        num_tag: usize,
3410        num_field: usize,
3411    ) -> DfTableSourceProvider {
3412        let catalog_list = MemoryCatalogManager::with_default_setup();
3413        for (schema_name, table_name) in table_name_tuples {
3414            let mut columns = vec![];
3415            for i in 0..num_tag {
3416                columns.push(ColumnSchema::new(
3417                    format!("tag_{i}"),
3418                    ConcreteDataType::string_datatype(),
3419                    false,
3420                ));
3421            }
3422            columns.push(
3423                ColumnSchema::new(
3424                    "timestamp".to_string(),
3425                    ConcreteDataType::timestamp_millisecond_datatype(),
3426                    false,
3427                )
3428                .with_time_index(true),
3429            );
3430            for i in 0..num_field {
3431                columns.push(ColumnSchema::new(
3432                    format!("field_{i}"),
3433                    ConcreteDataType::float64_datatype(),
3434                    true,
3435                ));
3436            }
3437            let schema = Arc::new(Schema::new(columns));
3438            let table_meta = TableMetaBuilder::empty()
3439                .schema(schema)
3440                .primary_key_indices((0..num_tag).collect())
3441                .value_indices((num_tag + 1..num_tag + 1 + num_field).collect())
3442                .next_column_id(1024)
3443                .build()
3444                .unwrap();
3445            let table_info = TableInfoBuilder::default()
3446                .name(table_name.to_string())
3447                .meta(table_meta)
3448                .build()
3449                .unwrap();
3450            let table = EmptyTable::from_table_info(&table_info);
3451
3452            assert!(
3453                catalog_list
3454                    .register_table_sync(RegisterTableRequest {
3455                        catalog: DEFAULT_CATALOG_NAME.to_string(),
3456                        schema: schema_name.to_string(),
3457                        table_name: table_name.to_string(),
3458                        table_id: 1024,
3459                        table,
3460                    })
3461                    .is_ok()
3462            );
3463        }
3464
3465        DfTableSourceProvider::new(
3466            catalog_list,
3467            false,
3468            QueryContext::arc(),
3469            DummyDecoder::arc(),
3470            false,
3471        )
3472    }
3473
3474    async fn build_test_table_provider_with_fields(
3475        table_name_tuples: &[(String, String)],
3476        tags: &[&str],
3477    ) -> DfTableSourceProvider {
3478        let catalog_list = MemoryCatalogManager::with_default_setup();
3479        for (schema_name, table_name) in table_name_tuples {
3480            let mut columns = vec![];
3481            let num_tag = tags.len();
3482            for tag in tags {
3483                columns.push(ColumnSchema::new(
3484                    tag.to_string(),
3485                    ConcreteDataType::string_datatype(),
3486                    false,
3487                ));
3488            }
3489            columns.push(
3490                ColumnSchema::new(
3491                    "greptime_timestamp".to_string(),
3492                    ConcreteDataType::timestamp_millisecond_datatype(),
3493                    false,
3494                )
3495                .with_time_index(true),
3496            );
3497            columns.push(ColumnSchema::new(
3498                "greptime_value".to_string(),
3499                ConcreteDataType::float64_datatype(),
3500                true,
3501            ));
3502            let schema = Arc::new(Schema::new(columns));
3503            let table_meta = TableMetaBuilder::empty()
3504                .schema(schema)
3505                .primary_key_indices((0..num_tag).collect())
3506                .next_column_id(1024)
3507                .build()
3508                .unwrap();
3509            let table_info = TableInfoBuilder::default()
3510                .name(table_name.to_string())
3511                .meta(table_meta)
3512                .build()
3513                .unwrap();
3514            let table = EmptyTable::from_table_info(&table_info);
3515
3516            assert!(
3517                catalog_list
3518                    .register_table_sync(RegisterTableRequest {
3519                        catalog: DEFAULT_CATALOG_NAME.to_string(),
3520                        schema: schema_name.to_string(),
3521                        table_name: table_name.to_string(),
3522                        table_id: 1024,
3523                        table,
3524                    })
3525                    .is_ok()
3526            );
3527        }
3528
3529        DfTableSourceProvider::new(
3530            catalog_list,
3531            false,
3532            QueryContext::arc(),
3533            DummyDecoder::arc(),
3534            false,
3535        )
3536    }
3537
3538    // {
3539    //     input: `abs(some_metric{foo!="bar"})`,
3540    //     expected: &Call{
3541    //         Func: MustGetFunction("abs"),
3542    //         Args: Expressions{
3543    //             &VectorSelector{
3544    //                 Name: "some_metric",
3545    //                 LabelMatchers: []*labels.Matcher{
3546    //                     MustLabelMatcher(labels.MatchNotEqual, "foo", "bar"),
3547    //                     MustLabelMatcher(labels.MatchEqual, model.MetricNameLabel, "some_metric"),
3548    //                 },
3549    //             },
3550    //         },
3551    //     },
3552    // },
3553    async fn do_single_instant_function_call(fn_name: &'static str, plan_name: &str) {
3554        let prom_expr =
3555            parser::parse(&format!("{fn_name}(some_metric{{tag_0!=\"bar\"}})")).unwrap();
3556        let eval_stmt = EvalStmt {
3557            expr: prom_expr,
3558            start: UNIX_EPOCH,
3559            end: UNIX_EPOCH
3560                .checked_add(Duration::from_secs(100_000))
3561                .unwrap(),
3562            interval: Duration::from_secs(5),
3563            lookback_delta: Duration::from_secs(1),
3564        };
3565
3566        let table_provider = build_test_table_provider(
3567            &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
3568            1,
3569            1,
3570        )
3571        .await;
3572        let plan =
3573            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
3574                .await
3575                .unwrap();
3576
3577        let expected = String::from(
3578            "Filter: TEMPLATE(field_0) IS NOT NULL [timestamp:Timestamp(Millisecond, None), TEMPLATE(field_0):Float64;N, tag_0:Utf8]\
3579            \n  Projection: some_metric.timestamp, TEMPLATE(some_metric.field_0) AS TEMPLATE(field_0), some_metric.tag_0 [timestamp:Timestamp(Millisecond, None), TEMPLATE(field_0):Float64;N, tag_0:Utf8]\
3580            \n    PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3581            \n      PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3582            \n        Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3583            \n          Filter: some_metric.tag_0 != Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3584            \n            TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
3585        ).replace("TEMPLATE", plan_name);
3586
3587        assert_eq!(plan.display_indent_schema().to_string(), expected);
3588    }
3589
3590    #[tokio::test]
3591    async fn single_abs() {
3592        do_single_instant_function_call("abs", "abs").await;
3593    }
3594
3595    #[tokio::test]
3596    #[should_panic]
3597    async fn single_absent() {
3598        do_single_instant_function_call("absent", "").await;
3599    }
3600
3601    #[tokio::test]
3602    async fn single_ceil() {
3603        do_single_instant_function_call("ceil", "ceil").await;
3604    }
3605
3606    #[tokio::test]
3607    async fn single_exp() {
3608        do_single_instant_function_call("exp", "exp").await;
3609    }
3610
3611    #[tokio::test]
3612    async fn single_ln() {
3613        do_single_instant_function_call("ln", "ln").await;
3614    }
3615
3616    #[tokio::test]
3617    async fn single_log2() {
3618        do_single_instant_function_call("log2", "log2").await;
3619    }
3620
3621    #[tokio::test]
3622    async fn single_log10() {
3623        do_single_instant_function_call("log10", "log10").await;
3624    }
3625
3626    #[tokio::test]
3627    #[should_panic]
3628    async fn single_scalar() {
3629        do_single_instant_function_call("scalar", "").await;
3630    }
3631
3632    #[tokio::test]
3633    #[should_panic]
3634    async fn single_sgn() {
3635        do_single_instant_function_call("sgn", "").await;
3636    }
3637
3638    #[tokio::test]
3639    #[should_panic]
3640    async fn single_sort() {
3641        do_single_instant_function_call("sort", "").await;
3642    }
3643
3644    #[tokio::test]
3645    #[should_panic]
3646    async fn single_sort_desc() {
3647        do_single_instant_function_call("sort_desc", "").await;
3648    }
3649
3650    #[tokio::test]
3651    async fn single_sqrt() {
3652        do_single_instant_function_call("sqrt", "sqrt").await;
3653    }
3654
3655    #[tokio::test]
3656    #[should_panic]
3657    async fn single_timestamp() {
3658        do_single_instant_function_call("timestamp", "").await;
3659    }
3660
3661    #[tokio::test]
3662    async fn single_acos() {
3663        do_single_instant_function_call("acos", "acos").await;
3664    }
3665
3666    #[tokio::test]
3667    #[should_panic]
3668    async fn single_acosh() {
3669        do_single_instant_function_call("acosh", "").await;
3670    }
3671
3672    #[tokio::test]
3673    async fn single_asin() {
3674        do_single_instant_function_call("asin", "asin").await;
3675    }
3676
3677    #[tokio::test]
3678    #[should_panic]
3679    async fn single_asinh() {
3680        do_single_instant_function_call("asinh", "").await;
3681    }
3682
3683    #[tokio::test]
3684    async fn single_atan() {
3685        do_single_instant_function_call("atan", "atan").await;
3686    }
3687
3688    #[tokio::test]
3689    #[should_panic]
3690    async fn single_atanh() {
3691        do_single_instant_function_call("atanh", "").await;
3692    }
3693
3694    #[tokio::test]
3695    async fn single_cos() {
3696        do_single_instant_function_call("cos", "cos").await;
3697    }
3698
3699    #[tokio::test]
3700    #[should_panic]
3701    async fn single_cosh() {
3702        do_single_instant_function_call("cosh", "").await;
3703    }
3704
3705    #[tokio::test]
3706    async fn single_sin() {
3707        do_single_instant_function_call("sin", "sin").await;
3708    }
3709
3710    #[tokio::test]
3711    #[should_panic]
3712    async fn single_sinh() {
3713        do_single_instant_function_call("sinh", "").await;
3714    }
3715
3716    #[tokio::test]
3717    async fn single_tan() {
3718        do_single_instant_function_call("tan", "tan").await;
3719    }
3720
3721    #[tokio::test]
3722    #[should_panic]
3723    async fn single_tanh() {
3724        do_single_instant_function_call("tanh", "").await;
3725    }
3726
3727    #[tokio::test]
3728    #[should_panic]
3729    async fn single_deg() {
3730        do_single_instant_function_call("deg", "").await;
3731    }
3732
3733    #[tokio::test]
3734    #[should_panic]
3735    async fn single_rad() {
3736        do_single_instant_function_call("rad", "").await;
3737    }
3738
3739    // {
3740    //     input: "avg by (foo)(some_metric)",
3741    //     expected: &AggregateExpr{
3742    //         Op: AVG,
3743    //         Expr: &VectorSelector{
3744    //             Name: "some_metric",
3745    //             LabelMatchers: []*labels.Matcher{
3746    //                 MustLabelMatcher(labels.MatchEqual, model.MetricNameLabel, "some_metric"),
3747    //             },
3748    //             PosRange: PositionRange{
3749    //                 Start: 13,
3750    //                 End:   24,
3751    //             },
3752    //         },
3753    //         Grouping: []string{"foo"},
3754    //         PosRange: PositionRange{
3755    //             Start: 0,
3756    //             End:   25,
3757    //         },
3758    //     },
3759    // },
3760    async fn do_aggregate_expr_plan(fn_name: &str, plan_name: &str) {
3761        let prom_expr = parser::parse(&format!(
3762            "{fn_name} by (tag_1)(some_metric{{tag_0!=\"bar\"}})",
3763        ))
3764        .unwrap();
3765        let mut eval_stmt = EvalStmt {
3766            expr: prom_expr,
3767            start: UNIX_EPOCH,
3768            end: UNIX_EPOCH
3769                .checked_add(Duration::from_secs(100_000))
3770                .unwrap(),
3771            interval: Duration::from_secs(5),
3772            lookback_delta: Duration::from_secs(1),
3773        };
3774
3775        // test group by
3776        let table_provider = build_test_table_provider(
3777            &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
3778            2,
3779            2,
3780        )
3781        .await;
3782        let plan =
3783            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
3784                .await
3785                .unwrap();
3786        let expected_no_without = String::from(
3787            "Sort: some_metric.tag_1 ASC NULLS LAST, some_metric.timestamp ASC NULLS LAST [tag_1:Utf8, timestamp:Timestamp(Millisecond, None), TEMPLATE(some_metric.field_0):Float64;N, TEMPLATE(some_metric.field_1):Float64;N]\
3788            \n  Aggregate: groupBy=[[some_metric.tag_1, some_metric.timestamp]], aggr=[[TEMPLATE(some_metric.field_0), TEMPLATE(some_metric.field_1)]] [tag_1:Utf8, timestamp:Timestamp(Millisecond, None), TEMPLATE(some_metric.field_0):Float64;N, TEMPLATE(some_metric.field_1):Float64;N]\
3789            \n    PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
3790            \n      PromSeriesDivide: tags=[\"tag_0\", \"tag_1\"] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
3791            \n        Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.tag_1 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
3792            \n          Filter: some_metric.tag_0 != Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
3793            \n            TableScan: some_metric [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]"
3794        ).replace("TEMPLATE", plan_name);
3795        assert_eq!(
3796            plan.display_indent_schema().to_string(),
3797            expected_no_without
3798        );
3799
3800        // test group without
3801        if let PromExpr::Aggregate(AggregateExpr { modifier, .. }) = &mut eval_stmt.expr {
3802            *modifier = Some(LabelModifier::Exclude(Labels {
3803                labels: vec![String::from("tag_1")].into_iter().collect(),
3804            }));
3805        }
3806        let table_provider = build_test_table_provider(
3807            &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
3808            2,
3809            2,
3810        )
3811        .await;
3812        let plan =
3813            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
3814                .await
3815                .unwrap();
3816        let expected_without = String::from(
3817            "Sort: some_metric.tag_0 ASC NULLS LAST, some_metric.timestamp ASC NULLS LAST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), TEMPLATE(some_metric.field_0):Float64;N, TEMPLATE(some_metric.field_1):Float64;N]\
3818            \n  Aggregate: groupBy=[[some_metric.tag_0, some_metric.timestamp]], aggr=[[TEMPLATE(some_metric.field_0), TEMPLATE(some_metric.field_1)]] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), TEMPLATE(some_metric.field_0):Float64;N, TEMPLATE(some_metric.field_1):Float64;N]\
3819            \n    PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
3820            \n      PromSeriesDivide: tags=[\"tag_0\", \"tag_1\"] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
3821            \n        Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.tag_1 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
3822            \n          Filter: some_metric.tag_0 != Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
3823            \n            TableScan: some_metric [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]"
3824        ).replace("TEMPLATE", plan_name);
3825        assert_eq!(plan.display_indent_schema().to_string(), expected_without);
3826    }
3827
3828    #[tokio::test]
3829    async fn aggregate_sum() {
3830        do_aggregate_expr_plan("sum", "sum").await;
3831    }
3832
3833    #[tokio::test]
3834    async fn aggregate_avg() {
3835        do_aggregate_expr_plan("avg", "avg").await;
3836    }
3837
3838    #[tokio::test]
3839    #[should_panic] // output type doesn't match
3840    async fn aggregate_count() {
3841        do_aggregate_expr_plan("count", "count").await;
3842    }
3843
3844    #[tokio::test]
3845    async fn aggregate_min() {
3846        do_aggregate_expr_plan("min", "min").await;
3847    }
3848
3849    #[tokio::test]
3850    async fn aggregate_max() {
3851        do_aggregate_expr_plan("max", "max").await;
3852    }
3853
3854    #[tokio::test]
3855    #[should_panic] // output type doesn't match
3856    async fn aggregate_group() {
3857        do_aggregate_expr_plan("grouping", "GROUPING").await;
3858    }
3859
3860    #[tokio::test]
3861    async fn aggregate_stddev() {
3862        do_aggregate_expr_plan("stddev", "stddev_pop").await;
3863    }
3864
3865    #[tokio::test]
3866    async fn aggregate_stdvar() {
3867        do_aggregate_expr_plan("stdvar", "var_pop").await;
3868    }
3869
3870    // TODO(ruihang): add range fn tests once exprs are ready.
3871
3872    // {
3873    //     input: "some_metric{tag_0="foo"} + some_metric{tag_0="bar"}",
3874    //     expected: &BinaryExpr{
3875    //         Op: ADD,
3876    //         LHS: &VectorSelector{
3877    //             Name: "a",
3878    //             LabelMatchers: []*labels.Matcher{
3879    //                     MustLabelMatcher(labels.MatchEqual, "tag_0", "foo"),
3880    //                     MustLabelMatcher(labels.MatchEqual, model.MetricNameLabel, "some_metric"),
3881    //             },
3882    //         },
3883    //         RHS: &VectorSelector{
3884    //             Name: "sum",
3885    //             LabelMatchers: []*labels.Matcher{
3886    //                     MustLabelMatcher(labels.MatchEqual, "tag_0", "bar"),
    //                     MustLabelMatcher(labels.MatchEqual, model.MetricNameLabel, "some_metric"),
    //             },
    //         },
    //         VectorMatching: &VectorMatching{},
    //     },
    // },
    #[tokio::test]
    async fn binary_op_column_column() {
        let prom_expr =
            parser::parse(r#"some_metric{tag_0="foo"} + some_metric{tag_0="bar"}"#).unwrap();
        let eval_stmt = EvalStmt {
            expr: prom_expr,
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let table_provider = build_test_table_provider(
            &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
            1,
            1,
        )
        .await;
        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();

        let expected = String::from(
            "Projection: rhs.tag_0, rhs.timestamp, lhs.field_0 + rhs.field_0 AS lhs.field_0 + rhs.field_0 [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), lhs.field_0 + rhs.field_0:Float64;N]\
            \n  Inner Join: lhs.tag_0 = rhs.tag_0, lhs.timestamp = rhs.timestamp [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n    SubqueryAlias: lhs [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n      PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n        PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n          Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n            Filter: some_metric.tag_0 = Utf8(\"foo\") AND some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n              TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n    SubqueryAlias: rhs [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n      PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n        PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n          Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n            Filter: some_metric.tag_0 = Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n              TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]",
        );

        assert_eq!(plan.display_indent_schema().to_string(), expected);
    }

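    // Shared helper: parses `query`, plans it against a test provider holding
    // `some_metric` (default schema) and `some_alt_metric` (`greptime_private`),
    // and asserts the plan's indented-schema display equals `expected`.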
    async fn indie_query_plan_compare<T: AsRef<str>>(query: &str, expected: T) {
        let prom_expr = parser::parse(query).unwrap();
        let eval_stmt = EvalStmt {
            expr: prom_expr,
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let table_provider = build_test_table_provider(
            &[
                (DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string()),
                (
                    "greptime_private".to_string(),
                    "some_alt_metric".to_string(),
                ),
            ],
            1,
            1,
        )
        .await;
        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();

        assert_eq!(plan.display_indent_schema().to_string(), expected.as_ref());
    }

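    // A literal operand needs no join: the literal is folded straight into the
    // projection over the instant selector plan.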
    #[tokio::test]
    async fn binary_op_literal_column() {
        let query = r#"1 + some_metric{tag_0="bar"}"#;
        let expected = String::from(
            "Projection: some_metric.tag_0, some_metric.timestamp, Float64(1) + some_metric.field_0 AS Float64(1) + field_0 [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), Float64(1) + field_0:Float64;N]\
            \n  PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n    PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n      Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n        Filter: some_metric.tag_0 = Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n          TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]",
        );

        indie_query_plan_compare(query, expected).await;
    }

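    // Two literals never touch a real table: the planner emits an EmptyMetric
    // node over a dummy scan to synthesize the result series.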
    #[tokio::test]
    async fn binary_op_literal_literal() {
        let query = r#"1 + 1"#;
        let expected = r#"EmptyMetric: range=[0..100000000], interval=[5000] [time:Timestamp(Millisecond, None), value:Float64;N]
  TableScan: dummy [time:Timestamp(Millisecond, None), value:Float64;N]"#;
        indie_query_plan_compare(query, expected).await;
    }

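    // With the `bool` modifier the comparison returns 0/1 instead of filtering,
    // hence the CAST of the boolean result to Float64 in the projection.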
    #[tokio::test]
    async fn simple_bool_grammar() {
        let query = "some_metric != bool 1.2345";
        let expected = String::from(
            "Projection: some_metric.tag_0, some_metric.timestamp, CAST(some_metric.field_0 != Float64(1.2345) AS Float64) AS field_0 != Float64(1.2345) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0 != Float64(1.2345):Float64;N]\
            \n  PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n    PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n      Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n        Filter: some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n          TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]",
        );

        indie_query_plan_compare(query, expected).await;
    }

    #[tokio::test]
    async fn bool_with_additional_arithmetic() {
        let query = "some_metric + (1 == bool 2)";
        let expected = String::from(
            "Projection: some_metric.tag_0, some_metric.timestamp, some_metric.field_0 + CAST(Float64(1) = Float64(2) AS Float64) AS field_0 + Float64(1) = Float64(2) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0 + Float64(1) = Float64(2):Float64;N]\
            \n  PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n    PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n      Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n        Filter: some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n          TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]",
        );

        indie_query_plan_compare(query, expected).await;
    }

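    // Unary minus becomes a negated projection; the selector subtree is unchanged.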
    #[tokio::test]
    async fn simple_unary() {
        let query = "-some_metric";
        let expected = String::from(
            "Projection: some_metric.tag_0, some_metric.timestamp, (- some_metric.field_0) AS (- field_0) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), (- field_0):Float64;N]\
            \n  PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n    PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n      Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n        Filter: some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n          TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]",
        );

        indie_query_plan_compare(query, expected).await;
    }

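    // Range-vector function: `increase` plans as PromRangeManipulate (5m eval
    // range) feeding the `prom_increase` function, with an IS NOT NULL filter on top.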
    #[tokio::test]
    async fn increase_aggr() {
        let query = "increase(some_metric[5m])";
        let expected = String::from(
            "Filter: prom_increase(timestamp_range,field_0,timestamp,Int64(300000)) IS NOT NULL [timestamp:Timestamp(Millisecond, None), prom_increase(timestamp_range,field_0,timestamp,Int64(300000)):Float64;N, tag_0:Utf8]\
            \n  Projection: some_metric.timestamp, prom_increase(timestamp_range, field_0, some_metric.timestamp, Int64(300000)) AS prom_increase(timestamp_range,field_0,timestamp,Int64(300000)), some_metric.tag_0 [timestamp:Timestamp(Millisecond, None), prom_increase(timestamp_range,field_0,timestamp,Int64(300000)):Float64;N, tag_0:Utf8]\
            \n    PromRangeManipulate: req range=[0..100000000], interval=[5000], eval range=[300000], time index=[timestamp], values=[\"field_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Dictionary(Int64, Float64);N, timestamp_range:Dictionary(Int64, Timestamp(Millisecond, None))]\
            \n      PromSeriesNormalize: offset=[0], time index=[timestamp], filter NaN: [true] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n        PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n          Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n            Filter: some_metric.timestamp >= TimestampMillisecond(-301000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n              TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]",
        );

        indie_query_plan_compare(query, expected).await;
    }

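    // Without `bool`, a comparison filters rows: it becomes a plain Filter on
    // the value column rather than a projection.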
    #[tokio::test]
    async fn less_filter_on_value() {
        let query = "some_metric < 1.2345";
        let expected = String::from(
            "Filter: some_metric.field_0 < Float64(1.2345) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n  PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n    PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n      Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n        Filter: some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n          TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]",
        );

        indie_query_plan_compare(query, expected).await;
    }

    #[tokio::test]
    async fn count_over_time() {
        let query = "count_over_time(some_metric[5m])";
        let expected = String::from(
            "Filter: prom_count_over_time(timestamp_range,field_0) IS NOT NULL [timestamp:Timestamp(Millisecond, None), prom_count_over_time(timestamp_range,field_0):Float64;N, tag_0:Utf8]\
            \n  Projection: some_metric.timestamp, prom_count_over_time(timestamp_range, field_0) AS prom_count_over_time(timestamp_range,field_0), some_metric.tag_0 [timestamp:Timestamp(Millisecond, None), prom_count_over_time(timestamp_range,field_0):Float64;N, tag_0:Utf8]\
            \n    PromRangeManipulate: req range=[0..100000000], interval=[5000], eval range=[300000], time index=[timestamp], values=[\"field_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Dictionary(Int64, Float64);N, timestamp_range:Dictionary(Int64, Timestamp(Millisecond, None))]\
            \n      PromSeriesNormalize: offset=[0], time index=[timestamp], filter NaN: [true] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n        PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n          Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n            Filter: some_metric.timestamp >= TimestampMillisecond(-301000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n              TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]",
        );

        indie_query_plan_compare(query, expected).await;
    }

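    // `ignoring(...)` drops the listed labels from the join keys, so the two
    // metrics are joined only on `uri` and the time index.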
    #[tokio::test]
    async fn test_hash_join() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let case = r#"http_server_requests_seconds_sum{uri="/accounts/login"} / ignoring(kubernetes_pod_name,kubernetes_namespace) http_server_requests_seconds_count{uri="/accounts/login"}"#;

        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;
        let table_provider = build_test_table_provider_with_fields(
            &[
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "http_server_requests_seconds_sum".to_string(),
                ),
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "http_server_requests_seconds_count".to_string(),
                ),
            ],
            &["uri", "kubernetes_namespace", "kubernetes_pod_name"],
        )
        .await;
        // Should be ok
        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();
        let expected = "Projection: http_server_requests_seconds_count.uri, http_server_requests_seconds_count.kubernetes_namespace, http_server_requests_seconds_count.kubernetes_pod_name, http_server_requests_seconds_count.greptime_timestamp, http_server_requests_seconds_sum.greptime_value / http_server_requests_seconds_count.greptime_value AS http_server_requests_seconds_sum.greptime_value / http_server_requests_seconds_count.greptime_value\
            \n  Inner Join: http_server_requests_seconds_sum.greptime_timestamp = http_server_requests_seconds_count.greptime_timestamp, http_server_requests_seconds_sum.uri = http_server_requests_seconds_count.uri\
            \n    SubqueryAlias: http_server_requests_seconds_sum\
            \n      PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp]\
            \n        PromSeriesDivide: tags=[\"uri\", \"kubernetes_namespace\", \"kubernetes_pod_name\"]\
            \n          Sort: http_server_requests_seconds_sum.uri ASC NULLS FIRST, http_server_requests_seconds_sum.kubernetes_namespace ASC NULLS FIRST, http_server_requests_seconds_sum.kubernetes_pod_name ASC NULLS FIRST, http_server_requests_seconds_sum.greptime_timestamp ASC NULLS FIRST\
            \n            Filter: http_server_requests_seconds_sum.uri = Utf8(\"/accounts/login\") AND http_server_requests_seconds_sum.greptime_timestamp >= TimestampMillisecond(-1000, None) AND http_server_requests_seconds_sum.greptime_timestamp <= TimestampMillisecond(100001000, None)\
            \n              TableScan: http_server_requests_seconds_sum\
            \n    SubqueryAlias: http_server_requests_seconds_count\
            \n      PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp]\
            \n        PromSeriesDivide: tags=[\"uri\", \"kubernetes_namespace\", \"kubernetes_pod_name\"]\
            \n          Sort: http_server_requests_seconds_count.uri ASC NULLS FIRST, http_server_requests_seconds_count.kubernetes_namespace ASC NULLS FIRST, http_server_requests_seconds_count.kubernetes_pod_name ASC NULLS FIRST, http_server_requests_seconds_count.greptime_timestamp ASC NULLS FIRST\
            \n            Filter: http_server_requests_seconds_count.uri = Utf8(\"/accounts/login\") AND http_server_requests_seconds_count.greptime_timestamp >= TimestampMillisecond(-1000, None) AND http_server_requests_seconds_count.greptime_timestamp <= TimestampMillisecond(100001000, None)\
            \n              TableScan: http_server_requests_seconds_count";
        assert_eq!(plan.to_string(), expected);
    }

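    // Smoke test: histogram_quantile nested inside label_replace (with rate and
    // sum by) should plan without error; the exact plan is not asserted.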
    #[tokio::test]
    async fn test_nested_histogram_quantile() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let case = r#"label_replace(histogram_quantile(0.99, sum by(pod, le, path, code) (rate(greptime_servers_grpc_requests_elapsed_bucket{container="frontend"}[1m0s]))), "pod_new", "$1", "pod", "greptimedb-frontend-[0-9a-z]*-(.*)")"#;

        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;
        let table_provider = build_test_table_provider_with_fields(
            &[(
                DEFAULT_SCHEMA_NAME.to_string(),
                "greptime_servers_grpc_requests_elapsed_bucket".to_string(),
            )],
            &["pod", "le", "path", "code", "container"],
        )
        .await;
        // Should be ok
        let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
            .await
            .unwrap();
    }

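    // Smoke test for the `and`/`unless` set operators combined with aggregations
    // and a `vector(0)` fallback.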
    #[tokio::test]
    async fn test_parse_and_operator() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let cases = [
            r#"count (max by (persistentvolumeclaim,namespace) (kubelet_volume_stats_used_bytes{namespace=~".+"} ) and (max by (persistentvolumeclaim,namespace) (kubelet_volume_stats_used_bytes{namespace=~".+"} )) / (max by (persistentvolumeclaim,namespace) (kubelet_volume_stats_capacity_bytes{namespace=~".+"} )) >= (80 / 100)) or vector (0)"#,
            r#"count (max by (persistentvolumeclaim,namespace) (kubelet_volume_stats_used_bytes{namespace=~".+"} ) unless (max by (persistentvolumeclaim,namespace) (kubelet_volume_stats_used_bytes{namespace=~".+"} )) / (max by (persistentvolumeclaim,namespace) (kubelet_volume_stats_capacity_bytes{namespace=~".+"} )) >= (80 / 100)) or vector (0)"#,
        ];

        for case in cases {
            let prom_expr = parser::parse(case).unwrap();
            eval_stmt.expr = prom_expr;
            let table_provider = build_test_table_provider_with_fields(
                &[
                    (
                        DEFAULT_SCHEMA_NAME.to_string(),
                        "kubelet_volume_stats_used_bytes".to_string(),
                    ),
                    (
                        DEFAULT_SCHEMA_NAME.to_string(),
                        "kubelet_volume_stats_capacity_bytes".to_string(),
                    ),
                ],
                &["namespace", "persistentvolumeclaim"],
            )
            .await;
            // Should be ok
            let _ =
                PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                    .await
                    .unwrap();
        }
    }

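    // Smoke test: arithmetic whose right operand is a parenthesized `or` with
    // `vector(0)` should plan without error.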
    #[tokio::test]
    async fn test_nested_binary_op() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let case = r#"sum(rate(nginx_ingress_controller_requests{job=~".*"}[2m])) -
        (
            sum(rate(nginx_ingress_controller_requests{namespace=~".*"}[2m]))
            or
            vector(0)
        )"#;

        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;
        let table_provider = build_test_table_provider_with_fields(
            &[(
                DEFAULT_SCHEMA_NAME.to_string(),
                "nginx_ingress_controller_requests".to_string(),
            )],
            &["namespace", "job"],
        )
        .await;
        // Should be ok
        let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
            .await
            .unwrap();
    }

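    // Smoke tests for chained `or` expressions shaped like real dashboard
    // queries; each case only needs to produce a plan.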
    #[tokio::test]
    async fn test_parse_or_operator() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let case = r#"
        sum(rate(sysstat{tenant_name=~"tenant1",cluster_name=~"cluster1"}[120s])) by (cluster_name,tenant_name) /
        (sum(sysstat{tenant_name=~"tenant1",cluster_name=~"cluster1"}) by (cluster_name,tenant_name) * 100)
            or
        200 * sum(sysstat{tenant_name=~"tenant1",cluster_name=~"cluster1"}) by (cluster_name,tenant_name) /
        sum(sysstat{tenant_name=~"tenant1",cluster_name=~"cluster1"}) by (cluster_name,tenant_name)"#;

        let table_provider = build_test_table_provider_with_fields(
            &[(DEFAULT_SCHEMA_NAME.to_string(), "sysstat".to_string())],
            &["tenant_name", "cluster_name"],
        )
        .await;
        eval_stmt.expr = parser::parse(case).unwrap();
        let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
            .await
            .unwrap();

        let case = r#"sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) /
            (sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) *1000) +
            sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) /
            (sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) *1000) >= 0
            or
            sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) /
            (sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) *1000) >= 0
            or
            sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) /
            (sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) *1000) >= 0"#;
        let table_provider = build_test_table_provider_with_fields(
            &[(DEFAULT_SCHEMA_NAME.to_string(), "sysstat".to_string())],
            &["tenant_name", "cluster_name"],
        )
        .await;
        eval_stmt.expr = parser::parse(case).unwrap();
        let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
            .await
            .unwrap();

        let case = r#"(sum(background_waitevent_cnt{tenant_name=~"sys",cluster_name=~"cluster1"}) by (cluster_name,tenant_name) +
            sum(foreground_waitevent_cnt{tenant_name=~"sys",cluster_name=~"cluster1"}) by (cluster_name,tenant_name)) or
            (sum(background_waitevent_cnt{tenant_name=~"sys",cluster_name=~"cluster1"}) by (cluster_name,tenant_name)) or
            (sum(foreground_waitevent_cnt{tenant_name=~"sys",cluster_name=~"cluster1"}) by (cluster_name,tenant_name))"#;
        let table_provider = build_test_table_provider_with_fields(
            &[
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "background_waitevent_cnt".to_string(),
                ),
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "foreground_waitevent_cnt".to_string(),
                ),
            ],
            &["tenant_name", "cluster_name"],
        )
        .await;
        eval_stmt.expr = parser::parse(case).unwrap();
        let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
            .await
            .unwrap();

        let case = r#"avg(node_load1{cluster_name=~"cluster1"}) by (cluster_name,host_name) or max(container_cpu_load_average_10s{cluster_name=~"cluster1"}) by (cluster_name,host_name) * 100 / max(container_spec_cpu_quota{cluster_name=~"cluster1"}) by (cluster_name,host_name)"#;
        let table_provider = build_test_table_provider_with_fields(
            &[
                (DEFAULT_SCHEMA_NAME.to_string(), "node_load1".to_string()),
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "container_cpu_load_average_10s".to_string(),
                ),
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "container_spec_cpu_quota".to_string(),
                ),
            ],
            &["cluster_name", "host_name"],
        )
        .await;
        eval_stmt.expr = parser::parse(case).unwrap();
        let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
            .await
            .unwrap();
    }

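    // `__field__` matchers select value columns. Each case lists the columns
    // expected to remain in the output schema; conflicting matchers leave no
    // field columns, and matching a nonexistent field is a planning error.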
    #[tokio::test]
    async fn value_matcher() {
        // template
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let cases = [
            // single equal matcher
            (
                r#"some_metric{__field__="field_1"}"#,
                vec![
                    "some_metric.field_1",
                    "some_metric.tag_0",
                    "some_metric.tag_1",
                    "some_metric.tag_2",
                    "some_metric.timestamp",
                ],
            ),
            // two equal matchers
            (
                r#"some_metric{__field__="field_1", __field__="field_0"}"#,
                vec![
                    "some_metric.field_0",
                    "some_metric.field_1",
                    "some_metric.tag_0",
                    "some_metric.tag_1",
                    "some_metric.tag_2",
                    "some_metric.timestamp",
                ],
            ),
            // single not_eq matcher
            (
                r#"some_metric{__field__!="field_1"}"#,
                vec![
                    "some_metric.field_0",
                    "some_metric.field_2",
                    "some_metric.tag_0",
                    "some_metric.tag_1",
                    "some_metric.tag_2",
                    "some_metric.timestamp",
                ],
            ),
            // two not_eq matchers
            (
                r#"some_metric{__field__!="field_1", __field__!="field_2"}"#,
                vec![
                    "some_metric.field_0",
                    "some_metric.tag_0",
                    "some_metric.tag_1",
                    "some_metric.tag_2",
                    "some_metric.timestamp",
                ],
            ),
            // equal and not_eq matchers (no conflict)
            (
                r#"some_metric{__field__="field_1", __field__!="field_0"}"#,
                vec![
                    "some_metric.field_1",
                    "some_metric.tag_0",
                    "some_metric.tag_1",
                    "some_metric.tag_2",
                    "some_metric.timestamp",
                ],
            ),
            // equal and not_eq matchers (conflict)
            (
                r#"some_metric{__field__="field_2", __field__!="field_2"}"#,
                vec![
                    "some_metric.tag_0",
                    "some_metric.tag_1",
                    "some_metric.tag_2",
                    "some_metric.timestamp",
                ],
            ),
            // single regex eq matcher
            (
                r#"some_metric{__field__=~"field_1|field_2"}"#,
                vec![
                    "some_metric.field_1",
                    "some_metric.field_2",
                    "some_metric.tag_0",
                    "some_metric.tag_1",
                    "some_metric.tag_2",
                    "some_metric.timestamp",
                ],
            ),
            // single regex not_eq matcher
            (
                r#"some_metric{__field__!~"field_1|field_2"}"#,
                vec![
                    "some_metric.field_0",
                    "some_metric.tag_0",
                    "some_metric.tag_1",
                    "some_metric.tag_2",
                    "some_metric.timestamp",
                ],
            ),
        ];

        for case in cases {
            let prom_expr = parser::parse(case.0).unwrap();
            eval_stmt.expr = prom_expr;
            let table_provider = build_test_table_provider(
                &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
                3,
                3,
            )
            .await;
            let plan =
                PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                    .await
                    .unwrap();
            let mut fields = plan.schema().field_names();
            let mut expected = case.1.into_iter().map(String::from).collect::<Vec<_>>();
            fields.sort();
            expected.sort();
            assert_eq!(fields, expected, "case: {:?}", case.0);
        }

        let bad_cases = [
            r#"some_metric{__field__="nonexistent"}"#,
            r#"some_metric{__field__!="nonexistent"}"#,
        ];

        for case in bad_cases {
            let prom_expr = parser::parse(case).unwrap();
            eval_stmt.expr = prom_expr;
            let table_provider = build_test_table_provider(
                &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
                3,
                3,
            )
            .await;
            let plan =
                PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                    .await;
            assert!(plan.is_err(), "case: {:?}", case);
        }
    }

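    // `__schema__` (and its alias `__database__`) routes a selector to another
    // schema; both spellings yield the same plan, and a cross-schema operand can
    // still be joined with a default-schema one.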
    #[tokio::test]
    async fn custom_schema() {
        let query = "some_alt_metric{__schema__=\"greptime_private\"}";
        let expected = String::from(
            "PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n  PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n    Sort: greptime_private.some_alt_metric.tag_0 ASC NULLS FIRST, greptime_private.some_alt_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n      Filter: greptime_private.some_alt_metric.timestamp >= TimestampMillisecond(-1000, None) AND greptime_private.some_alt_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n        TableScan: greptime_private.some_alt_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]",
        );

        indie_query_plan_compare(query, expected).await;

        let query = "some_alt_metric{__database__=\"greptime_private\"}";
        let expected = String::from(
            "PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n  PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n    Sort: greptime_private.some_alt_metric.tag_0 ASC NULLS FIRST, greptime_private.some_alt_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n      Filter: greptime_private.some_alt_metric.timestamp >= TimestampMillisecond(-1000, None) AND greptime_private.some_alt_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n        TableScan: greptime_private.some_alt_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]",
        );

        indie_query_plan_compare(query, expected).await;

        let query = "some_alt_metric{__schema__=\"greptime_private\"} / some_metric";
        let expected = String::from(
            "Projection: some_metric.tag_0, some_metric.timestamp, greptime_private.some_alt_metric.field_0 / some_metric.field_0 AS greptime_private.some_alt_metric.field_0 / some_metric.field_0 [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), greptime_private.some_alt_metric.field_0 / some_metric.field_0:Float64;N]\
        \n  Inner Join: greptime_private.some_alt_metric.tag_0 = some_metric.tag_0, greptime_private.some_alt_metric.timestamp = some_metric.timestamp [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n    SubqueryAlias: greptime_private.some_alt_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n      PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n        PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n          Sort: greptime_private.some_alt_metric.tag_0 ASC NULLS FIRST, greptime_private.some_alt_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n            Filter: greptime_private.some_alt_metric.timestamp >= TimestampMillisecond(-1000, None) AND greptime_private.some_alt_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n              TableScan: greptime_private.some_alt_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n    SubqueryAlias: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n      PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n        PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n          Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n            Filter: some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n              TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]",
        );

        indie_query_plan_compare(query, expected).await;
    }

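    // Only `=` is meaningful for `__schema__`/`__database__`; other match ops
    // must fail at planning time.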
    #[tokio::test]
    async fn only_equals_is_supported_for_special_matcher() {
        let queries = &[
            "some_alt_metric{__schema__!=\"greptime_private\"}",
            "some_alt_metric{__schema__=~\"lalala\"}",
            "some_alt_metric{__database__!=\"greptime_private\"}",
            "some_alt_metric{__database__=~\"lalala\"}",
        ];

        for query in queries {
            let prom_expr = parser::parse(query).unwrap();
            let eval_stmt = EvalStmt {
                expr: prom_expr,
                start: UNIX_EPOCH,
                end: UNIX_EPOCH
                    .checked_add(Duration::from_secs(100_000))
                    .unwrap(),
                interval: Duration::from_secs(5),
                lookback_delta: Duration::from_secs(1),
            };

            let table_provider = build_test_table_provider(
                &[
                    (DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string()),
                    (
                        "greptime_private".to_string(),
                        "some_alt_metric".to_string(),
                    ),
                ],
                1,
                1,
            )
            .await;

            let plan =
                PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                    .await;
            assert!(plan.is_err(), "query: {:?}", query);
        }
    }

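    // A nanosecond time index is CAST to millisecond precision in a projection
    // directly above the scan, below the Prom-specific plan nodes, for both
    // instant and range selectors.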
    #[tokio::test]
    async fn test_non_ms_precision() {
        let catalog_list = MemoryCatalogManager::with_default_setup();
        let columns = vec![
            ColumnSchema::new(
                "tag".to_string(),
                ConcreteDataType::string_datatype(),
                false,
            ),
            ColumnSchema::new(
                "timestamp".to_string(),
                ConcreteDataType::timestamp_nanosecond_datatype(),
                false,
            )
            .with_time_index(true),
            ColumnSchema::new(
                "field".to_string(),
                ConcreteDataType::float64_datatype(),
                true,
            ),
        ];
        let schema = Arc::new(Schema::new(columns));
        let table_meta = TableMetaBuilder::empty()
            .schema(schema)
            .primary_key_indices(vec![0])
            .value_indices(vec![2])
            .next_column_id(1024)
            .build()
            .unwrap();
        let table_info = TableInfoBuilder::default()
            .name("metrics".to_string())
            .meta(table_meta)
            .build()
            .unwrap();
        let table = EmptyTable::from_table_info(&table_info);
        assert!(
            catalog_list
                .register_table_sync(RegisterTableRequest {
                    catalog: DEFAULT_CATALOG_NAME.to_string(),
                    schema: DEFAULT_SCHEMA_NAME.to_string(),
                    table_name: "metrics".to_string(),
                    table_id: 1024,
                    table,
                })
                .is_ok()
        );

        let plan = PromPlanner::stmt_to_plan(
            DfTableSourceProvider::new(
                catalog_list.clone(),
                false,
                QueryContext::arc(),
                DummyDecoder::arc(),
                true,
            ),
            &EvalStmt {
                expr: parser::parse("metrics{tag = \"1\"}").unwrap(),
                start: UNIX_EPOCH,
                end: UNIX_EPOCH
                    .checked_add(Duration::from_secs(100_000))
                    .unwrap(),
                interval: Duration::from_secs(5),
                lookback_delta: Duration::from_secs(1),
            },
            &build_query_engine_state(),
        )
        .await
        .unwrap();
        assert_eq!(
            plan.display_indent_schema().to_string(),
            "PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
        \n  PromSeriesDivide: tags=[\"tag\"] [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
        \n    Sort: metrics.tag ASC NULLS FIRST, metrics.timestamp ASC NULLS FIRST [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
        \n      Filter: metrics.tag = Utf8(\"1\") AND metrics.timestamp >= TimestampMillisecond(-1000, None) AND metrics.timestamp <= TimestampMillisecond(100001000, None) [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
        \n        Projection: metrics.field, metrics.tag, CAST(metrics.timestamp AS Timestamp(Millisecond, None)) AS timestamp [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
        \n          TableScan: metrics [tag:Utf8, timestamp:Timestamp(Nanosecond, None), field:Float64;N]"
        );
        let plan = PromPlanner::stmt_to_plan(
            DfTableSourceProvider::new(
                catalog_list.clone(),
                false,
                QueryContext::arc(),
                DummyDecoder::arc(),
                true,
            ),
            &EvalStmt {
                expr: parser::parse("avg_over_time(metrics{tag = \"1\"}[5s])").unwrap(),
                start: UNIX_EPOCH,
                end: UNIX_EPOCH
                    .checked_add(Duration::from_secs(100_000))
                    .unwrap(),
                interval: Duration::from_secs(5),
                lookback_delta: Duration::from_secs(1),
            },
            &build_query_engine_state(),
        )
        .await
        .unwrap();
        assert_eq!(
            plan.display_indent_schema().to_string(),
            "Filter: prom_avg_over_time(timestamp_range,field) IS NOT NULL [timestamp:Timestamp(Millisecond, None), prom_avg_over_time(timestamp_range,field):Float64;N, tag:Utf8]\
        \n  Projection: metrics.timestamp, prom_avg_over_time(timestamp_range, field) AS prom_avg_over_time(timestamp_range,field), metrics.tag [timestamp:Timestamp(Millisecond, None), prom_avg_over_time(timestamp_range,field):Float64;N, tag:Utf8]\
        \n    PromRangeManipulate: req range=[0..100000000], interval=[5000], eval range=[5000], time index=[timestamp], values=[\"field\"] [field:Dictionary(Int64, Float64);N, tag:Utf8, timestamp:Timestamp(Millisecond, None), timestamp_range:Dictionary(Int64, Timestamp(Millisecond, None))]\
        \n      PromSeriesNormalize: offset=[0], time index=[timestamp], filter NaN: [true] [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
        \n        PromSeriesDivide: tags=[\"tag\"] [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
        \n          Sort: metrics.tag ASC NULLS FIRST, metrics.timestamp ASC NULLS FIRST [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
        \n            Filter: metrics.tag = Utf8(\"1\") AND metrics.timestamp >= TimestampMillisecond(-6000, None) AND metrics.timestamp <= TimestampMillisecond(100001000, None) [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
        \n              Projection: metrics.field, metrics.tag, CAST(metrics.timestamp AS Timestamp(Millisecond, None)) AS timestamp [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
        \n                TableScan: metrics [tag:Utf8, timestamp:Timestamp(Nanosecond, None), field:Float64;N]"
        );
    }

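    // Filtering on a label the table does not have should still plan instead of
    // erroring out.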
    #[tokio::test]
    async fn test_nonexistent_label() {
        // template
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let case = r#"some_metric{nonexistent="hi"}"#;
        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;
        let table_provider = build_test_table_provider(
            &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
            3,
            3,
        )
        .await;
        // Should be ok
        let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
            .await
            .unwrap();
    }

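    // label_join lowers to concat_ws over the source labels, projected as the
    // new `foo` column alongside the originals.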
    #[tokio::test]
    async fn test_label_join() {
        let prom_expr = parser::parse(
            "label_join(up{tag_0='api-server'}, 'foo', ',', 'tag_1', 'tag_2', 'tag_3')",
        )
        .unwrap();
        let eval_stmt = EvalStmt {
            expr: prom_expr,
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let table_provider =
            build_test_table_provider(&[(DEFAULT_SCHEMA_NAME.to_string(), "up".to_string())], 4, 1)
                .await;
        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();

        let expected = r#"
Filter: up.field_0 IS NOT NULL [timestamp:Timestamp(Millisecond, None), field_0:Float64;N, foo:Utf8;N, tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8]
  Projection: up.timestamp, up.field_0, concat_ws(Utf8(","), up.tag_1, up.tag_2, up.tag_3) AS foo, up.tag_0, up.tag_1, up.tag_2, up.tag_3 [timestamp:Timestamp(Millisecond, None), field_0:Float64;N, foo:Utf8;N, tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8]
    PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
      PromSeriesDivide: tags=["tag_0", "tag_1", "tag_2", "tag_3"] [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
        Sort: up.tag_0 ASC NULLS FIRST, up.tag_1 ASC NULLS FIRST, up.tag_2 ASC NULLS FIRST, up.tag_3 ASC NULLS FIRST, up.timestamp ASC NULLS FIRST [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
          Filter: up.tag_0 = Utf8("api-server") AND up.timestamp >= TimestampMillisecond(-1000, None) AND up.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
            TableScan: up [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"#;

        let ret = plan.display_indent_schema().to_string();
        assert_eq!(format!("\n{ret}"), expected, "\n{}", ret);
    }

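    // label_replace lowers to regexp_replace with the pattern anchored as
    // ^(?s:...)$, matching PromQL's full-string regex semantics.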
    #[tokio::test]
    async fn test_label_replace() {
        let prom_expr = parser::parse(
            "label_replace(up{tag_0=\"a:c\"}, \"foo\", \"$1\", \"tag_0\", \"(.*):.*\")",
        )
        .unwrap();
        let eval_stmt = EvalStmt {
            expr: prom_expr,
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let table_provider =
            build_test_table_provider(&[(DEFAULT_SCHEMA_NAME.to_string(), "up".to_string())], 1, 1)
                .await;
        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();

        let expected = r#"
Filter: up.field_0 IS NOT NULL [timestamp:Timestamp(Millisecond, None), field_0:Float64;N, foo:Utf8;N, tag_0:Utf8]
  Projection: up.timestamp, up.field_0, regexp_replace(up.tag_0, Utf8("^(?s:(.*):.*)$"), Utf8("$1")) AS foo, up.tag_0 [timestamp:Timestamp(Millisecond, None), field_0:Float64;N, foo:Utf8;N, tag_0:Utf8]
    PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
      PromSeriesDivide: tags=["tag_0"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
        Sort: up.tag_0 ASC NULLS FIRST, up.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
          Filter: up.tag_0 = Utf8("a:c") AND up.timestamp >= TimestampMillisecond(-1000, None) AND up.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
            TableScan: up [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"#;

        let ret = plan.display_indent_schema().to_string();
        assert_eq!(format!("\n{ret}"), expected, "\n{}", ret);
    }

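    // A `=~` matcher compiles to an anchored regex filter (`~` against
    // ^(?:...)$) on the tag column below the aggregation.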
    #[tokio::test]
    async fn test_matchers_to_expr() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };
        let case =
            r#"sum(prometheus_tsdb_head_series{tag_1=~"(10.0.160.237:8080|10.0.160.237:9090)"})"#;

        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;
        let table_provider = build_test_table_provider(
            &[(
                DEFAULT_SCHEMA_NAME.to_string(),
                "prometheus_tsdb_head_series".to_string(),
            )],
            3,
            3,
        )
        .await;
        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();
        let expected = "Sort: prometheus_tsdb_head_series.timestamp ASC NULLS LAST [timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.field_0):Float64;N, sum(prometheus_tsdb_head_series.field_1):Float64;N, sum(prometheus_tsdb_head_series.field_2):Float64;N]\
        \n  Aggregate: groupBy=[[prometheus_tsdb_head_series.timestamp]], aggr=[[sum(prometheus_tsdb_head_series.field_0), sum(prometheus_tsdb_head_series.field_1), sum(prometheus_tsdb_head_series.field_2)]] [timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.field_0):Float64;N, sum(prometheus_tsdb_head_series.field_1):Float64;N, sum(prometheus_tsdb_head_series.field_2):Float64;N]\
        \n    PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N]\
        \n      PromSeriesDivide: tags=[\"tag_0\", \"tag_1\", \"tag_2\"] [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N]\
        \n        Sort: prometheus_tsdb_head_series.tag_0 ASC NULLS FIRST, prometheus_tsdb_head_series.tag_1 ASC NULLS FIRST, prometheus_tsdb_head_series.tag_2 ASC NULLS FIRST, prometheus_tsdb_head_series.timestamp ASC NULLS FIRST [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N]\
        \n          Filter: prometheus_tsdb_head_series.tag_1 ~ Utf8(\"^(?:(10.0.160.237:8080|10.0.160.237:9090))$\") AND prometheus_tsdb_head_series.timestamp >= TimestampMillisecond(-1000, None) AND prometheus_tsdb_head_series.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N]\
        \n            TableScan: prometheus_tsdb_head_series [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N]";
        assert_eq!(plan.display_indent_schema().to_string(), expected);
    }

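    // topk lowers to a row_number() window partitioned by timestamp, ordered by
    // value descending, filtered to ranks <= k.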
    #[tokio::test]
    async fn test_topk_expr() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };
        let case = r#"topk(10, sum(prometheus_tsdb_head_series{ip=~"(10.0.160.237:8080|10.0.160.237:9090)"}) by (ip))"#;

        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;
        let table_provider = build_test_table_provider_with_fields(
            &[
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "prometheus_tsdb_head_series".to_string(),
                ),
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "http_server_requests_seconds_count".to_string(),
                ),
            ],
            &["ip"],
        )
        .await;

        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();
        let expected = "Projection: sum(prometheus_tsdb_head_series.greptime_value), prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp [sum(prometheus_tsdb_head_series.greptime_value):Float64;N, ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None)]\
        \n  Sort: prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST, row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS FIRST, prometheus_tsdb_head_series.ip DESC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW ASC NULLS LAST [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N, row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS FIRST, prometheus_tsdb_head_series.ip DESC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:UInt64]\
        \n    Filter: row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS FIRST, prometheus_tsdb_head_series.ip DESC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW <= Float64(10) [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N, row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS FIRST, prometheus_tsdb_head_series.ip DESC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:UInt64]\
        \n      WindowAggr: windowExpr=[[row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS FIRST, prometheus_tsdb_head_series.ip DESC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N, row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS FIRST, prometheus_tsdb_head_series.ip DESC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:UInt64]\
        \n        Sort: prometheus_tsdb_head_series.ip ASC NULLS LAST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N]\
        \n          Aggregate: groupBy=[[prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp]], aggr=[[sum(prometheus_tsdb_head_series.greptime_value)]] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N]\
        \n            PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n              PromSeriesDivide: tags=[\"ip\"] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
4868        \n                Sort: prometheus_tsdb_head_series.ip ASC NULLS FIRST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS FIRST [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
4869        \n                  Filter: prometheus_tsdb_head_series.ip ~ Utf8(\"^(?:(10.0.160.237:8080|10.0.160.237:9090))$\") AND prometheus_tsdb_head_series.greptime_timestamp >= TimestampMillisecond(-1000, None) AND prometheus_tsdb_head_series.greptime_timestamp <= TimestampMillisecond(100001000, None) [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
4870        \n                    TableScan: prometheus_tsdb_head_series [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]";
4871
4872        assert_eq!(plan.display_indent_schema().to_string(), expected);
4873    }
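
    // Untested sketch (not part of the original suite): `bottomk` is expected
    // to lower through the same row_number window + filter pattern shown for
    // `topk` above. To keep the sketch robust it only asserts that planning
    // succeeds instead of pinning the full plan string.
    #[tokio::test]
    async fn test_bottomk_expr_plans() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };
        let case = r#"bottomk(10, sum(prometheus_tsdb_head_series{ip=~"(10.0.160.237:8080|10.0.160.237:9090)"}) by (ip))"#;
        eval_stmt.expr = parser::parse(case).unwrap();

        let table_provider = build_test_table_provider_with_fields(
            &[(
                DEFAULT_SCHEMA_NAME.to_string(),
                "prometheus_tsdb_head_series".to_string(),
            )],
            &["ip"],
        )
        .await;

        let result =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await;
        // Assumption: bottomk shares topk's window-function lowering.
        assert!(result.is_ok(), "bottomk failed to plan: {:?}", result.err());
    }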

    #[tokio::test]
    async fn test_count_values_expr() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };
        let case = r#"count_values('series', prometheus_tsdb_head_series{ip=~"(10.0.160.237:8080|10.0.160.237:9090)"}) by (ip)"#;

        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;
        let table_provider = build_test_table_provider_with_fields(
            &[
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "prometheus_tsdb_head_series".to_string(),
                ),
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "http_server_requests_seconds_count".to_string(),
                ),
            ],
            &["ip"],
        )
        .await;

        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();
        let expected = "Projection: count(prometheus_tsdb_head_series.greptime_value), prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp, series [count(prometheus_tsdb_head_series.greptime_value):Int64, ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), series:Float64;N]\
        \n  Sort: prometheus_tsdb_head_series.ip ASC NULLS LAST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST, prometheus_tsdb_head_series.greptime_value ASC NULLS LAST [count(prometheus_tsdb_head_series.greptime_value):Int64, ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), series:Float64;N, greptime_value:Float64;N]\
        \n    Projection: count(prometheus_tsdb_head_series.greptime_value), prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp, prometheus_tsdb_head_series.greptime_value AS series, prometheus_tsdb_head_series.greptime_value [count(prometheus_tsdb_head_series.greptime_value):Int64, ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), series:Float64;N, greptime_value:Float64;N]\
        \n      Aggregate: groupBy=[[prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp, prometheus_tsdb_head_series.greptime_value]], aggr=[[count(prometheus_tsdb_head_series.greptime_value)]] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N, count(prometheus_tsdb_head_series.greptime_value):Int64]\
        \n        PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n          PromSeriesDivide: tags=[\"ip\"] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n            Sort: prometheus_tsdb_head_series.ip ASC NULLS FIRST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS FIRST [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n              Filter: prometheus_tsdb_head_series.ip ~ Utf8(\"^(?:(10.0.160.237:8080|10.0.160.237:9090))$\") AND prometheus_tsdb_head_series.greptime_timestamp >= TimestampMillisecond(-1000, None) AND prometheus_tsdb_head_series.greptime_timestamp <= TimestampMillisecond(100001000, None) [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n                TableScan: prometheus_tsdb_head_series [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]";

        assert_eq!(plan.display_indent_schema().to_string(), expected);
    }
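
    // In the plan above, `count_values` pulls the value column into the
    // group-by key so identical sample values are counted together, then
    // re-projects that column under the user-supplied label name (`series`).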

    #[tokio::test]
    async fn test_quantile_expr() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };
        let case = r#"quantile(0.3, sum(prometheus_tsdb_head_series{ip=~"(10.0.160.237:8080|10.0.160.237:9090)"}) by (ip))"#;

        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;
        let table_provider = build_test_table_provider_with_fields(
            &[
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "prometheus_tsdb_head_series".to_string(),
                ),
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "http_server_requests_seconds_count".to_string(),
                ),
            ],
            &["ip"],
        )
        .await;

        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();
        let expected = "Sort: prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST [greptime_timestamp:Timestamp(Millisecond, None), quantile(Float64(0.3),sum(prometheus_tsdb_head_series.greptime_value)):Float64;N]\
        \n  Aggregate: groupBy=[[prometheus_tsdb_head_series.greptime_timestamp]], aggr=[[quantile(Float64(0.3), sum(prometheus_tsdb_head_series.greptime_value))]] [greptime_timestamp:Timestamp(Millisecond, None), quantile(Float64(0.3),sum(prometheus_tsdb_head_series.greptime_value)):Float64;N]\
        \n    Sort: prometheus_tsdb_head_series.ip ASC NULLS LAST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N]\
        \n      Aggregate: groupBy=[[prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp]], aggr=[[sum(prometheus_tsdb_head_series.greptime_value)]] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N]\
        \n        PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n          PromSeriesDivide: tags=[\"ip\"] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n            Sort: prometheus_tsdb_head_series.ip ASC NULLS FIRST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS FIRST [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n              Filter: prometheus_tsdb_head_series.ip ~ Utf8(\"^(?:(10.0.160.237:8080|10.0.160.237:9090))$\") AND prometheus_tsdb_head_series.greptime_timestamp >= TimestampMillisecond(-1000, None) AND prometheus_tsdb_head_series.greptime_timestamp <= TimestampMillisecond(100001000, None) [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n                TableScan: prometheus_tsdb_head_series [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]";

        assert_eq!(plan.display_indent_schema().to_string(), expected);
    }
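
    // In the plan above, the outer `quantile(0.3, ...)` becomes a dedicated
    // `quantile` aggregate grouped only on the time index, layered over the
    // per-`ip` sums, i.e. a cross-series quantile at each timestamp.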

    #[tokio::test]
    async fn test_or_not_exists_table_label() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };
        let case = r#"sum by (job, tag0, tag2) (metric_exists) or sum by (job, tag0, tag2) (metric_not_exists)"#;

        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;
        let table_provider = build_test_table_provider_with_fields(
            &[(DEFAULT_SCHEMA_NAME.to_string(), "metric_exists".to_string())],
            &["job"],
        )
        .await;

        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();
        let expected = r#"UnionDistinctOn: on col=[["job"]], ts_col=[greptime_timestamp] [greptime_timestamp:Timestamp(Millisecond, None), job:Utf8, sum(metric_exists.greptime_value):Float64;N]
  SubqueryAlias: metric_exists [greptime_timestamp:Timestamp(Millisecond, None), job:Utf8, sum(metric_exists.greptime_value):Float64;N]
    Projection: metric_exists.greptime_timestamp, metric_exists.job, sum(metric_exists.greptime_value) [greptime_timestamp:Timestamp(Millisecond, None), job:Utf8, sum(metric_exists.greptime_value):Float64;N]
      Sort: metric_exists.job ASC NULLS LAST, metric_exists.greptime_timestamp ASC NULLS LAST [job:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(metric_exists.greptime_value):Float64;N]
        Aggregate: groupBy=[[metric_exists.job, metric_exists.greptime_timestamp]], aggr=[[sum(metric_exists.greptime_value)]] [job:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(metric_exists.greptime_value):Float64;N]
          PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp] [job:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]
            PromSeriesDivide: tags=["job"] [job:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]
              Sort: metric_exists.job ASC NULLS FIRST, metric_exists.greptime_timestamp ASC NULLS FIRST [job:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]
                Filter: metric_exists.greptime_timestamp >= TimestampMillisecond(-1000, None) AND metric_exists.greptime_timestamp <= TimestampMillisecond(100001000, None) [job:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]
                  TableScan: metric_exists [job:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]
  SubqueryAlias:  [greptime_timestamp:Timestamp(Millisecond, None), job:Utf8;N, sum(.value):Float64;N]
    Projection: .time AS greptime_timestamp, Utf8(NULL) AS job, sum(.value) [greptime_timestamp:Timestamp(Millisecond, None), job:Utf8;N, sum(.value):Float64;N]
      Sort: .time ASC NULLS LAST [time:Timestamp(Millisecond, None), sum(.value):Float64;N]
        Aggregate: groupBy=[[.time]], aggr=[[sum(.value)]] [time:Timestamp(Millisecond, None), sum(.value):Float64;N]
          EmptyMetric: range=[0..-1], interval=[5000] [time:Timestamp(Millisecond, None), value:Float64;N]
            TableScan: dummy [time:Timestamp(Millisecond, None), value:Float64;N]"#;

        assert_eq!(plan.display_indent_schema().to_string(), expected);
    }
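
    // In the plan above, the missing `metric_not_exists` side of `or` is
    // replaced by an EmptyMetric branch whose absent labels are projected as
    // NULLs (`Utf8(NULL) AS job`), and the two sides are stitched together
    // with UnionDistinctOn on the shared label column.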

    #[tokio::test]
    async fn test_histogram_quantile_missing_le_column() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        // Test case: histogram_quantile with a table that doesn't have an 'le' column.
        let case = r#"histogram_quantile(0.99, sum by(pod,instance,le) (rate(non_existent_histogram_bucket{instance=~"xxx"}[1m])))"#;

        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;

        // Create a table provider with a table that doesn't have an 'le' column.
        let table_provider = build_test_table_provider_with_fields(
            &[(
                DEFAULT_SCHEMA_NAME.to_string(),
                "non_existent_histogram_bucket".to_string(),
            )],
            &["pod", "instance"], // Note: no 'le' column
        )
        .await;
        // Planning should succeed (returning an empty result) instead of
        // failing with "Cannot find column le".
        let result =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await;
        assert!(
            result.is_ok(),
            "Expected successful plan creation with empty result, but got error: {:?}",
            result.err()
        );

        // Verify that the resulting plan is an EmptyRelation.
        let plan = result.unwrap();
        assert!(
            matches!(plan, LogicalPlan::EmptyRelation(_)),
            "Expected EmptyRelation, but got: {plan:?}"
        );
    }
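
    // Companion sketch (not in the original suite): when the `le` tag *is*
    // present, the same query is expected to plan successfully through the
    // histogram-fold path. Only success is asserted; the exact plan string is
    // deliberately not pinned. `some_histogram_bucket` is a hypothetical name.
    #[tokio::test]
    async fn test_histogram_quantile_with_le_column_plans() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };
        let case = r#"histogram_quantile(0.99, sum by(pod, instance, le) (rate(some_histogram_bucket{instance=~"xxx"}[1m])))"#;
        eval_stmt.expr = parser::parse(case).unwrap();

        // Same provider shape as the previous test, but with the `le` tag included.
        let table_provider = build_test_table_provider_with_fields(
            &[(
                DEFAULT_SCHEMA_NAME.to_string(),
                "some_histogram_bucket".to_string(),
            )],
            &["pod", "instance", "le"],
        )
        .await;

        let result =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await;
        assert!(result.is_ok(), "failed to plan: {:?}", result.err());
    }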
5063}